1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.raw;
19
20 import static org.tikv.common.util.ClientUtils.appendBatches;
21 import static org.tikv.common.util.ClientUtils.genUUID;
22 import static org.tikv.common.util.ClientUtils.getBatches;
23 import static org.tikv.common.util.ClientUtils.getTasks;
24 import static org.tikv.common.util.ClientUtils.getTasksWithOutput;
25 import static org.tikv.common.util.ClientUtils.groupKeysByRegion;
26
27 import com.google.protobuf.ByteString;
28 import io.prometheus.client.Counter;
29 import io.prometheus.client.Histogram;
30 import java.net.URI;
31 import java.util.ArrayList;
32 import java.util.HashMap;
33 import java.util.Iterator;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Objects;
38 import java.util.Optional;
39 import java.util.Queue;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.ExecutorCompletionService;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Future;
44 import java.util.concurrent.TimeUnit;
45 import java.util.stream.Collectors;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.tikv.common.TiConfiguration;
49 import org.tikv.common.TiSession;
50 import org.tikv.common.codec.KeyUtils;
51 import org.tikv.common.exception.GrpcException;
52 import org.tikv.common.exception.RawCASConflictException;
53 import org.tikv.common.exception.TiKVException;
54 import org.tikv.common.importer.ImporterClient;
55 import org.tikv.common.importer.SwitchTiKVModeClient;
56 import org.tikv.common.key.Key;
57 import org.tikv.common.log.SlowLog;
58 import org.tikv.common.log.SlowLogEmptyImpl;
59 import org.tikv.common.log.SlowLogImpl;
60 import org.tikv.common.log.SlowLogSpan;
61 import org.tikv.common.operation.iterator.RawScanIterator;
62 import org.tikv.common.region.RegionStoreClient;
63 import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
64 import org.tikv.common.region.TiRegion;
65 import org.tikv.common.util.BackOffFunction;
66 import org.tikv.common.util.BackOffer;
67 import org.tikv.common.util.Batch;
68 import org.tikv.common.util.ConcreteBackOffer;
69 import org.tikv.common.util.DeleteRange;
70 import org.tikv.common.util.HistogramUtils;
71 import org.tikv.common.util.Pair;
72 import org.tikv.common.util.ScanOption;
73 import org.tikv.kvproto.Kvrpcpb.KvPair;
74
75 public class RawKVClient implements RawKVClientBase {
  /** Cluster id obtained from the PD client at construction; used as a metric label and slow-log field. */
  private final Long clusterId;
  /** PD addresses obtained from the PD client at construction; attached to every slow-log record. */
  private final List<URI> pdAddresses;
  /** Session this client was created from; also source of the thread pools below. */
  private final TiSession tiSession;
  /** Builder used to obtain a RegionStoreClient for a key or region. */
  private final RegionStoreClientBuilder clientBuilder;
  /** Configuration taken from the session (timeouts, slow-log thresholds, batch sizes). */
  private final TiConfiguration conf;
  /**
   * Value of conf.isEnableAtomicForCAS(); forwarded to put/delete RPCs and required to be true by
   * compareAndSet/putIfAbsent.
   */
  private final boolean atomicForCAS;
  // Executors obtained from the session, one per kind of fan-out operation.
  private final ExecutorService batchGetThreadPool;
  private final ExecutorService batchPutThreadPool;
  private final ExecutorService batchDeleteThreadPool;
  private final ExecutorService batchScanThreadPool;
  private final ExecutorService deleteRangeThreadPool;
  private static final Logger logger = LoggerFactory.getLogger(RawKVClient.class);

  /** Latency of raw requests, labelled by request type and cluster id. */
  public static final Histogram RAW_REQUEST_LATENCY =
      HistogramUtils.buildDuration()
          .name("client_java_raw_requests_latency")
          .help("client raw request latency.")
          .labelNames("type", "cluster")
          .register();

  /** Number of raw requests that completed successfully, labelled by request type and cluster id. */
  public static final Counter RAW_REQUEST_SUCCESS =
      Counter.build()
          .name("client_java_raw_requests_success")
          .help("client raw request success.")
          .labelNames("type", "cluster")
          .register();

  /** Number of raw requests that ended with an exception, labelled by request type and cluster id. */
  public static final Counter RAW_REQUEST_FAILURE =
      Counter.build()
          .name("client_java_raw_requests_failure")
          .help("client raw request failure.")
          .labelNames("type", "cluster")
          .register();

  // NOTE(review): this exception is not referenced in the visible part of this file. A shared
  // static exception instance carries the stack trace of class initialization, not of the throw
  // site. Confirm it is still needed.
  private static final TiKVException ERR_MAX_SCAN_LIMIT_EXCEEDED =
      new TiKVException("limit should be less than MAX_RAW_SCAN_LIMIT");
112
/**
 * Creates a raw KV client bound to {@code session}.
 *
 * @param session session supplying configuration, thread pools and the PD client; must be non-null
 * @param clientBuilder builder for region store clients; must be non-null
 * @throws NullPointerException if either argument is null
 */
public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
  this.tiSession = Objects.requireNonNull(session, "session is null");
  this.clientBuilder = Objects.requireNonNull(clientBuilder, "clientBuilder is null");
  this.conf = session.getConf();

  // Executors are shared with the session; they are looked up once here.
  this.batchGetThreadPool = session.getThreadPoolForBatchGet();
  this.batchPutThreadPool = session.getThreadPoolForBatchPut();
  this.batchDeleteThreadPool = session.getThreadPoolForBatchDelete();
  this.batchScanThreadPool = session.getThreadPoolForBatchScan();
  this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange();

  this.atomicForCAS = this.conf.isEnableAtomicForCAS();

  // Cluster identity, used for metric labels and slow-log fields.
  this.clusterId = session.getPDClient().getClusterId();
  this.pdAddresses = session.getPDClient().getPdAddrs();
}
128
129 private SlowLog withClusterInfo(SlowLog logger) {
130 return logger.withField("cluster_id", clusterId).withField("pd_addresses", pdAddresses);
131 }
132
133 private String[] withClusterId(String label) {
134 return new String[] {label, clusterId.toString()};
135 }
136
137 @Override
138 public void close() {}
139
140 @Override
141 public void put(ByteString key, ByteString value) {
142 put(key, value, 0);
143 }
144
145 @Override
146 public void put(ByteString key, ByteString value, long ttl) {
147 String[] labels = withClusterId("client_raw_put");
148 Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
149
150 SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
151 SlowLogSpan span = slowLog.start("put");
152 span.addProperty("key", KeyUtils.formatBytesUTF8(key));
153
154 ConcreteBackOffer backOffer =
155 ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog, clusterId);
156 try {
157 while (true) {
158 try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
159 span.addProperty("region", client.getRegion().toString());
160 client.rawPut(backOffer, key, value, ttl, atomicForCAS);
161 RAW_REQUEST_SUCCESS.labels(labels).inc();
162 return;
163 } catch (final TiKVException e) {
164 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
165 logger.warn("Retry for put error", e);
166 }
167 }
168 } catch (Exception e) {
169 RAW_REQUEST_FAILURE.labels(labels).inc();
170 slowLog.setError(e);
171 throw e;
172 } finally {
173 requestTimer.observeDuration();
174 span.end();
175 slowLog.log();
176 }
177 }
178
179 @Override
180 public Optional<ByteString> putIfAbsent(ByteString key, ByteString value) {
181 return putIfAbsent(key, value, 0L);
182 }
183
184 @Override
185 public Optional<ByteString> putIfAbsent(ByteString key, ByteString value, long ttl) {
186 try {
187 compareAndSet(key, Optional.empty(), value, ttl);
188 return Optional.empty();
189 } catch (RawCASConflictException e) {
190 return e.getPrevValue();
191 }
192 }
193
194 @Override
195 public void compareAndSet(ByteString key, Optional<ByteString> prevValue, ByteString value)
196 throws RawCASConflictException {
197 compareAndSet(key, prevValue, value, 0L);
198 }
199
200 @Override
201 public void compareAndSet(
202 ByteString key, Optional<ByteString> prevValue, ByteString value, long ttl)
203 throws RawCASConflictException {
204 if (!atomicForCAS) {
205 throw new IllegalArgumentException(
206 "To use compareAndSet or putIfAbsent, please enable the config tikv.enable_atomic_for_cas.");
207 }
208
209 String[] labels = withClusterId("client_raw_compare_and_set");
210 Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
211
212 SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
213 SlowLogSpan span = slowLog.start("putIfAbsent");
214 span.addProperty("key", KeyUtils.formatBytesUTF8(key));
215
216 ConcreteBackOffer backOffer =
217 ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog, clusterId);
218 try {
219 while (true) {
220 try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
221 span.addProperty("region", client.getRegion().toString());
222 client.rawCompareAndSet(backOffer, key, prevValue, value, ttl);
223 RAW_REQUEST_SUCCESS.labels(labels).inc();
224 return;
225 } catch (final TiKVException e) {
226 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
227 logger.warn("Retry for putIfAbsent error", e);
228 }
229 }
230 } catch (Exception e) {
231 RAW_REQUEST_FAILURE.labels(labels).inc();
232 slowLog.setError(e);
233 throw e;
234 } finally {
235 requestTimer.observeDuration();
236 span.end();
237 slowLog.log();
238 }
239 }
240
241 @Override
242 public void batchPut(Map<ByteString, ByteString> kvPairs) {
243 batchPut(kvPairs, 0);
244 }
245
246 @Override
247 public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
248 String[] labels = withClusterId("client_raw_batch_put");
249 Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
250
251 SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()));
252 SlowLogSpan span = slowLog.start("batchPut");
253 span.addProperty("keySize", String.valueOf(kvPairs.size()));
254
255 ConcreteBackOffer backOffer =
256 ConcreteBackOffer.newDeadlineBackOff(
257 conf.getRawKVBatchWriteTimeoutInMS(), slowLog, clusterId);
258 try {
259 long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS();
260 doSendBatchPut(backOffer, kvPairs, ttl, deadline);
261 RAW_REQUEST_SUCCESS.labels(labels).inc();
262 } catch (Exception e) {
263 RAW_REQUEST_FAILURE.labels(labels).inc();
264 slowLog.setError(e);
265 throw e;
266 } finally {
267 requestTimer.observeDuration();
268 span.end();
269 slowLog.log();
270 }
271 }
272
273 @Override
274 public Optional<ByteString> get(ByteString key) {
275 String[] labels = withClusterId("client_raw_get");
276 Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
277
278 SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
279 SlowLogSpan span = slowLog.start("get");
280 span.addProperty("key", KeyUtils.formatBytesUTF8(key));
281
282 ConcreteBackOffer backOffer =
283 ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog, clusterId);
284 try {
285 while (true) {
286 try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
287 span.addProperty("region", client.getRegion().toString());
288 Optional<ByteString> result = client.rawGet(backOffer, key);
289 RAW_REQUEST_SUCCESS.labels(labels).inc();
290 return result;
291 } catch (final TiKVException e) {
292 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
293 logger.warn("Retry for get error", e);
294 }
295 }
296 } catch (Exception e) {
297 RAW_REQUEST_FAILURE.labels(labels).inc();
298 slowLog.setError(e);
299 throw e;
300 } finally {
301 requestTimer.observeDuration();
302 span.end();
303 slowLog.log();
304 }
305 }
306
307 @Override
308 public List<KvPair> batchGet(List<ByteString> keys) {
309 String[] labels = withClusterId("client_raw_batch_get");
310 Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
311 SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS()));
312 SlowLogSpan span = slowLog.start("batchGet");
313 span.addProperty("keySize", String.valueOf(keys.size()));
314 ConcreteBackOffer backOffer =
315 ConcreteBackOffer.newDeadlineBackOff(
316 conf.getRawKVBatchReadTimeoutInMS(), slowLog, clusterId);
317 try {
318 long deadline = System.currentTimeMillis() + conf.getRawKVBatchReadTimeoutInMS();
319 List<KvPair> result = doSendBatchGet(backOffer, keys, deadline);
320 RAW_REQUEST_SUCCESS.labels(labels).inc();
321 return result;
322 } catch (Exception e) {
323 RAW_REQUEST_FAILURE.labels(labels).inc();
324 slowLog.setError(e);
325 throw e;
326 } finally {
327 requestTimer.observeDuration();
328 span.end();
329 slowLog.log();
330 }
331 }
332
333 @Override
334 public void batchDelete(List<ByteString> keys) {
335 String[] labels = withClusterId("client_raw_batch_delete");
336 Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
337 SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()));
338 SlowLogSpan span = slowLog.start("batchDelete");
339 span.addProperty("keySize", String.valueOf(keys.size()));
340 ConcreteBackOffer backOffer =
341 ConcreteBackOffer.newDeadlineBackOff(
342 conf.getRawKVBatchWriteTimeoutInMS(), slowLog, clusterId);
343 try {
344 long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS();
345 doSendBatchDelete(backOffer, keys, deadline);
346 RAW_REQUEST_SUCCESS.labels(labels).inc();
347 } catch (Exception e) {
348 RAW_REQUEST_FAILURE.labels(labels).inc();
349 slowLog.setError(e);
350 throw e;
351 } finally {
352 requestTimer.observeDuration();
353 span.end();
354 slowLog.log();
355 }
356 }
357
358 @Override
359 public Optional<Long> getKeyTTL(ByteString key) {
360 String[] labels = withClusterId("client_raw_get_key_ttl");
361 Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
362 SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
363 SlowLogSpan span = slowLog.start("getKeyTTL");
364 span.addProperty("key", KeyUtils.formatBytesUTF8(key));
365 ConcreteBackOffer backOffer =
366 ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog, clusterId);
367 try {
368 while (true) {
369 try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
370 span.addProperty("region", client.getRegion().toString());
371 Optional<Long> result = client.rawGetKeyTTL(backOffer, key);
372 RAW_REQUEST_SUCCESS.labels(labels).inc();
373 return result;
374 } catch (final TiKVException e) {
375 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
376 logger.warn("Retry for getKeyTTL error", e);
377 }
378 }
379 } catch (Exception e) {
380 RAW_REQUEST_FAILURE.labels(labels).inc();
381 slowLog.setError(e);
382 throw e;
383 } finally {
384 requestTimer.observeDuration();
385 span.end();
386 slowLog.log();
387 }
388 }
389
390 @Override
391 public List<List<ByteString>> batchScanKeys(
392 List<Pair<ByteString, ByteString>> ranges, int eachLimit) {
393 return batchScan(
394 ranges
395 .stream()
396 .map(
397 range ->
398 ScanOption.newBuilder()
399 .setStartKey(range.first)
400 .setEndKey(range.second)
401 .setLimit(eachLimit)
402 .setKeyOnly(true)
403 .build())
404 .collect(Collectors.toList()))
405 .stream()
406 .map(kvs -> kvs.stream().map(kv -> kv.getKey()).collect(Collectors.toList()))
407 .collect(Collectors.toList());
408 }
409
  /**
   * Runs every {@link ScanOption} as an independent scan on {@code batchScanThreadPool} and
   * returns the results in the same order as {@code ranges}.
   *
   * <p>All scans must complete before the raw-KV scan timeout (measured from entry). Otherwise a
   * TiKVException is thrown and every submitted scan is cancelled.
   *
   * @param ranges scan descriptions; an empty list yields an empty result without submitting work
   * @return one result list per input range, at the same index as the range
   */
  @Override
  public List<List<KvPair>> batchScan(List<ScanOption> ranges) {
    String[] labels = withClusterId("client_raw_batch_scan");
    Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
    long deadline = System.currentTimeMillis() + conf.getRawKVScanTimeoutInMS();
    List<Future<Pair<Integer, List<KvPair>>>> futureList = new ArrayList<>();
    try {
      if (ranges.isEmpty()) {
        return new ArrayList<>();
      }
      ExecutorCompletionService<Pair<Integer, List<KvPair>>> completionService =
          new ExecutorCompletionService<>(batchScanThreadPool);
      int num = 0;
      for (ScanOption scanOption : ranges) {
        // Capture the input index so completion order does not affect result order.
        int i = num;
        futureList.add(completionService.submit(() -> Pair.create(i, scan(scanOption))));
        ++num;
      }
      // Pre-size with placeholders so results can be written by index.
      List<List<KvPair>> scanResults = new ArrayList<>();
      for (int i = 0; i < num; i++) {
        scanResults.add(new ArrayList<>());
      }
      for (int i = 0; i < num; i++) {
        try {
          // Remaining time shrinks with each poll; poll returns null when it runs out.
          Future<Pair<Integer, List<KvPair>>> future =
              completionService.poll(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
          if (future == null) {
            throw new TiKVException("TimeOut Exceeded for current operation.");
          }
          Pair<Integer, List<KvPair>> scanResult = future.get();
          scanResults.set(scanResult.first, scanResult.second);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new TiKVException("Current thread interrupted.", e);
        } catch (ExecutionException e) {
          throw new TiKVException("Execution exception met.", e);
        }
      }
      RAW_REQUEST_SUCCESS.labels(labels).inc();
      return scanResults;
    } catch (Exception e) {
      RAW_REQUEST_FAILURE.labels(labels).inc();
      // Stop any scans still running or queued.
      for (Future<Pair<Integer, List<KvPair>>> future : futureList) {
        future.cancel(true);
      }
      throw e;
    } finally {
      requestTimer.observeDuration();
    }
  }
460
461 @Override
462 public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
463 return scan(startKey, endKey, limit, false);
464 }
465
466 @Override
467 public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
468 String[] labels = withClusterId("client_raw_scan");
469 Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
470 SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS()));
471 SlowLogSpan span = slowLog.start("scan");
472 span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey));
473 span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey));
474 span.addProperty("limit", String.valueOf(limit));
475 span.addProperty("keyOnly", String.valueOf(keyOnly));
476 ConcreteBackOffer backOffer =
477 ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog, clusterId);
478 try {
479 Iterator<KvPair> iterator =
480 rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, backOffer);
481 List<KvPair> result = new ArrayList<>();
482 iterator.forEachRemaining(result::add);
483 RAW_REQUEST_SUCCESS.labels(labels).inc();
484 return result;
485 } catch (Exception e) {
486 RAW_REQUEST_FAILURE.labels(labels).inc();
487 slowLog.setError(e);
488 throw e;
489 } finally {
490 requestTimer.observeDuration();
491 span.end();
492 slowLog.log();
493 }
494 }
495
496 @Override
497 public List<KvPair> scan(ByteString startKey, int limit) {
498 return scan(startKey, limit, false);
499 }
500
501 @Override
502 public List<KvPair> scan(ByteString startKey, int limit, boolean keyOnly) {
503 return scan(startKey, ByteString.EMPTY, limit, keyOnly);
504 }
505
506 @Override
507 public List<KvPair> scan(ByteString startKey, ByteString endKey) {
508 return scan(startKey, endKey, false);
509 }
510
511 @Override
512 public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
513 String[] labels = withClusterId("client_raw_scan_without_limit");
514 Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
515 SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS()));
516 SlowLogSpan span = slowLog.start("scan");
517 span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey));
518 span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey));
519 span.addProperty("keyOnly", String.valueOf(keyOnly));
520 ConcreteBackOffer backOffer =
521 ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog, clusterId);
522 try {
523 ByteString newStartKey = startKey;
524 List<KvPair> result = new ArrayList<>();
525 while (true) {
526 Iterator<KvPair> iterator =
527 rawScanIterator(
528 conf,
529 clientBuilder,
530 newStartKey,
531 endKey,
532 conf.getScanBatchSize(),
533 keyOnly,
534 backOffer);
535 if (!iterator.hasNext()) {
536 break;
537 }
538 iterator.forEachRemaining(result::add);
539 newStartKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString();
540 }
541 RAW_REQUEST_SUCCESS.labels(labels).inc();
542 return result;
543 } catch (Exception e) {
544 RAW_REQUEST_FAILURE.labels(labels).inc();
545 slowLog.setError(e);
546 throw e;
547 } finally {
548 requestTimer.observeDuration();
549 span.end();
550 slowLog.log();
551 }
552 }
553
554 private List<KvPair> scan(ScanOption scanOption) {
555 ByteString startKey = scanOption.getStartKey();
556 ByteString endKey = scanOption.getEndKey();
557 int limit = scanOption.getLimit();
558 boolean keyOnly = scanOption.isKeyOnly();
559 return scan(startKey, endKey, limit, keyOnly);
560 }
561
562 @Override
563 public List<KvPair> scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) {
564 return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), limit, keyOnly);
565 }
566
567 @Override
568 public List<KvPair> scanPrefix(ByteString prefixKey) {
569 return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString());
570 }
571
572 @Override
573 public List<KvPair> scanPrefix(ByteString prefixKey, boolean keyOnly) {
574 return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), keyOnly);
575 }
576
577 @Override
578 public void delete(ByteString key) {
579 String[] labels = withClusterId("client_raw_delete");
580 Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
581 SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
582 SlowLogSpan span = slowLog.start("delete");
583 span.addProperty("key", KeyUtils.formatBytesUTF8(key));
584 ConcreteBackOffer backOffer =
585 ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog, clusterId);
586 try {
587 while (true) {
588 try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
589 span.addProperty("region", client.getRegion().toString());
590 client.rawDelete(backOffer, key, atomicForCAS);
591 RAW_REQUEST_SUCCESS.labels(labels).inc();
592 return;
593 } catch (final TiKVException e) {
594 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
595 logger.warn("Retry for delete error", e);
596 }
597 }
598 } catch (Exception e) {
599 RAW_REQUEST_FAILURE.labels(labels).inc();
600 slowLog.setError(e);
601 throw e;
602 } finally {
603 requestTimer.observeDuration();
604 span.end();
605 slowLog.log();
606 }
607 }
608
609 @Override
610 public synchronized void deleteRange(ByteString startKey, ByteString endKey) {
611 String[] labels = withClusterId("client_raw_delete_range");
612 Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
613 ConcreteBackOffer backOffer =
614 ConcreteBackOffer.newDeadlineBackOff(
615 conf.getRawKVCleanTimeoutInMS(), SlowLogEmptyImpl.INSTANCE, clusterId);
616 try {
617 long deadline = System.currentTimeMillis() + conf.getRawKVCleanTimeoutInMS();
618 doSendDeleteRange(backOffer, startKey, endKey, deadline);
619 RAW_REQUEST_SUCCESS.labels(labels).inc();
620 } catch (Exception e) {
621 RAW_REQUEST_FAILURE.labels(labels).inc();
622 throw e;
623 } finally {
624 requestTimer.observeDuration();
625 }
626 }
627
628 @Override
629 public synchronized void deletePrefix(ByteString key) {
630 ByteString endKey = Key.toRawKey(key).nextPrefix().toByteString();
631 deleteRange(key, endKey);
632 }
633
634 @Override
635 public TiSession getSession() {
636 return tiSession;
637 }
638
639
640
641
642
643
644 public synchronized void ingest(List<Pair<ByteString, ByteString>> list) {
645 ingest(list, null);
646 }
647
648
649
650
651
652
653
654 public synchronized void ingest(List<Pair<ByteString, ByteString>> list, Long ttl)
655 throws GrpcException {
656 if (list.isEmpty()) {
657 return;
658 }
659
660 Key min = Key.MAX;
661 Key max = Key.MIN;
662 Map<ByteString, ByteString> map = new HashMap<>(list.size());
663
664 for (Pair<ByteString, ByteString> pair : list) {
665 map.put(pair.first, pair.second);
666 Key key = Key.toRawKey(pair.first.toByteArray());
667 if (key.compareTo(min) < 0) {
668 min = key;
669 }
670 if (key.compareTo(max) > 0) {
671 max = key;
672 }
673 }
674
675 SwitchTiKVModeClient switchTiKVModeClient = tiSession.getSwitchTiKVModeClient();
676
677 try {
678
679 switchTiKVModeClient.switchTiKVToNormalMode();
680
681
682 List<byte[]> splitKeys = new ArrayList<>(2);
683 splitKeys.add(min.getBytes());
684 splitKeys.add(max.next().getBytes());
685 tiSession.splitRegionAndScatter(splitKeys);
686 tiSession.getRegionManager().invalidateAll();
687
688
689 switchTiKVModeClient.keepTiKVToImportMode();
690
691
692 List<ByteString> keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList());
693 Map<TiRegion, List<ByteString>> groupKeys =
694 groupKeysByRegion(clientBuilder.getRegionManager(), keyList, defaultBackOff());
695
696
697 for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
698 TiRegion region = entry.getKey();
699 List<ByteString> keys = entry.getValue();
700 List<Pair<ByteString, ByteString>> kvs =
701 keys.stream().map(k -> Pair.create(k, map.get(k))).collect(Collectors.toList());
702 doIngest(region, kvs, ttl);
703 }
704 } finally {
705
706 switchTiKVModeClient.stopKeepTiKVToImportMode();
707 switchTiKVModeClient.switchTiKVToNormalMode();
708 }
709 }
710
711 private void doIngest(TiRegion region, List<Pair<ByteString, ByteString>> sortedList, Long ttl)
712 throws GrpcException {
713 if (sortedList.isEmpty()) {
714 return;
715 }
716
717 ByteString uuid = ByteString.copyFrom(genUUID());
718 Key minKey = Key.toRawKey(sortedList.get(0).first);
719 Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first);
720 ImporterClient importerClient =
721 new ImporterClient(tiSession, uuid, minKey, maxKey, region, ttl);
722 importerClient.write(sortedList.iterator());
723 }
724
  /**
   * Fans a batch put out over {@code batchPutThreadPool}.
   *
   * <p>Keys are grouped by region and cut into batches with appendBatches. Each round of batches
   * is submitted, then getTasks is called with the remaining time until {@code deadline}.
   * Presumably getTasks enqueues any retry batches returned by the workers onto
   * {@code taskQueue} (TODO confirm against ClientUtils). The loop runs until no rounds remain.
   * On any failure, every future submitted so far is cancelled and the exception is rethrown.
   */
  private void doSendBatchPut(
      BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl, long deadline) {
    ExecutorCompletionService<List<Batch>> completionService =
        new ExecutorCompletionService<>(batchPutThreadPool);

    // Every future from every round, so they can all be cancelled on failure.
    List<Future<List<Batch>>> futureList = new ArrayList<>();

    Map<TiRegion, List<ByteString>> groupKeys =
        groupKeysByRegion(clientBuilder.getRegionManager(), kvPairs.keySet(), backOffer);
    List<Batch> batches = new ArrayList<>();

    for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
      appendBatches(
          backOffer,
          batches,
          entry.getKey(),
          entry.getValue(),
          entry.getValue().stream().map(kvPairs::get).collect(Collectors.toList()),
          RAW_BATCH_PUT_SIZE,
          MAX_RAW_BATCH_LIMIT);
    }
    // Each queue element is one round of batches to send concurrently.
    Queue<List<Batch>> taskQueue = new LinkedList<>();
    taskQueue.offer(batches);

    while (!taskQueue.isEmpty()) {
      List<Batch> task = taskQueue.poll();
      for (Batch batch : task) {
        futureList.add(
            completionService.submit(
                () -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl)));
      }

      try {
        getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
      } catch (Exception e) {
        for (Future<List<Batch>> future : futureList) {
          future.cancel(true);
        }
        throw e;
      }
    }
  }
