1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common;
19
20 import static org.tikv.common.util.ClientUtils.*;
21
22 import com.google.protobuf.ByteString;
23 import java.util.ArrayList;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Objects;
27 import java.util.concurrent.ExecutorCompletionService;
28 import java.util.concurrent.ExecutorService;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.tikv.common.exception.GrpcException;
32 import org.tikv.common.exception.TiKVException;
33 import org.tikv.common.operation.iterator.ConcreteScanIterator;
34 import org.tikv.common.region.RegionStoreClient;
35 import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
36 import org.tikv.common.region.TiRegion;
37 import org.tikv.common.util.BackOffFunction;
38 import org.tikv.common.util.BackOffer;
39 import org.tikv.common.util.Batch;
40 import org.tikv.common.util.ConcreteBackOffer;
41 import org.tikv.kvproto.Kvrpcpb.KvPair;
42
43 public class KVClient implements AutoCloseable {
44 private static final Logger logger = LoggerFactory.getLogger(KVClient.class);
45 private static final int MAX_BATCH_LIMIT = 1024;
46 private static final int BATCH_GET_SIZE = 16 * 1024;
47 private final RegionStoreClientBuilder clientBuilder;
48 private final TiConfiguration conf;
49 private final ExecutorService batchGetThreadPool;
50
51 public KVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
52 Objects.requireNonNull(clientBuilder, "clientBuilder is null");
53 this.conf = session.getConf();
54 this.clientBuilder = clientBuilder;
55 this.batchGetThreadPool = session.getThreadPoolForBatchGet();
56 }
57
58 @Override
59 public void close() {}
60
61
62
63
64
65
66
67 public ByteString get(ByteString key, long version) throws GrpcException {
68 BackOffer backOffer =
69 ConcreteBackOffer.newGetBackOff(
70 clientBuilder.getRegionManager().getPDClient().getClusterId());
71 while (true) {
72 RegionStoreClient client = clientBuilder.build(key);
73 try {
74 return client.get(backOffer, key, version);
75 } catch (final TiKVException e) {
76 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
77 }
78 }
79 }
80
81
82
83
84
85
86
87
88
89
90 public List<KvPair> batchGet(BackOffer backOffer, List<ByteString> keys, long version)
91 throws GrpcException {
92 return doSendBatchGet(backOffer, keys, version);
93 }
94
95
96
97
98
99
100
101
102 public List<KvPair> scan(ByteString startKey, ByteString endKey, long version)
103 throws GrpcException {
104 Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, endKey, version);
105 List<KvPair> result = new ArrayList<>();
106 iterator.forEachRemaining(result::add);
107 return result;
108 }
109
110
111
112
113
114
115
116
117 public List<KvPair> scan(ByteString startKey, long version, int limit) throws GrpcException {
118 Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit);
119 List<KvPair> result = new ArrayList<>();
120 iterator.forEachRemaining(result::add);
121 return result;
122 }
123
124 public List<KvPair> scan(ByteString startKey, long version) throws GrpcException {
125 return scan(startKey, version, Integer.MAX_VALUE);
126 }
127
128 private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys, long version) {
129 ExecutorCompletionService<List<KvPair>> completionService =
130 new ExecutorCompletionService<>(batchGetThreadPool);
131
132 List<Batch> batches =
133 getBatches(backOffer, keys, BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
134
135 for (Batch batch : batches) {
136 completionService.submit(
137 () -> doSendBatchGetInBatchesWithRetry(batch.getBackOffer(), batch, version));
138 }
139
140 return getKvPairs(completionService, batches, BackOffer.BATCH_GET_MAX_BACKOFF);
141 }
142
143 private List<KvPair> doSendBatchGetInBatchesWithRetry(
144 BackOffer backOffer, Batch batch, long version) {
145 TiRegion oldRegion = batch.getRegion();
146 TiRegion currentRegion =
147 clientBuilder.getRegionManager().getRegionByKey(oldRegion.getStartKey());
148
149 if (oldRegion.equals(currentRegion)) {
150 RegionStoreClient client = clientBuilder.build(batch.getRegion());
151 try {
152 return client.batchGet(backOffer, batch.getKeys(), version);
153 } catch (final TiKVException e) {
154 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
155 clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
156 logger.warn("ReSplitting ranges for BatchGetRequest", e);
157
158
159 return doSendBatchGetWithRefetchRegion(backOffer, batch, version);
160 }
161 } else {
162 return doSendBatchGetWithRefetchRegion(backOffer, batch, version);
163 }
164 }
165
166 private List<KvPair> doSendBatchGetWithRefetchRegion(
167 BackOffer backOffer, Batch batch, long version) {
168 List<Batch> retryBatches =
169 getBatches(backOffer, batch.getKeys(), BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
170
171 ArrayList<KvPair> results = new ArrayList<>();
172 for (Batch retryBatch : retryBatches) {
173
174 List<KvPair> batchResult =
175 doSendBatchGetInBatchesWithRetry(retryBatch.getBackOffer(), retryBatch, version);
176 results.addAll(batchResult);
177 }
178 return results;
179 }
180
181 private Iterator<KvPair> scanIterator(
182 TiConfiguration conf,
183 RegionStoreClientBuilder builder,
184 ByteString startKey,
185 ByteString endKey,
186 long version) {
187 return new ConcreteScanIterator(conf, builder, startKey, endKey, version);
188 }
189
190 private Iterator<KvPair> scanIterator(
191 TiConfiguration conf,
192 RegionStoreClientBuilder builder,
193 ByteString startKey,
194 long version,
195 int limit) {
196 return new ConcreteScanIterator(conf, builder, startKey, version, limit);
197 }
198 }