1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.region;
19
20 import static org.tikv.common.region.RegionStoreClient.RequestTypes.REQ_TYPE_DAG;
21 import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoRegionMiss;
22 import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnLock;
23 import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnLockFast;
24
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.protobuf.ByteString;
27 import com.google.protobuf.InvalidProtocolBufferException;
28 import com.pingcap.tidb.tipb.DAGRequest;
29 import com.pingcap.tidb.tipb.SelectResponse;
30 import io.grpc.ManagedChannel;
31 import io.grpc.Metadata;
32 import io.grpc.stub.MetadataUtils;
33 import io.prometheus.client.Histogram;
34 import java.util.ArrayList;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.HashSet;
38 import java.util.Iterator;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.Objects;
42 import java.util.Optional;
43 import java.util.Queue;
44 import java.util.Set;
45 import java.util.function.Supplier;
46 import java.util.stream.Collectors;
47 import java.util.stream.StreamSupport;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.tikv.common.PDClient;
51 import org.tikv.common.StoreVersion;
52 import org.tikv.common.TiConfiguration;
53 import org.tikv.common.Version;
54 import org.tikv.common.exception.GrpcException;
55 import org.tikv.common.exception.KeyException;
56 import org.tikv.common.exception.RawCASConflictException;
57 import org.tikv.common.exception.RegionException;
58 import org.tikv.common.exception.SelectException;
59 import org.tikv.common.exception.TiClientInternalException;
60 import org.tikv.common.exception.TiKVException;
61 import org.tikv.common.log.SlowLogEmptyImpl;
62 import org.tikv.common.operation.KVErrorHandler;
63 import org.tikv.common.operation.RegionErrorHandler;
64 import org.tikv.common.streaming.StreamingResponse;
65 import org.tikv.common.util.BackOffFunction;
66 import org.tikv.common.util.BackOffer;
67 import org.tikv.common.util.Batch;
68 import org.tikv.common.util.ChannelFactory;
69 import org.tikv.common.util.ConcreteBackOffer;
70 import org.tikv.common.util.HistogramUtils;
71 import org.tikv.common.util.Pair;
72 import org.tikv.common.util.RangeSplitter;
73 import org.tikv.kvproto.Coprocessor;
74 import org.tikv.kvproto.Errorpb;
75 import org.tikv.kvproto.Kvrpcpb.BatchGetRequest;
76 import org.tikv.kvproto.Kvrpcpb.BatchGetResponse;
77 import org.tikv.kvproto.Kvrpcpb.CommitRequest;
78 import org.tikv.kvproto.Kvrpcpb.CommitResponse;
79 import org.tikv.kvproto.Kvrpcpb.GetRequest;
80 import org.tikv.kvproto.Kvrpcpb.GetResponse;
81 import org.tikv.kvproto.Kvrpcpb.KeyError;
82 import org.tikv.kvproto.Kvrpcpb.KvPair;
83 import org.tikv.kvproto.Kvrpcpb.Mutation;
84 import org.tikv.kvproto.Kvrpcpb.PrewriteRequest;
85 import org.tikv.kvproto.Kvrpcpb.PrewriteResponse;
86 import org.tikv.kvproto.Kvrpcpb.RawBatchDeleteRequest;
87 import org.tikv.kvproto.Kvrpcpb.RawBatchDeleteResponse;
88 import org.tikv.kvproto.Kvrpcpb.RawBatchGetRequest;
89 import org.tikv.kvproto.Kvrpcpb.RawBatchGetResponse;
90 import org.tikv.kvproto.Kvrpcpb.RawBatchPutRequest;
91 import org.tikv.kvproto.Kvrpcpb.RawBatchPutResponse;
92 import org.tikv.kvproto.Kvrpcpb.RawCASRequest;
93 import org.tikv.kvproto.Kvrpcpb.RawCASResponse;
94 import org.tikv.kvproto.Kvrpcpb.RawDeleteRangeRequest;
95 import org.tikv.kvproto.Kvrpcpb.RawDeleteRangeResponse;
96 import org.tikv.kvproto.Kvrpcpb.RawDeleteRequest;
97 import org.tikv.kvproto.Kvrpcpb.RawDeleteResponse;
98 import org.tikv.kvproto.Kvrpcpb.RawGetKeyTTLRequest;
99 import org.tikv.kvproto.Kvrpcpb.RawGetKeyTTLResponse;
100 import org.tikv.kvproto.Kvrpcpb.RawGetRequest;
101 import org.tikv.kvproto.Kvrpcpb.RawGetResponse;
102 import org.tikv.kvproto.Kvrpcpb.RawPutRequest;
103 import org.tikv.kvproto.Kvrpcpb.RawPutResponse;
104 import org.tikv.kvproto.Kvrpcpb.RawScanRequest;
105 import org.tikv.kvproto.Kvrpcpb.RawScanResponse;
106 import org.tikv.kvproto.Kvrpcpb.ScanRequest;
107 import org.tikv.kvproto.Kvrpcpb.ScanResponse;
108 import org.tikv.kvproto.Kvrpcpb.SplitRegionRequest;
109 import org.tikv.kvproto.Kvrpcpb.SplitRegionResponse;
110 import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatRequest;
111 import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatResponse;
112 import org.tikv.kvproto.Metapb;
113 import org.tikv.kvproto.TikvGrpc;
114 import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
115 import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
116 import org.tikv.txn.AbstractLockResolverClient;
117 import org.tikv.txn.Lock;
118 import org.tikv.txn.ResolveLockResult;
119 import org.tikv.txn.exception.LockException;
120
121
122
123
124
125
126
127
128
129 public class RegionStoreClient extends AbstractRegionStoreClient {
/** Logger shared by all instances of this class. */
private static final Logger logger = LoggerFactory.getLogger(RegionStoreClient.class);
/** Lock resolver created in the constructor; public so that tests can reach it. */
@VisibleForTesting public final AbstractLockResolverClient lockResolverClient;
/** Store type given at construction; passed to makeContext when requests are built. */
private final TiStoreType storeType;

/** Resolved locks (as Long values) grouped by version; see add/getResolvedLocks. */
private final Map<Long, Set<Long>> resolvedLocks = new HashMap<>();

/** PD client; used in this class for the cluster id, host mapping and the version check. */
private final PDClient pdClient;
/** Cached result of the RESOLVE_LOCK_V4 version check; null until getIsV4() computes it. */
private Boolean isV4 = null;

/**
 * Latency histogram for raw requests. The first label carries the request type (for example
 * "client_grpc_raw_get"), the second the cluster id rendered as a string.
 */
public static final Histogram GRPC_RAW_REQUEST_LATENCY =
    HistogramUtils.buildDuration()
        .name("client_java_grpc_raw_requests_latency")
        .help("grpc raw request latency.")
        .labelNames("type", "cluster")
        .register();
145
146 private synchronized Boolean getIsV4() {
147 if (isV4 == null) {
148 isV4 = StoreVersion.minTiKVVersion(Version.RESOLVE_LOCK_V4, pdClient);
149 }
150 return isV4;
151 }
152
153 private RegionStoreClient(
154 TiConfiguration conf,
155 TiRegion region,
156 TiStore store,
157 TiStoreType storeType,
158 ChannelFactory channelFactory,
159 TikvBlockingStub blockingStub,
160 TikvFutureStub asyncStub,
161 RegionManager regionManager,
162 PDClient pdClient,
163 RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
164 super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
165 this.storeType = storeType;
166
167 if (this.storeType == TiStoreType.TiKV) {
168 this.lockResolverClient =
169 AbstractLockResolverClient.getInstance(
170 conf,
171 region,
172 store,
173 this.blockingStub,
174 this.asyncStub,
175 channelFactory,
176 regionManager,
177 pdClient,
178 clientBuilder);
179
180 } else {
181 TiStore tikvStore =
182 regionManager.getRegionStorePairByKey(region.getStartKey(), TiStoreType.TiKV).second;
183
184 String addressStr = tikvStore.getStore().getAddress();
185 if (logger.isDebugEnabled()) {
186 logger.debug(String.format("Create region store client on address %s", addressStr));
187 }
188 ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
189
190 TikvBlockingStub tikvBlockingStub = TikvGrpc.newBlockingStub(channel);
191 TikvGrpc.TikvFutureStub tikvAsyncStub = TikvGrpc.newFutureStub(channel);
192
193 this.lockResolverClient =
194 AbstractLockResolverClient.getInstance(
195 conf,
196 region,
197 tikvStore,
198 tikvBlockingStub,
199 tikvAsyncStub,
200 channelFactory,
201 regionManager,
202 pdClient,
203 clientBuilder);
204 }
205 this.pdClient = pdClient;
206 }
207
208 public synchronized boolean addResolvedLocks(Long version, Set<Long> locks) {
209 Set<Long> oldList = resolvedLocks.get(version);
210 if (oldList != null) {
211 oldList.addAll(locks);
212 } else {
213 resolvedLocks.put(version, new HashSet<>(locks));
214 }
215 return true;
216 }
217
218 public synchronized Set<Long> getResolvedLocks(Long version) {
219 return resolvedLocks.getOrDefault(version, java.util.Collections.emptySet());
220 }
221
222
223
224
225
226
227
228
229
230
231
232 public ByteString get(BackOffer backOffer, ByteString key, long version)
233 throws TiClientInternalException, KeyException {
234 boolean forWrite = false;
235 Supplier<GetRequest> factory =
236 () ->
237 GetRequest.newBuilder()
238 .setContext(
239 makeContext(getResolvedLocks(version), this.storeType, backOffer.getSlowLog()))
240 .setKey(codec.encodeKey(key))
241 .setVersion(version)
242 .build();
243
244 KVErrorHandler<GetResponse> handler =
245 new KVErrorHandler<>(
246 regionManager,
247 this,
248 lockResolverClient,
249 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
250 resp -> resp.hasError() ? resp.getError() : null,
251 resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
252 version,
253 forWrite);
254
255 GetResponse resp = callWithRetry(backOffer, TikvGrpc.getKvGetMethod(), factory, handler);
256
257 handleGetResponse(resp);
258 return resp.getValue();
259 }
260
261
262
263
264
265
266 private void handleGetResponse(GetResponse resp) throws TiClientInternalException, KeyException {
267 if (resp == null) {
268 this.regionManager.onRequestFail(region);
269 throw new TiClientInternalException("GetResponse failed without a cause");
270 }
271 if (resp.hasRegionError()) {
272 throw new RegionException(resp.getRegionError());
273 }
274 if (resp.hasError()) {
275 throw new KeyException(resp.getError());
276 }
277 }
278
279 public List<KvPair> batchGet(BackOffer backOffer, List<ByteString> keys, long version) {
280 boolean forWrite = false;
281 Supplier<BatchGetRequest> request =
282 () ->
283 BatchGetRequest.newBuilder()
284 .setContext(
285 makeContext(getResolvedLocks(version), this.storeType, backOffer.getSlowLog()))
286 .addAllKeys(codec.encodeKeys(keys))
287 .setVersion(version)
288 .build();
289 KVErrorHandler<BatchGetResponse> handler =
290 new KVErrorHandler<>(
291 regionManager,
292 this,
293 lockResolverClient,
294 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
295 resp -> null,
296 resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
297 version,
298 forWrite);
299 BatchGetResponse resp =
300 callWithRetry(backOffer, TikvGrpc.getKvBatchGetMethod(), request, handler);
301 return handleBatchGetResponse(backOffer, resp, version);
302 }
303
304 private List<KvPair> handleBatchGetResponse(
305 BackOffer backOffer, BatchGetResponse resp, long version) {
306 boolean forWrite = false;
307 if (resp == null) {
308 this.regionManager.onRequestFail(region);
309 throw new TiClientInternalException("BatchGetResponse failed without a cause");
310 }
311 if (resp.hasRegionError()) {
312 throw new RegionException(resp.getRegionError());
313 }
314 List<Lock> locks = new ArrayList<>();
315
316 for (KvPair pair : resp.getPairsList()) {
317 if (pair.hasError()) {
318 if (pair.getError().hasLocked()) {
319 Lock lock = new Lock(pair.getError().getLocked(), codec);
320 locks.add(lock);
321 } else {
322 throw new KeyException(pair.getError());
323 }
324 }
325 }
326
327 if (!locks.isEmpty()) {
328 ResolveLockResult resolveLockResult =
329 lockResolverClient.resolveLocks(backOffer, version, locks, forWrite);
330 addResolvedLocks(version, resolveLockResult.getResolvedLocks());
331
332 throw new TiKVException("locks not resolved, retry");
333 }
334
335 return codec.decodeKvPairs(resp.getPairsList());
336 }
337
338 public List<KvPair> scan(
339 BackOffer backOffer, ByteString startKey, long version, boolean keyOnly) {
340 boolean forWrite = false;
341 while (true) {
342 Supplier<ScanRequest> request =
343 () ->
344 ScanRequest.newBuilder()
345 .setContext(
346 makeContext(
347 getResolvedLocks(version), this.storeType, backOffer.getSlowLog()))
348 .setStartKey(codec.encodeKey(startKey))
349 .setVersion(version)
350 .setKeyOnly(keyOnly)
351 .setLimit(getConf().getScanBatchSize())
352 .build();
353
354 KVErrorHandler<ScanResponse> handler =
355 new KVErrorHandler<>(
356 regionManager,
357 this,
358 lockResolverClient,
359 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
360 resp -> resp.hasError() ? resp.getError() : null,
361 resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
362 version,
363 forWrite);
364 ScanResponse resp = callWithRetry(backOffer, TikvGrpc.getKvScanMethod(), request, handler);
365
366
367 region = regionManager.getRegionByKey(startKey, backOffer);
368
369 if (handleScanResponse(backOffer, resp, version, forWrite)) {
370 return resp.getPairsList();
371 }
372 }
373 }
374
375 private boolean handleScanResponse(
376 BackOffer backOffer, ScanResponse resp, long version, boolean forWrite) {
377 if (resp == null) {
378 this.regionManager.onRequestFail(region);
379 throw new TiClientInternalException("ScanResponse failed without a cause");
380 }
381 if (resp.hasRegionError()) {
382 backOffer.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
383 return false;
384 }
385
386
387
388
389
390
391 List<Lock> locks = new ArrayList<>();
392 for (KvPair kvPair : resp.getPairsList()) {
393 if (kvPair.hasError()) {
394 Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(kvPair.getError(), codec);
395 locks.add(lock);
396 }
397 }
398 if (!locks.isEmpty()) {
399 ResolveLockResult resolveLockResult =
400 lockResolverClient.resolveLocks(backOffer, version, locks, forWrite);
401 addResolvedLocks(version, resolveLockResult.getResolvedLocks());
402
403 long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
404 if (msBeforeExpired > 0) {
405
406 backOffer.doBackOffWithMaxSleep(
407 BoTxnLockFast, msBeforeExpired, new KeyException(locks.toString()));
408 }
409
410 return false;
411 }
412
413 return true;
414 }
415
416 public List<KvPair> scan(BackOffer backOffer, ByteString startKey, long version) {
417 return scan(backOffer, startKey, version, false);
418 }
419
420
421
422
423
424
425
426
427
428
429
430
431
432 public void prewrite(
433 BackOffer backOffer, ByteString primary, List<Mutation> mutations, long startTs, long lockTTL)
434 throws TiClientInternalException, KeyException, RegionException {
435 this.prewrite(backOffer, primary, mutations, startTs, lockTTL, false);
436 }
437
438
439
440
441
442
443 public void prewrite(
444 BackOffer bo,
445 ByteString primaryLock,
446 List<Mutation> mutations,
447 long startTs,
448 long ttl,
449 boolean skipConstraintCheck)
450 throws TiClientInternalException, KeyException, RegionException {
451 boolean forWrite = true;
452 while (true) {
453 Supplier<PrewriteRequest> factory =
454 () ->
455 getIsV4()
456 ? PrewriteRequest.newBuilder()
457 .setContext(makeContext(storeType, bo.getSlowLog()))
458 .setStartVersion(startTs)
459 .setPrimaryLock(codec.encodeKey(primaryLock))
460 .addAllMutations(codec.encodeMutations(mutations))
461 .setLockTtl(ttl)
462 .setSkipConstraintCheck(skipConstraintCheck)
463 .setMinCommitTs(startTs)
464 .setTxnSize(16)
465 .build()
466 : PrewriteRequest.newBuilder()
467 .setContext(makeContext(storeType, bo.getSlowLog()))
468 .setStartVersion(startTs)
469 .setPrimaryLock(primaryLock)
470 .addAllMutations(mutations)
471 .setLockTtl(ttl)
472 .setSkipConstraintCheck(skipConstraintCheck)
473
474 .setTxnSize(16)
475 .build();
476 KVErrorHandler<PrewriteResponse> handler =
477 new KVErrorHandler<>(
478 regionManager,
479 this,
480 lockResolverClient,
481 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
482 resp -> null,
483 resolveLockResult -> null,
484 startTs,
485 forWrite);
486 PrewriteResponse resp = callWithRetry(bo, TikvGrpc.getKvPrewriteMethod(), factory, handler);
487 if (isPrewriteSuccess(bo, resp, startTs)) {
488 return;
489 }
490 }
491 }
492
493
494
495
496
497
498
499
500
501
502
503 private boolean isPrewriteSuccess(BackOffer backOffer, PrewriteResponse resp, long startTs)
504 throws TiClientInternalException, KeyException, RegionException {
505 boolean forWrite = true;
506 if (resp == null) {
507 this.regionManager.onRequestFail(region);
508 throw new TiClientInternalException("Prewrite Response failed without a cause");
509 }
510 if (resp.hasRegionError()) {
511 throw new RegionException(resp.getRegionError());
512 }
513
514 boolean isSuccess = true;
515 List<Lock> locks = new ArrayList<>();
516 for (KeyError err : resp.getErrorsList()) {
517 if (err.hasLocked()) {
518 isSuccess = false;
519 Lock lock = new Lock(err.getLocked(), codec);
520 locks.add(lock);
521 } else {
522 throw new KeyException(err.toString());
523 }
524 }
525 if (isSuccess) {
526 return true;
527 }
528
529 ResolveLockResult resolveLockResult =
530 lockResolverClient.resolveLocks(backOffer, startTs, locks, forWrite);
531 addResolvedLocks(startTs, resolveLockResult.getResolvedLocks());
532 long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
533 if (msBeforeExpired > 0) {
534 backOffer.doBackOffWithMaxSleep(
535 BoTxnLock, msBeforeExpired, new KeyException(resp.getErrorsList().get(0)));
536 }
537 return false;
538 }
539
540
541 public void txnHeartBeat(BackOffer bo, ByteString primaryLock, long startTs, long ttl) {
542 boolean forWrite = false;
543 while (true) {
544 Supplier<TxnHeartBeatRequest> factory =
545 () ->
546 TxnHeartBeatRequest.newBuilder()
547 .setContext(makeContext(storeType, bo.getSlowLog()))
548 .setStartVersion(startTs)
549 .setPrimaryLock(primaryLock)
550 .setAdviseLockTtl(ttl)
551 .build();
552 KVErrorHandler<TxnHeartBeatResponse> handler =
553 new KVErrorHandler<>(
554 regionManager,
555 this,
556 lockResolverClient,
557 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
558 resp -> resp.hasError() ? resp.getError() : null,
559 resolveLockResult -> null,
560 startTs,
561 forWrite);
562 TxnHeartBeatResponse resp =
563 callWithRetry(bo, TikvGrpc.getKvTxnHeartBeatMethod(), factory, handler);
564 if (isTxnHeartBeatSuccess(resp)) {
565 return;
566 }
567 }
568 }
569
570 private boolean isTxnHeartBeatSuccess(TxnHeartBeatResponse resp)
571 throws TiClientInternalException, RegionException {
572 if (resp == null) {
573 this.regionManager.onRequestFail(region);
574 throw new TiClientInternalException("TxnHeartBeat Response failed without a cause");
575 }
576
577 if (resp.hasRegionError()) {
578 throw new RegionException(resp.getRegionError());
579 }
580
581 if (resp.hasError()) {
582 throw new TiClientInternalException("TxnHeartBeat fail, " + resp.getError().getAbort());
583 }
584
585 return true;
586 }
587
588
589
590
591
592
593
594
595
596 public void commit(BackOffer backOffer, Iterable<ByteString> keys, long startTs, long commitTs)
597 throws KeyException {
598 boolean forWrite = true;
599 Supplier<CommitRequest> factory =
600 () ->
601 CommitRequest.newBuilder()
602 .setStartVersion(startTs)
603 .setCommitVersion(commitTs)
604 .addAllKeys(
605 StreamSupport.stream(keys.spliterator(), false)
606 .map(codec::encodeKey)
607 .collect(Collectors.toList()))
608 .setContext(makeContext(storeType, backOffer.getSlowLog()))
609 .build();
610 KVErrorHandler<CommitResponse> handler =
611 new KVErrorHandler<>(
612 regionManager,
613 this,
614 lockResolverClient,
615 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
616 resp -> resp.hasError() ? resp.getError() : null,
617 resolveLockResult -> null,
618 startTs,
619 forWrite);
620 CommitResponse resp = callWithRetry(backOffer, TikvGrpc.getKvCommitMethod(), factory, handler);
621 handleCommitResponse(resp);
622 }
623
624
625
626
627
628
629
630 private void handleCommitResponse(CommitResponse resp)
631 throws TiClientInternalException, RegionException, KeyException {
632 if (resp == null) {
633 this.regionManager.onRequestFail(region);
634 throw new TiClientInternalException("CommitResponse failed without a cause");
635 }
636 if (resp.hasRegionError()) {
637
638
639
640 throw new RegionException(resp.getRegionError());
641 }
642
643 if (resp.hasError()) {
644 throw new KeyException(resp.getError());
645 }
646 }
647
648
649
650
651
652
653
654
655 public List<RangeSplitter.RegionTask> coprocess(
656 BackOffer backOffer,
657 DAGRequest req,
658 List<Coprocessor.KeyRange> ranges,
659 Queue<SelectResponse> responseQueue,
660 long startTs) {
661 boolean forWrite = false;
662 if (req == null || ranges == null || req.getExecutorsCount() < 1) {
663 throw new IllegalArgumentException("Invalid coprocessor argument!");
664 }
665
666 Supplier<Coprocessor.Request> reqToSend =
667 () ->
668 Coprocessor.Request.newBuilder()
669 .setContext(
670 makeContext(getResolvedLocks(startTs), this.storeType, backOffer.getSlowLog()))
671 .setTp(REQ_TYPE_DAG.getValue())
672 .setStartTs(startTs)
673 .setData(req.toByteString())
674 .addAllRanges(ranges)
675 .build();
676
677
678 KVErrorHandler<Coprocessor.Response> handler =
679 new KVErrorHandler<>(
680 regionManager,
681 this,
682 lockResolverClient,
683 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
684 resp -> null,
685 resolveLockResult -> addResolvedLocks(startTs, resolveLockResult.getResolvedLocks()),
686 startTs,
687 forWrite);
688 Coprocessor.Response resp =
689 callWithRetry(backOffer, TikvGrpc.getCoprocessorMethod(), reqToSend, handler);
690 return handleCopResponse(backOffer, resp, ranges, responseQueue, startTs);
691 }
692
693
694
695
696
697 private List<RangeSplitter.RegionTask> handleCopResponse(
698 BackOffer backOffer,
699 Coprocessor.Response response,
700 List<Coprocessor.KeyRange> ranges,
701 Queue<SelectResponse> responseQueue,
702 long startTs) {
703 boolean forWrite = false;
704 if (response == null) {
705
706
707
708 backOffer.doBackOff(
709 BackOffFunction.BackOffFuncType.BoRegionMiss,
710 new GrpcException("TiKV down or Network partition"));
711 logger.warn("Re-splitting region task due to region error: TiKV down or Network partition");
712
713 return RangeSplitter.newSplitter(this.regionManager).splitRangeByRegion(ranges, storeType);
714 }
715
716 if (response.hasRegionError()) {
717 Errorpb.Error regionError = response.getRegionError();
718 backOffer.doBackOff(
719 BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(regionError.toString()));
720 logger.warn("Re-splitting region task due to region error:" + regionError.getMessage());
721
722 return RangeSplitter.newSplitter(this.regionManager).splitRangeByRegion(ranges, storeType);
723 }
724
725 if (response.hasLocked()) {
726 Lock lock = new Lock(response.getLocked(), codec);
727 logger.debug(String.format("coprocessor encounters locks: %s", lock));
728 ResolveLockResult resolveLockResult =
729 lockResolverClient.resolveLocks(
730 backOffer, startTs, Collections.singletonList(lock), forWrite);
731 addResolvedLocks(startTs, resolveLockResult.getResolvedLocks());
732 long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
733 if (msBeforeExpired > 0) {
734 backOffer.doBackOffWithMaxSleep(BoTxnLockFast, msBeforeExpired, new LockException(lock));
735 }
736
737 return RangeSplitter.newSplitter(this.regionManager).splitRangeByRegion(ranges, storeType);
738 }
739
740 String otherError = response.getOtherError();
741 if (!otherError.isEmpty()) {
742 logger.warn(String.format("Other error occurred, message: %s", otherError));
743 throw new GrpcException(otherError);
744 }
745
746 responseQueue.offer(doCoprocessor(response));
747 return null;
748 }
749
750 private Iterator<SelectResponse> doCoprocessor(StreamingResponse response) {
751 Iterator<Coprocessor.Response> responseIterator = response.iterator();
752
753 if (!responseIterator.hasNext()) {
754 return null;
755 }
756
757
758 return new Iterator<SelectResponse>() {
759 @Override
760 public boolean hasNext() {
761 return responseIterator.hasNext();
762 }
763
764 @Override
765 public SelectResponse next() {
766 return doCoprocessor(responseIterator.next());
767 }
768 };
769 }
770
771 private SelectResponse doCoprocessor(Coprocessor.Response resp) {
772 try {
773 SelectResponse selectResp = SelectResponse.parseFrom(resp.getData());
774 if (selectResp.hasError()) {
775 throw new SelectException(selectResp.getError(), selectResp.getError().getMsg());
776 }
777 return selectResp;
778 } catch (InvalidProtocolBufferException e) {
779 throw new TiClientInternalException("Error parsing protobuf for coprocessor response.", e);
780 }
781 }
782
783
784
785
786
787 public Iterator<SelectResponse> coprocessStreaming(
788 DAGRequest req, List<Coprocessor.KeyRange> ranges, long startTs) {
789 boolean forWrite = false;
790 Supplier<Coprocessor.Request> reqToSend =
791 () ->
792 Coprocessor.Request.newBuilder()
793 .setContext(
794 makeContext(
795 getResolvedLocks(startTs), this.storeType, SlowLogEmptyImpl.INSTANCE))
796
797 .setTp(REQ_TYPE_DAG.getValue())
798 .setData(req.toByteString())
799 .addAllRanges(ranges)
800 .build();
801
802 KVErrorHandler<StreamingResponse> handler =
803 new KVErrorHandler<>(
804 regionManager,
805 this,
806 lockResolverClient,
807 StreamingResponse::getFirstRegionError,
808 resp -> null,
809 resolveLockResult -> addResolvedLocks(startTs, resolveLockResult.getResolvedLocks()),
810 startTs,
811 forWrite);
812
813 StreamingResponse responseIterator =
814 this.callServerStreamingWithRetry(
815 ConcreteBackOffer.newCopNextMaxBackOff(pdClient.getClusterId()),
816 TikvGrpc.getCoprocessorStreamMethod(),
817 reqToSend,
818 handler);
819 return doCoprocessor(responseIterator);
820 }
821
822
823
824
825
826
827
828
829 public List<Metapb.Region> splitRegion(List<ByteString> splitKeys) {
830 Supplier<SplitRegionRequest> request =
831 () ->
832 SplitRegionRequest.newBuilder()
833 .setContext(makeContext(storeType, SlowLogEmptyImpl.INSTANCE))
834 .addAllSplitKeys(codec.encodeKeys(splitKeys))
835 .setIsRawKv(conf.isRawKVMode())
836 .build();
837
838 KVErrorHandler<SplitRegionResponse> handler =
839 new KVErrorHandler<>(
840 regionManager,
841 this,
842 null,
843 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
844 resp -> null,
845 resolveLockResult -> null,
846 0L,
847 false);
848
849 SplitRegionResponse resp =
850 callWithRetry(
851 ConcreteBackOffer.newGetBackOff(pdClient.getClusterId()),
852 TikvGrpc.getSplitRegionMethod(),
853 request,
854 handler);
855
856 if (resp == null) {
857 this.regionManager.onRequestFail(region);
858 throw new TiClientInternalException("SplitRegion Response failed without a cause");
859 }
860
861 if (resp.hasRegionError()) {
862 throw new TiClientInternalException(
863 String.format(
864 "failed to split region %d because %s", region.getId(), resp.getRegionError()));
865 }
866
867 if (conf.getApiVersion().isV1()) {
868 return resp.getRegionsList();
869 }
870 return resp.getRegionsList().stream().map(codec::decodeRegion).collect(Collectors.toList());
871 }
872
873
874
875 public Optional<ByteString> rawGet(BackOffer backOffer, ByteString key) {
876 Long clusterId = pdClient.getClusterId();
877 Histogram.Timer requestTimer =
878 GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get", clusterId.toString()).startTimer();
879 try {
880 Supplier<RawGetRequest> factory =
881 () ->
882 RawGetRequest.newBuilder()
883 .setContext(makeContext(storeType, backOffer.getSlowLog()))
884 .setKey(codec.encodeKey(key))
885 .build();
886 RegionErrorHandler<RawGetResponse> handler =
887 new RegionErrorHandler<RawGetResponse>(
888 regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
889 RawGetResponse resp = callWithRetry(backOffer, TikvGrpc.getRawGetMethod(), factory, handler);
890 return rawGetHelper(resp);
891 } finally {
892 requestTimer.observeDuration();
893 }
894 }
895
896 private Optional<ByteString> rawGetHelper(RawGetResponse resp) {
897 if (resp == null) {
898 this.regionManager.onRequestFail(region);
899 throw new TiClientInternalException("RawGetResponse failed without a cause");
900 }
901 String error = resp.getError();
902 if (!error.isEmpty()) {
903 throw new KeyException(resp.getError());
904 }
905 if (resp.hasRegionError()) {
906 throw new RegionException(resp.getRegionError());
907 }
908 if (resp.getNotFound()) {
909 return Optional.empty();
910 } else {
911 return Optional.of(resp.getValue());
912 }
913 }
914
915 public Optional<Long> rawGetKeyTTL(BackOffer backOffer, ByteString key) {
916 Long clusterId = pdClient.getClusterId();
917 Histogram.Timer requestTimer =
918 GRPC_RAW_REQUEST_LATENCY
919 .labels("client_grpc_raw_get_key_ttl", clusterId.toString())
920 .startTimer();
921 try {
922 Supplier<RawGetKeyTTLRequest> factory =
923 () ->
924 RawGetKeyTTLRequest.newBuilder()
925 .setContext(makeContext(storeType, backOffer.getSlowLog()))
926 .setKey(codec.encodeKey(key))
927 .build();
928 RegionErrorHandler<RawGetKeyTTLResponse> handler =
929 new RegionErrorHandler<RawGetKeyTTLResponse>(
930 regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
931 RawGetKeyTTLResponse resp =
932 callWithRetry(backOffer, TikvGrpc.getRawGetKeyTTLMethod(), factory, handler);
933 return rawGetKeyTTLHelper(resp);
934 } finally {
935 requestTimer.observeDuration();
936 }
937 }
938
939 private Optional<Long> rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) {
940 if (resp == null) {
941 this.regionManager.onRequestFail(region);
942 throw new TiClientInternalException("RawGetResponse failed without a cause");
943 }
944 String error = resp.getError();
945 if (!error.isEmpty()) {
946 throw new KeyException(resp.getError());
947 }
948 if (resp.hasRegionError()) {
949 throw new RegionException(resp.getRegionError());
950 }
951 if (resp.getNotFound()) {
952 return Optional.empty();
953 }
954 return Optional.of(resp.getTtl());
955 }
956
957 public void rawDelete(BackOffer backOffer, ByteString key, boolean atomicForCAS) {
958 Long clusterId = pdClient.getClusterId();
959 Histogram.Timer requestTimer =
960 GRPC_RAW_REQUEST_LATENCY
961 .labels("client_grpc_raw_delete", clusterId.toString())
962 .startTimer();
963 try {
964 Supplier<RawDeleteRequest> factory =
965 () ->
966 RawDeleteRequest.newBuilder()
967 .setContext(makeContext(storeType, backOffer.getSlowLog()))
968 .setKey(codec.encodeKey(key))
969 .setForCas(atomicForCAS)
970 .build();
971
972 RegionErrorHandler<RawDeleteResponse> handler =
973 new RegionErrorHandler<RawDeleteResponse>(
974 regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
975 RawDeleteResponse resp =
976 callWithRetry(backOffer, TikvGrpc.getRawDeleteMethod(), factory, handler);
977 rawDeleteHelper(resp, region);
978 } finally {
979 requestTimer.observeDuration();
980 }
981 }
982
983 private void rawDeleteHelper(RawDeleteResponse resp, TiRegion region) {
984 if (resp == null) {
985 this.regionManager.onRequestFail(region);
986 throw new TiClientInternalException("RawDeleteResponse failed without a cause");
987 }
988 String error = resp.getError();
989 if (!error.isEmpty()) {
990 throw new KeyException(resp.getError());
991 }
992 if (resp.hasRegionError()) {
993 throw new RegionException(resp.getRegionError());
994 }
995 }
996
997 public void rawPut(
998 BackOffer backOffer, ByteString key, ByteString value, long ttl, boolean atomicForCAS) {
999 Long clusterId = pdClient.getClusterId();
1000 Histogram.Timer requestTimer =
1001 GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put", clusterId.toString()).startTimer();
1002 try {
1003 Supplier<RawPutRequest> factory =
1004 () ->
1005 RawPutRequest.newBuilder()
1006 .setContext(makeContext(storeType, backOffer.getSlowLog()))
1007 .setKey(codec.encodeKey(key))
1008 .setValue(value)
1009 .setTtl(ttl)
1010 .setForCas(atomicForCAS)
1011 .build();
1012
1013 RegionErrorHandler<RawPutResponse> handler =
1014 new RegionErrorHandler<RawPutResponse>(
1015 regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1016 RawPutResponse resp = callWithRetry(backOffer, TikvGrpc.getRawPutMethod(), factory, handler);
1017 rawPutHelper(resp);
1018 } finally {
1019 requestTimer.observeDuration();
1020 }
1021 }
1022
1023 private void rawPutHelper(RawPutResponse resp) {
1024 if (resp == null) {
1025 this.regionManager.onRequestFail(region);
1026 throw new TiClientInternalException("RawPutResponse failed without a cause");
1027 }
1028 String error = resp.getError();
1029 if (!error.isEmpty()) {
1030 throw new KeyException(resp.getError());
1031 }
1032 if (resp.hasRegionError()) {
1033 throw new RegionException(resp.getRegionError());
1034 }
1035 }
1036
1037 public void rawCompareAndSet(
1038 BackOffer backOffer,
1039 ByteString key,
1040 Optional<ByteString> prevValue,
1041 ByteString value,
1042 long ttl)
1043 throws RawCASConflictException {
1044 Long clusterId = pdClient.getClusterId();
1045 Histogram.Timer requestTimer =
1046 GRPC_RAW_REQUEST_LATENCY
1047 .labels("client_grpc_raw_put_if_absent", clusterId.toString())
1048 .startTimer();
1049 try {
1050 Supplier<RawCASRequest> factory =
1051 () ->
1052 RawCASRequest.newBuilder()
1053 .setContext(makeContext(storeType, backOffer.getSlowLog()))
1054 .setKey(codec.encodeKey(key))
1055 .setValue(value)
1056 .setPreviousValue(prevValue.orElse(ByteString.EMPTY))
1057 .setPreviousNotExist(!prevValue.isPresent())
1058 .setTtl(ttl)
1059 .build();
1060
1061 RegionErrorHandler<RawCASResponse> handler =
1062 new RegionErrorHandler<RawCASResponse>(
1063 regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1064 RawCASResponse resp =
1065 callWithRetry(backOffer, TikvGrpc.getRawCompareAndSwapMethod(), factory, handler);
1066 rawCompareAndSetHelper(key, prevValue, resp);
1067 } finally {
1068 requestTimer.observeDuration();
1069 }
1070 }
1071
1072 private void rawCompareAndSetHelper(
1073 ByteString key, Optional<ByteString> expectedPrevValue, RawCASResponse resp)
1074 throws RawCASConflictException {
1075 if (resp == null) {
1076 this.regionManager.onRequestFail(region);
1077 throw new TiClientInternalException("RawCASResponse failed without a cause");
1078 }
1079 String error = resp.getError();
1080 if (!error.isEmpty()) {
1081 throw new KeyException(resp.getError());
1082 }
1083 if (resp.hasRegionError()) {
1084 throw new RegionException(resp.getRegionError());
1085 }
1086 if (!resp.getSucceed()) {
1087 if (resp.getPreviousNotExist()) {
1088 throw new RawCASConflictException(key, expectedPrevValue, Optional.empty());
1089 } else {
1090 throw new RawCASConflictException(
1091 key, expectedPrevValue, Optional.of(resp.getPreviousValue()));
1092 }
1093 }
1094 }
1095
1096 public List<KvPair> rawBatchGet(BackOffer backoffer, List<ByteString> keys) {
1097 Long clusterId = pdClient.getClusterId();
1098 Histogram.Timer requestTimer =
1099 GRPC_RAW_REQUEST_LATENCY
1100 .labels("client_grpc_raw_batch_get", clusterId.toString())
1101 .startTimer();
1102 try {
1103 if (keys.isEmpty()) {
1104 return new ArrayList<>();
1105 }
1106 Supplier<RawBatchGetRequest> factory =
1107 () ->
1108 RawBatchGetRequest.newBuilder()
1109 .setContext(makeContext(storeType, backoffer.getSlowLog()))
1110 .addAllKeys(codec.encodeKeys(keys))
1111 .build();
1112 RegionErrorHandler<RawBatchGetResponse> handler =
1113 new RegionErrorHandler<RawBatchGetResponse>(
1114 regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1115 RawBatchGetResponse resp =
1116 callWithRetry(backoffer, TikvGrpc.getRawBatchGetMethod(), factory, handler);
1117 return handleRawBatchGet(resp);
1118 } finally {
1119 requestTimer.observeDuration();
1120 }
1121 }
1122
1123 private List<KvPair> handleRawBatchGet(RawBatchGetResponse resp) {
1124 if (resp == null) {
1125 this.regionManager.onRequestFail(region);
1126 throw new TiClientInternalException("RawBatchPutResponse failed without a cause");
1127 }
1128 if (resp.hasRegionError()) {
1129 throw new RegionException(resp.getRegionError());
1130 }
1131
1132 return codec.decodeKvPairs(resp.getPairsList());
1133 }
1134
1135 public void rawBatchPut(
1136 BackOffer backOffer, List<KvPair> kvPairs, long ttl, boolean atomicForCAS) {
1137 Long clusterId = pdClient.getClusterId();
1138 Histogram.Timer requestTimer =
1139 GRPC_RAW_REQUEST_LATENCY
1140 .labels("client_grpc_raw_batch_put", clusterId.toString())
1141 .startTimer();
1142 try {
1143 if (kvPairs.isEmpty()) {
1144 return;
1145 }
1146 Supplier<RawBatchPutRequest> factory =
1147 () ->
1148 RawBatchPutRequest.newBuilder()
1149 .setContext(makeContext(storeType, backOffer.getSlowLog()))
1150 .addAllPairs(kvPairs)
1151 .setTtl(ttl)
1152 .addTtls(ttl)
1153 .setForCas(atomicForCAS)
1154 .build();
1155 RegionErrorHandler<RawBatchPutResponse> handler =
1156 new RegionErrorHandler<RawBatchPutResponse>(
1157 regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1158 RawBatchPutResponse resp =
1159 callWithRetry(backOffer, TikvGrpc.getRawBatchPutMethod(), factory, handler);
1160 handleRawBatchPut(resp);
1161 } finally {
1162 requestTimer.observeDuration();
1163 }
1164 }
1165
1166 public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl, boolean atomicForCAS) {
1167 List<KvPair> pairs = new ArrayList<>();
1168 for (int i = 0; i < batch.getKeys().size(); i++) {
1169 pairs.add(
1170 KvPair.newBuilder()
1171 .setKey(codec.encodeKey(batch.getKeys().get(i)))
1172 .setValue(batch.getValues().get(i))
1173 .build());
1174 }
1175 rawBatchPut(backOffer, pairs, ttl, atomicForCAS);
1176 }
1177
1178 private void handleRawBatchPut(RawBatchPutResponse resp) {
1179 if (resp == null) {
1180 this.regionManager.onRequestFail(region);
1181 throw new TiClientInternalException("RawBatchPutResponse failed without a cause");
1182 }
1183 String error = resp.getError();
1184 if (!error.isEmpty()) {
1185 throw new KeyException(resp.getError());
1186 }
1187 if (resp.hasRegionError()) {
1188 throw new RegionException(resp.getRegionError());
1189 }
1190 }
1191
1192 public void rawBatchDelete(BackOffer backoffer, List<ByteString> keys, boolean atomicForCAS) {
1193 Long clusterId = pdClient.getClusterId();
1194 Histogram.Timer requestTimer =
1195 GRPC_RAW_REQUEST_LATENCY
1196 .labels("client_grpc_raw_batch_delete", clusterId.toString())
1197 .startTimer();
1198 try {
1199 if (keys.isEmpty()) {
1200 return;
1201 }
1202 Supplier<RawBatchDeleteRequest> factory =
1203 () ->
1204 RawBatchDeleteRequest.newBuilder()
1205 .setContext(makeContext(storeType, backoffer.getSlowLog()))
1206 .addAllKeys(codec.encodeKeys(keys))
1207 .setForCas(atomicForCAS)
1208 .build();
1209 RegionErrorHandler<RawBatchDeleteResponse> handler =
1210 new RegionErrorHandler<RawBatchDeleteResponse>(
1211 regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1212 RawBatchDeleteResponse resp =
1213 callWithRetry(backoffer, TikvGrpc.getRawBatchDeleteMethod(), factory, handler);
1214 handleRawBatchDelete(resp);
1215 } finally {
1216 requestTimer.observeDuration();
1217 }
1218 }
1219
1220 private void handleRawBatchDelete(RawBatchDeleteResponse resp) {
1221 if (resp == null) {
1222 this.regionManager.onRequestFail(region);
1223 throw new TiClientInternalException("RawBatchDeleteResponse failed without a cause");
1224 }
1225 String error = resp.getError();
1226 if (!error.isEmpty()) {
1227 throw new KeyException(resp.getError());
1228 }
1229 if (resp.hasRegionError()) {
1230 throw new RegionException(resp.getRegionError());
1231 }
1232 }
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243 public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) {
1244 Long clusterId = pdClient.getClusterId();
1245 Histogram.Timer requestTimer =
1246 GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan", clusterId.toString()).startTimer();
1247 try {
1248 Supplier<RawScanRequest> factory =
1249 () -> {
1250 Pair<ByteString, ByteString> range = codec.encodeRange(key, ByteString.EMPTY);
1251 return RawScanRequest.newBuilder()
1252 .setContext(makeContext(storeType, backOffer.getSlowLog()))
1253 .setStartKey(range.first)
1254 .setEndKey(range.second)
1255 .setKeyOnly(keyOnly)
1256 .setLimit(limit)
1257 .build();
1258 };
1259
1260 RegionErrorHandler<RawScanResponse> handler =
1261 new RegionErrorHandler<RawScanResponse>(
1262 regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1263 RawScanResponse resp =
1264 callWithRetry(backOffer, TikvGrpc.getRawScanMethod(), factory, handler);
1265
1266
1267 region = regionManager.getRegionByKey(key, backOffer);
1268 return rawScanHelper(resp);
1269 } finally {
1270 requestTimer.observeDuration();
1271 }
1272 }
1273
1274 public List<KvPair> rawScan(BackOffer backOffer, ByteString key, boolean keyOnly) {
1275 return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly);
1276 }
1277
1278 private List<KvPair> rawScanHelper(RawScanResponse resp) {
1279 if (resp == null) {
1280 this.regionManager.onRequestFail(region);
1281 throw new TiClientInternalException("RawScanResponse failed without a cause");
1282 }
1283 if (resp.hasRegionError()) {
1284 throw new RegionException(resp.getRegionError());
1285 }
1286 return codec.decodeKvPairs(resp.getKvsList());
1287 }
1288
1289
1290
1291
1292
1293
1294
1295
1296 public void rawDeleteRange(BackOffer backOffer, ByteString startKey, ByteString endKey) {
1297 Long clusterId = pdClient.getClusterId();
1298 Histogram.Timer requestTimer =
1299 GRPC_RAW_REQUEST_LATENCY
1300 .labels("client_grpc_raw_delete_range", clusterId.toString())
1301 .startTimer();
1302 try {
1303 Supplier<RawDeleteRangeRequest> factory =
1304 () -> {
1305 Pair<ByteString, ByteString> range = codec.encodeRange(startKey, endKey);
1306 return RawDeleteRangeRequest.newBuilder()
1307 .setContext(makeContext(storeType, backOffer.getSlowLog()))
1308 .setStartKey(range.first)
1309 .setEndKey(range.second)
1310 .build();
1311 };
1312
1313 RegionErrorHandler<RawDeleteRangeResponse> handler =
1314 new RegionErrorHandler<RawDeleteRangeResponse>(
1315 regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1316 RawDeleteRangeResponse resp =
1317 callWithRetry(backOffer, TikvGrpc.getRawDeleteRangeMethod(), factory, handler);
1318 rawDeleteRangeHelper(resp);
1319 } finally {
1320 requestTimer.observeDuration();
1321 }
1322 }
1323
1324 private void rawDeleteRangeHelper(RawDeleteRangeResponse resp) {
1325 if (resp == null) {
1326 this.regionManager.onRequestFail(region);
1327 throw new TiClientInternalException("RawDeleteRangeResponse failed without a cause");
1328 }
1329 String error = resp.getError();
1330 if (!error.isEmpty()) {
1331 throw new KeyException(resp.getError());
1332 }
1333 if (resp.hasRegionError()) {
1334 throw new RegionException(resp.getRegionError());
1335 }
1336 }
1337
/** Named constants, each bound to a fixed integer code exposed through {@link #getValue()}. */
public enum RequestTypes {
  REQ_TYPE_SELECT(101),
  REQ_TYPE_INDEX(102),
  REQ_TYPE_DAG(103),
  REQ_TYPE_ANALYZE(104),
  BATCH_ROW_COUNT(64);

  /** Integer code assigned to this constant at declaration. */
  private final int code;

  RequestTypes(int code) {
    this.code = code;
  }

  /**
   * Returns the integer code of this constant.
   *
   * @return the code passed to the constant's declaration
   */
  public int getValue() {
    return this.code;
  }
}
1355
1356 public static class RegionStoreClientBuilder {
1357
1358 private final TiConfiguration conf;
1359 private final ChannelFactory channelFactory;
1360 private final RegionManager regionManager;
1361 private final PDClient pdClient;
1362
1363 public RegionStoreClientBuilder(
1364 TiConfiguration conf,
1365 ChannelFactory channelFactory,
1366 RegionManager regionManager,
1367 PDClient pdClient) {
1368 Objects.requireNonNull(conf, "conf is null");
1369 Objects.requireNonNull(channelFactory, "channelFactory is null");
1370 Objects.requireNonNull(regionManager, "regionManager is null");
1371 this.conf = conf;
1372 this.channelFactory = channelFactory;
1373 this.regionManager = regionManager;
1374 this.pdClient = pdClient;
1375 }
1376
1377 public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType storeType)
1378 throws GrpcException {
1379 Objects.requireNonNull(region, "region is null");
1380 Objects.requireNonNull(store, "store is null");
1381 Objects.requireNonNull(storeType, "storeType is null");
1382
1383 String addressStr = store.getStore().getAddress();
1384 if (logger.isDebugEnabled()) {
1385 logger.debug(String.format("Create region store client on address %s", addressStr));
1386 }
1387 ManagedChannel channel = null;
1388
1389 TikvBlockingStub blockingStub = null;
1390 TikvFutureStub asyncStub = null;
1391
1392 if (conf.getEnableGrpcForward() && store.getProxyStore() != null && !store.isReachable()) {
1393 addressStr = store.getProxyStore().getAddress();
1394 channel =
1395 channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
1396 Metadata header = new Metadata();
1397 header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
1398 blockingStub =
1399 TikvGrpc.newBlockingStub(channel)
1400 .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
1401 asyncStub =
1402 TikvGrpc.newFutureStub(channel)
1403 .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
1404 } else {
1405 channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
1406 blockingStub = TikvGrpc.newBlockingStub(channel);
1407 asyncStub = TikvGrpc.newFutureStub(channel);
1408 }
1409
1410 return new RegionStoreClient(
1411 conf,
1412 region,
1413 store,
1414 storeType,
1415 channelFactory,
1416 blockingStub,
1417 asyncStub,
1418 regionManager,
1419 pdClient,
1420 this);
1421 }
1422
1423 public RegionStoreClient build(TiRegion region, TiStore store) throws GrpcException {
1424 return build(region, store, TiStoreType.TiKV);
1425 }
1426
1427 public RegionStoreClient build(ByteString key) throws GrpcException {
1428 return build(key, TiStoreType.TiKV);
1429 }
1430
1431 public RegionStoreClient build(ByteString key, BackOffer backOffer) throws GrpcException {
1432 return build(key, TiStoreType.TiKV, backOffer);
1433 }
1434
1435 public RegionStoreClient build(ByteString key, TiStoreType storeType) throws GrpcException {
1436 return build(key, storeType, defaultBackOff());
1437 }
1438
1439 public RegionStoreClient build(ByteString key, TiStoreType storeType, BackOffer backOffer)
1440 throws GrpcException {
1441 Pair<TiRegion, TiStore> pair =
1442 regionManager.getRegionStorePairByKey(key, storeType, backOffer);
1443 return build(pair.first, pair.second, storeType);
1444 }
1445
1446 public RegionStoreClient build(TiRegion region) throws GrpcException {
1447 return build(region, defaultBackOff());
1448 }
1449
1450 public RegionStoreClient build(TiRegion region, BackOffer backOffer) throws GrpcException {
1451 TiStore store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
1452 return build(region, store, TiStoreType.TiKV);
1453 }
1454
1455 public RegionManager getRegionManager() {
1456 return regionManager;
1457 }
1458
1459 private BackOffer defaultBackOff() {
1460 BackOffer backoffer =
1461 ConcreteBackOffer.newCustomBackOff(
1462 conf.getRawKVDefaultBackoffInMS(), pdClient.getClusterId());
1463 return backoffer;
1464 }
1465 }
1466 }