767
768 private List<Batch> doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) {
769 try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) {
770 client.setTimeout(conf.getRawKVBatchWriteTimeoutInMS());
771 client.rawBatchPut(backOffer, batch, ttl, atomicForCAS);
772 return new ArrayList<>();
773 } catch (final TiKVException e) {
774
775 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
776 logger.warn("ReSplitting ranges for BatchPutRequest", e);
777
778 return doSendBatchPutWithRefetchRegion(backOffer, batch);
779 }
780 }
781
782 private List<Batch> doSendBatchPutWithRefetchRegion(BackOffer backOffer, Batch batch) {
783 Map<TiRegion, List<ByteString>> groupKeys =
784 groupKeysByRegion(clientBuilder.getRegionManager(), batch.getKeys(), backOffer);
785 List<Batch> retryBatches = new ArrayList<>();
786
787 for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
788 appendBatches(
789 backOffer,
790 retryBatches,
791 entry.getKey(),
792 entry.getValue(),
793 entry.getValue().stream().map(batch.getMap()::get).collect(Collectors.toList()),
794 RAW_BATCH_PUT_SIZE,
795 MAX_RAW_BATCH_LIMIT);
796 }
797
798 return retryBatches;
799 }
800
  /**
   * Fans a batch get out over {@code batchGetThreadPool} and returns the concatenated results.
   *
   * <p>Keys are cut into batches with getBatches. Each round of batches is submitted, and
   * getTasksWithOutput collects their pairs within the remaining time until {@code deadline}.
   * Presumably it also enqueues retry batches onto {@code taskQueue} (TODO confirm against
   * ClientUtils). On any failure, every submitted future is cancelled and the exception is
   * rethrown.
   */
  private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys, long deadline) {
    ExecutorCompletionService<Pair<List<Batch>, List<KvPair>>> completionService =
        new ExecutorCompletionService<>(batchGetThreadPool);

    // Every future from every round, so they can all be cancelled on failure.
    List<Future<Pair<List<Batch>, List<KvPair>>>> futureList = new ArrayList<>();

    List<Batch> batches =
        getBatches(backOffer, keys, RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, this.clientBuilder);

    // Each queue element is one round of batches to send concurrently.
    Queue<List<Batch>> taskQueue = new LinkedList<>();
    List<KvPair> result = new ArrayList<>();
    taskQueue.offer(batches);

    while (!taskQueue.isEmpty()) {
      List<Batch> task = taskQueue.poll();
      for (Batch batch : task) {
        futureList.add(
            completionService.submit(
                () -> doSendBatchGetInBatchesWithRetry(batch.getBackOffer(), batch)));
      }
      try {
        result.addAll(
            getTasksWithOutput(
                completionService, taskQueue, task, deadline - System.currentTimeMillis()));
      } catch (Exception e) {
        for (Future<Pair<List<Batch>, List<KvPair>>> future : futureList) {
          future.cancel(true);
        }
        throw e;
      }
    }

    return result;
  }
