1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.operation.iterator;
19
20 import static java.util.Objects.requireNonNull;
21
22 import com.google.protobuf.ByteString;
23 import java.util.Iterator;
24 import java.util.List;
25 import org.tikv.common.TiConfiguration;
26 import org.tikv.common.exception.GrpcException;
27 import org.tikv.common.exception.TiClientInternalException;
28 import org.tikv.common.key.Key;
29 import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
30 import org.tikv.common.region.TiRegion;
31 import org.tikv.kvproto.Kvrpcpb;
32
33 public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
34 protected final TiConfiguration conf;
35 protected final RegionStoreClientBuilder builder;
36 protected List<Kvrpcpb.KvPair> currentCache;
37 protected ByteString startKey;
38 protected int index = -1;
39 protected int limit;
40 protected boolean keyOnly;
41 protected boolean endOfScan = false;
42
43 protected Key endKey;
44 protected boolean hasEndKey;
45 protected boolean processingLastBatch = false;
46
47 ScanIterator(
48 TiConfiguration conf,
49 RegionStoreClientBuilder builder,
50 ByteString startKey,
51 ByteString endKey,
52 int limit,
53 boolean keyOnly) {
54 this.startKey = requireNonNull(startKey, "start key is null");
55 this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null"));
56 this.hasEndKey = !endKey.isEmpty();
57 this.limit = limit;
58 this.keyOnly = keyOnly;
59 this.conf = conf;
60 this.builder = builder;
61 }
62
63
64
65
66
67
68
69
70 abstract TiRegion loadCurrentRegionToCache() throws GrpcException;
71
72
73 boolean cacheLoadFails() {
74 if (endOfScan || processingLastBatch) {
75 return true;
76 }
77 if (startKey == null) {
78 return true;
79 }
80 try {
81 TiRegion region = loadCurrentRegionToCache();
82 ByteString curRegionEndKey = region.getEndKey();
83
84
85
86
87 if (currentCache == null) {
88 return true;
89 }
90 index = 0;
91 Key lastKey = Key.EMPTY;
92
93
94
95 int scanLimit = Math.min(limit, conf.getScanBatchSize());
96 if (currentCache.size() < scanLimit) {
97 startKey = curRegionEndKey;
98 lastKey = Key.toRawKey(curRegionEndKey);
99 } else if (currentCache.size() > scanLimit) {
100 throw new IndexOutOfBoundsException(
101 "current cache size = " + currentCache.size() + ", larger than " + scanLimit);
102 } else {
103
104 lastKey = Key.toRawKey(currentCache.get(currentCache.size() - 1).getKey());
105 startKey = lastKey.next().toByteString();
106 }
107
108
109 if (hasEndKey && lastKey.compareTo(endKey) >= 0 || startKey.isEmpty()) {
110 processingLastBatch = true;
111 startKey = null;
112 }
113 } catch (Exception e) {
114 throw new TiClientInternalException("Error scanning data from region.", e);
115 }
116 return false;
117 }
118 }