1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.operation.iterator;
19
20 import com.google.protobuf.ByteString;
21 import org.tikv.common.TiConfiguration;
22 import org.tikv.common.exception.GrpcException;
23 import org.tikv.common.exception.TiKVException;
24 import org.tikv.common.key.Key;
25 import org.tikv.common.region.RegionStoreClient;
26 import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
27 import org.tikv.common.region.TiRegion;
28 import org.tikv.common.util.BackOffFunction;
29 import org.tikv.common.util.BackOffer;
30 import org.tikv.kvproto.Kvrpcpb;
31
32 public class RawScanIterator extends ScanIterator {
33 private final BackOffer scanBackOffer;
34
35 public RawScanIterator(
36 TiConfiguration conf,
37 RegionStoreClientBuilder builder,
38 ByteString startKey,
39 ByteString endKey,
40 int limit,
41 boolean keyOnly,
42 BackOffer scanBackOffer) {
43 super(conf, builder, startKey, endKey, limit, keyOnly);
44
45 this.scanBackOffer = scanBackOffer;
46 }
47
48 @Override
49 TiRegion loadCurrentRegionToCache() throws GrpcException {
50 BackOffer backOffer = scanBackOffer;
51 while (true) {
52 try (RegionStoreClient client = builder.build(startKey, backOffer)) {
53 client.setTimeout(conf.getRawKVScanTimeoutInMS());
54 TiRegion region = client.getRegion();
55 if (limit <= 0) {
56 currentCache = null;
57 } else {
58 try {
59 currentCache = client.rawScan(backOffer, startKey, limit, keyOnly);
60
61
62 region = client.getRegion();
63 } catch (final TiKVException e) {
64 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
65 continue;
66 }
67 }
68 return region;
69 }
70 }
71 }
72
73 private boolean endOfScan() {
74 if (!processingLastBatch) {
75 return false;
76 }
77 ByteString lastKey = currentCache.get(index).getKey();
78 return !lastKey.isEmpty() && Key.toRawKey(lastKey).compareTo(endKey) >= 0;
79 }
80
81 boolean isCacheDrained() {
82 return currentCache == null || limit <= 0 || index >= currentCache.size() || index == -1;
83 }
84
85 @Override
86 public boolean hasNext() {
87 if (isCacheDrained() && cacheLoadFails()) {
88 endOfScan = true;
89 return false;
90 }
91
92 while (currentCache != null && currentCache.isEmpty()) {
93 if (cacheLoadFails()) {
94 return false;
95 }
96 }
97 return !endOfScan();
98 }
99
100 private Kvrpcpb.KvPair getCurrent() {
101 --limit;
102 return currentCache.get(index++);
103 }
104
105 @Override
106 public Kvrpcpb.KvPair next() {
107 return getCurrent();
108 }
109 }