835
836 private Pair<List<Batch>, List<KvPair>> doSendBatchGetInBatchesWithRetry(
837 BackOffer backOffer, Batch batch) {
838
839 try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) {
840 List<KvPair> partialResult = client.rawBatchGet(backOffer, batch.getKeys());
841 return Pair.create(new ArrayList<>(), partialResult);
842 } catch (final TiKVException e) {
843 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
844 clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
845 logger.warn("ReSplitting ranges for BatchGetRequest", e);
846
847
848 return Pair.create(doSendBatchGetWithRefetchRegion(backOffer, batch), new ArrayList<>());
849 }
850 }
851
852 private List<Batch> doSendBatchGetWithRefetchRegion(BackOffer backOffer, Batch batch) {
853 return getBatches(
854 backOffer, batch.getKeys(), RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
855 }
856
  /**
   * Fans a batch delete out over {@code batchDeleteThreadPool}.
   *
   * <p>Keys are cut into batches with getBatches. Each round is submitted, and getTasks is called
   * with the remaining time until {@code deadline}. Presumably getTasks enqueues retry batches
   * onto {@code taskQueue} (TODO confirm against ClientUtils). On any failure, every submitted
   * future is cancelled and the exception is rethrown.
   */
  private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys, long deadline) {
    ExecutorCompletionService<List<Batch>> completionService =
        new ExecutorCompletionService<>(batchDeleteThreadPool);

    // Every future from every round, so they can all be cancelled on failure.
    List<Future<List<Batch>>> futureList = new ArrayList<>();

    List<Batch> batches =
        getBatches(backOffer, keys, RAW_BATCH_DELETE_SIZE, MAX_RAW_BATCH_LIMIT, this.clientBuilder);

    // Each queue element is one round of batches to send concurrently.
    Queue<List<Batch>> taskQueue = new LinkedList<>();
    taskQueue.offer(batches);

    while (!taskQueue.isEmpty()) {
      List<Batch> task = taskQueue.poll();
      for (Batch batch : task) {
        futureList.add(
            completionService.submit(
                () -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch)));
      }
      try {
        getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
      } catch (Exception e) {
        for (Future<List<Batch>> future : futureList) {
          future.cancel(true);
        }
        throw e;
      }
    }
  }
