1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.operation.iterator;
19
20 import static java.util.Objects.requireNonNull;
21
22 import com.google.protobuf.ByteString;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import org.tikv.common.TiConfiguration;
26 import org.tikv.common.exception.GrpcException;
27 import org.tikv.common.exception.KeyException;
28 import org.tikv.common.key.Key;
29 import org.tikv.common.region.RegionStoreClient;
30 import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
31 import org.tikv.common.region.TiRegion;
32 import org.tikv.common.region.TiStore;
33 import org.tikv.common.util.BackOffer;
34 import org.tikv.common.util.ConcreteBackOffer;
35 import org.tikv.common.util.Pair;
36 import org.tikv.kvproto.Kvrpcpb;
37
38 public class ConcreteScanIterator extends ScanIterator {
39 private final long version;
40 private final Logger logger = LoggerFactory.getLogger(ConcreteScanIterator.class);
41
42 public ConcreteScanIterator(
43 TiConfiguration conf,
44 RegionStoreClientBuilder builder,
45 ByteString startKey,
46 long version,
47 int limit) {
48
49 this(conf, builder, startKey, ByteString.EMPTY, version, limit);
50 }
51
52 public ConcreteScanIterator(
53 TiConfiguration conf,
54 RegionStoreClientBuilder builder,
55 ByteString startKey,
56 ByteString endKey,
57 long version) {
58
59 this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE);
60 }
61
62 private ConcreteScanIterator(
63 TiConfiguration conf,
64 RegionStoreClientBuilder builder,
65 ByteString startKey,
66 ByteString endKey,
67 long version,
68 int limit) {
69 super(conf, builder, startKey, endKey, limit, false);
70 this.version = version;
71 }
72
73 @Override
74 TiRegion loadCurrentRegionToCache() throws GrpcException {
75 TiRegion region;
76 try (RegionStoreClient client = builder.build(startKey)) {
77 client.setTimeout(conf.getScanTimeout());
78 BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
79 currentCache = client.scan(backOffer, startKey, version);
80
81
82
83
84 region = client.getRegion();
85 return region;
86 }
87 }
88
89 private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) {
90 logger.warn(String.format("resolve current key error %s", current.getError().toString()));
91 Pair<TiRegion, TiStore> pair =
92 builder.getRegionManager().getRegionStorePairByKey(current.getKey());
93 TiRegion region = pair.first;
94 TiStore store = pair.second;
95 BackOffer backOffer =
96 ConcreteBackOffer.newGetBackOff(builder.getRegionManager().getPDClient().getClusterId());
97 try (RegionStoreClient client = builder.build(region, store)) {
98 return client.get(backOffer, current.getKey(), version);
99 } catch (Exception e) {
100 throw new KeyException(current.getError());
101 }
102 }
103
104 @Override
105 public boolean hasNext() {
106 Kvrpcpb.KvPair current;
107
108 do {
109 current = getCurrent();
110 if (isCacheDrained() && cacheLoadFails()) {
111 endOfScan = true;
112 return false;
113 }
114 } while (currentCache != null && current == null);
115
116 return !processingLastBatch
117 || current == null
118 || (hasEndKey && Key.toRawKey(current.getKey()).compareTo(endKey) < 0);
119 }
120
121 @Override
122 public Kvrpcpb.KvPair next() {
123 --limit;
124 Kvrpcpb.KvPair current = currentCache.get(index++);
125
126 requireNonNull(current, "current kv pair cannot be null");
127 if (current.hasError()) {
128 ByteString val = resolveCurrentLock(current);
129 current = Kvrpcpb.KvPair.newBuilder().setKey(current.getKey()).setValue(val).build();
130 }
131
132 return current;
133 }
134
135
136
137
138
139
140
141 private boolean isCacheDrained() {
142 return currentCache == null || limit <= 0 || index >= currentCache.size() || index == -1;
143 }
144
145 private Kvrpcpb.KvPair getCurrent() {
146 if (isCacheDrained()) {
147 return null;
148 }
149 return currentCache.get(index);
150 }
151 }