View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.operation.iterator;
19  
20  import static java.util.Objects.requireNonNull;
21  
22  import com.google.protobuf.ByteString;
23  import java.util.Iterator;
24  import java.util.List;
25  import org.tikv.common.TiConfiguration;
26  import org.tikv.common.exception.GrpcException;
27  import org.tikv.common.exception.TiClientInternalException;
28  import org.tikv.common.key.Key;
29  import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
30  import org.tikv.common.region.TiRegion;
31  import org.tikv.kvproto.Kvrpcpb;
32  
33  public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
34    protected final TiConfiguration conf;
35    protected final RegionStoreClientBuilder builder;
36    protected List<Kvrpcpb.KvPair> currentCache;
37    protected ByteString startKey;
38    protected int index = -1;
39    protected int limit;
40    protected boolean keyOnly;
41    protected boolean endOfScan = false;
42  
43    protected Key endKey;
44    protected boolean hasEndKey;
45    protected boolean processingLastBatch = false;
46  
47    ScanIterator(
48        TiConfiguration conf,
49        RegionStoreClientBuilder builder,
50        ByteString startKey,
51        ByteString endKey,
52        int limit,
53        boolean keyOnly) {
54      this.startKey = requireNonNull(startKey, "start key is null");
55      this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null"));
56      this.hasEndKey = !endKey.isEmpty();
57      this.limit = limit;
58      this.keyOnly = keyOnly;
59      this.conf = conf;
60      this.builder = builder;
61    }
62  
63    /**
64     * Load current region to cache, returns the region if loaded.
65     *
66     * @return TiRegion of current data loaded to cache
67     * @throws GrpcException if scan still fails after backoff
68     *     <p>TODO : Add test to check it correctness
69     */
70    abstract TiRegion loadCurrentRegionToCache() throws GrpcException;
71  
72    // return true if current cache is not loaded or empty
73    boolean cacheLoadFails() {
74      if (endOfScan || processingLastBatch) {
75        return true;
76      }
77      if (startKey == null) {
78        return true;
79      }
80      try {
81        TiRegion region = loadCurrentRegionToCache();
82        ByteString curRegionEndKey = region.getEndKey();
83        // currentCache is null means no keys found, whereas currentCache is empty means no values
84        // found. The difference lies in whether to continue scanning, because chances are that
85        // an empty region exists due to deletion, region split, e.t.c.
86        // See https://github.com/pingcap/tispark/issues/393 for details
87        if (currentCache == null) {
88          return true;
89        }
90        index = 0;
91        Key lastKey = Key.EMPTY;
92        // Session should be single-threaded itself
93        // so that we don't worry about conf change in the middle
94        // of a transaction. Otherwise, below code might lose data
95        int scanLimit = Math.min(limit, conf.getScanBatchSize());
96        if (currentCache.size() < scanLimit) {
97          startKey = curRegionEndKey;
98          lastKey = Key.toRawKey(curRegionEndKey);
99        } else if (currentCache.size() > scanLimit) {
100         throw new IndexOutOfBoundsException(
101             "current cache size = " + currentCache.size() + ", larger than " + scanLimit);
102       } else {
103         // Start new scan from exact next key in current region
104         lastKey = Key.toRawKey(currentCache.get(currentCache.size() - 1).getKey());
105         startKey = lastKey.next().toByteString();
106       }
107       // notify last batch if lastKey is greater than or equal to endKey
108       // if startKey is empty, it indicates +∞
109       if (hasEndKey && lastKey.compareTo(endKey) >= 0 || startKey.isEmpty()) {
110         processingLastBatch = true;
111         startKey = null;
112       }
113     } catch (Exception e) {
114       throw new TiClientInternalException("Error scanning data from region.", e);
115     }
116     return false;
117   }
118 }