886
887 private List<Batch> doSendBatchDeleteInBatchesWithRetry(BackOffer backOffer, Batch batch) {
888 try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) {
889 client.rawBatchDelete(backOffer, batch.getKeys(), atomicForCAS);
890 return new ArrayList<>();
891 } catch (final TiKVException e) {
892 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
893 clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
894 logger.warn("ReSplitting ranges for BatchGetRequest", e);
895
896
897 return doSendBatchDeleteWithRefetchRegion(backOffer, batch);
898 }
899 }
900
901 private List<Batch> doSendBatchDeleteWithRefetchRegion(BackOffer backOffer, Batch batch) {
902 return getBatches(
903 backOffer, batch.getKeys(), RAW_BATCH_DELETE_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
904 }
905
906 private ByteString calcKeyByCondition(boolean condition, ByteString key1, ByteString key2) {
907 if (condition) {
908 return key1;
909 }
910 return key2;
911 }
912
  /**
   * Splits {@code [startKey, endKey)} into one DeleteRange per region and sends them over
   * {@code deleteRangeThreadPool}.
   *
   * <p>The first range starts at {@code startKey} and the last ends at {@code endKey}; the
   * others use their region's own boundaries. Rounds are processed until {@code taskQueue} is
   * empty. Presumably getTasks enqueues retry ranges onto it (TODO confirm against ClientUtils).
   * On failure, every submitted future is cancelled and the exception is rethrown.
   */
  private void doSendDeleteRange(
      BackOffer backOffer, ByteString startKey, ByteString endKey, long deadline) {
    ExecutorCompletionService<List<DeleteRange>> completionService =
        new ExecutorCompletionService<>(deleteRangeThreadPool);

    // Every future from every round, so they can all be cancelled on failure.
    List<Future<List<DeleteRange>>> futureList = new ArrayList<>();

    List<TiRegion> regions = fetchRegionsFromRange(backOffer, startKey, endKey);
    List<DeleteRange> ranges = new ArrayList<>();
    for (int i = 0; i < regions.size(); i++) {
      TiRegion region = regions.get(i);
      // Clip the first and last regions to the requested bounds.
      ByteString start = calcKeyByCondition(i == 0, startKey, region.getStartKey());
      ByteString end = calcKeyByCondition(i == regions.size() - 1, endKey, region.getEndKey());
      ranges.add(new DeleteRange(backOffer, region, start, end));
    }
    Queue<List<DeleteRange>> taskQueue = new LinkedList<>();
    taskQueue.offer(ranges);
    while (!taskQueue.isEmpty()) {
      List<DeleteRange> task = taskQueue.poll();
      for (DeleteRange range : task) {
        futureList.add(
            completionService.submit(
                () -> doSendDeleteRangeWithRetry(range.getBackOffer(), range)));
      }
      try {
        getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
      } catch (Exception e) {
        for (Future<List<DeleteRange>> future : futureList) {
          future.cancel(true);
        }
        throw e;
      }
    }
  }
947
948 private List<DeleteRange> doSendDeleteRangeWithRetry(BackOffer backOffer, DeleteRange range) {
949 try (RegionStoreClient client = clientBuilder.build(range.getRegion(), backOffer)) {
950 client.setTimeout(conf.getScanTimeout());
951 client.rawDeleteRange(backOffer, range.getStartKey(), range.getEndKey());
952 return new ArrayList<>();
953 } catch (final TiKVException e) {
954 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
955 clientBuilder.getRegionManager().invalidateRegion(range.getRegion());
956 logger.warn("ReSplitting ranges for BatchDeleteRangeRequest", e);
957
958
959 return doSendDeleteRangeWithRefetchRegion(backOffer, range);
960 }
961 }
962
963 private List<DeleteRange> doSendDeleteRangeWithRefetchRegion(
964 BackOffer backOffer, DeleteRange range) {
965 List<TiRegion> regions =
966 fetchRegionsFromRange(backOffer, range.getStartKey(), range.getEndKey());
967 List<DeleteRange> retryRanges = new ArrayList<>();
968 for (int i = 0; i < regions.size(); i++) {
969 TiRegion region = regions.get(i);
970 ByteString start = calcKeyByCondition(i == 0, range.getStartKey(), region.getStartKey());
971 ByteString end =
972 calcKeyByCondition(i == regions.size() - 1, range.getEndKey(), region.getEndKey());
973 retryRanges.add(new DeleteRange(backOffer, region, start, end));
974 }
975 return retryRanges;
976 }
977
978 private static Map<ByteString, ByteString> mapKeysToValues(
979 List<ByteString> keys, List<ByteString> values) {
980 Map<ByteString, ByteString> map = new HashMap<>();
981 for (int i = 0; i < keys.size(); i++) {
982 map.put(keys.get(i), values.get(i));
983 }
984 return map;
985 }
986
987 private List<TiRegion> fetchRegionsFromRange(
988 BackOffer backOffer, ByteString startKey, ByteString endKey) {
989 List<TiRegion> regions = new ArrayList<>();
990 while (startKey.isEmpty()
991 || endKey.isEmpty()
992 || Key.toRawKey(startKey).compareTo(Key.toRawKey(endKey)) < 0) {
993 TiRegion currentRegion = clientBuilder.getRegionManager().getRegionByKey(startKey, backOffer);
994 regions.add(currentRegion);
995 startKey = currentRegion.getEndKey();
996 if (currentRegion.getEndKey().isEmpty()) {
997 break;
998 }
999 }
1000 return regions;
1001 }
1002
1003 private Iterator<KvPair> rawScanIterator(
1004 TiConfiguration conf,
1005 RegionStoreClientBuilder builder,
1006 ByteString startKey,
1007 ByteString endKey,
1008 int limit,
1009 boolean keyOnly,
1010 BackOffer backOffer) {
1011 if (limit > MAX_RAW_SCAN_LIMIT) {
1012 throw ERR_MAX_SCAN_LIMIT_EXCEEDED;
1013 }
1014 return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, backOffer);
1015 }
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025 public Iterator<KvPair> scan0(ByteString startKey, ByteString endKey, int limit) {
1026 return scan0(startKey, endKey, limit, false);
1027 }
1028
1029
1030
1031
1032
1033
1034
1035
1036 public Iterator<KvPair> scan0(ByteString startKey, int limit) {
1037 return scan0(startKey, limit, false);
1038 }
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048 public Iterator<KvPair> scan0(ByteString startKey, int limit, boolean keyOnly) {
1049 return scan0(startKey, ByteString.EMPTY, limit, keyOnly);
1050 }
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061 public Iterator<KvPair> scan0(
1062 ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
1063 String[] labels = withClusterId("client_raw_scan");
1064 Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
1065 try {
1066 Iterator<KvPair> iterator =
1067 rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, defaultBackOff());
1068 RAW_REQUEST_SUCCESS.labels(labels).inc();
1069 return iterator;
1070 } catch (Exception e) {
1071 RAW_REQUEST_FAILURE.labels(labels).inc();
1072 throw e;
1073 } finally {
1074 requestTimer.observeDuration();
1075 }
1076 }
1077
1078
1079
1080
1081
1082
1083
1084
1085 public Iterator<KvPair> scan0(ByteString startKey, ByteString endKey) {
1086 return scan0(startKey, endKey, false);
1087 }
1088
1089 private Iterator<KvPair> scan0(ScanOption scanOption) {
1090 ByteString startKey = scanOption.getStartKey();
1091 ByteString endKey = scanOption.getEndKey();
1092 int limit = scanOption.getLimit();
1093 boolean keyOnly = scanOption.isKeyOnly();
1094 return scan0(startKey, endKey, limit, keyOnly);
1095 }
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105 public Iterator<KvPair> scanPrefix0(ByteString prefixKey, int limit, boolean keyOnly) {
1106 return scan0(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), limit, keyOnly);
1107 }
1108
1109 public Iterator<KvPair> scanPrefix0(ByteString prefixKey) {
1110 return scan0(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString());
1111 }
1112
1113 public Iterator<KvPair> scanPrefix0(ByteString prefixKey, boolean keyOnly) {
1114 return scan0(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), keyOnly);
1115 }
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125 public Iterator<KvPair> scan0(ByteString startKey, ByteString endKey, boolean keyOnly) {
1126 return new TikvIterator(startKey, endKey, keyOnly);
1127 }
1128
1129 public class TikvIterator implements Iterator<KvPair> {
1130
1131 private Iterator<KvPair> iterator;
1132
1133 private final ByteString startKey;
1134 private final ByteString endKey;
1135 private final boolean keyOnly;
1136
1137 private KvPair last;
1138
1139 public TikvIterator(ByteString startKey, ByteString endKey, boolean keyOnly) {
1140 this.startKey = startKey;
1141 this.endKey = endKey;
1142 this.keyOnly = keyOnly;
1143
1144 this.iterator =
1145 rawScanIterator(
1146 conf,
1147 clientBuilder,
1148 this.startKey,
1149 this.endKey,
1150 conf.getScanBatchSize(),
1151 keyOnly,
1152 defaultBackOff());
1153 }
1154
1155 @Override
1156 public boolean hasNext() {
1157 if (this.iterator.hasNext()) {
1158 return true;
1159 }
1160 if (this.last == null) {
1161 return false;
1162 }
1163 ByteString startKey = Key.toRawKey(this.last.getKey()).next().toByteString();
1164 this.iterator =
1165 rawScanIterator(
1166 conf,
1167 clientBuilder,
1168 startKey,
1169 endKey,
1170 conf.getScanBatchSize(),
1171 keyOnly,
1172 defaultBackOff());
1173 this.last = null;
1174 return this.iterator.hasNext();
1175 }
1176
1177 @Override
1178 public KvPair next() {
1179 KvPair next = this.iterator.next();
1180 this.last = next;
1181 return next;
1182 }
1183 }
1184
1185 private BackOffer defaultBackOff() {
1186 return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS(), clusterId);
1187 }
1188 }