View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.region;
19  
20  import static org.tikv.common.region.RegionStoreClient.RequestTypes.REQ_TYPE_DAG;
21  import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoRegionMiss;
22  import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnLock;
23  import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnLockFast;
24  
25  import com.google.common.annotations.VisibleForTesting;
26  import com.google.protobuf.ByteString;
27  import com.google.protobuf.InvalidProtocolBufferException;
28  import com.pingcap.tidb.tipb.DAGRequest;
29  import com.pingcap.tidb.tipb.SelectResponse;
30  import io.grpc.ManagedChannel;
31  import io.grpc.Metadata;
32  import io.grpc.stub.MetadataUtils;
33  import io.prometheus.client.Histogram;
34  import java.util.ArrayList;
35  import java.util.Collections;
36  import java.util.HashMap;
37  import java.util.HashSet;
38  import java.util.Iterator;
39  import java.util.List;
40  import java.util.Map;
41  import java.util.Objects;
42  import java.util.Optional;
43  import java.util.Queue;
44  import java.util.Set;
45  import java.util.function.Supplier;
46  import java.util.stream.Collectors;
47  import java.util.stream.StreamSupport;
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  import org.tikv.common.PDClient;
51  import org.tikv.common.StoreVersion;
52  import org.tikv.common.TiConfiguration;
53  import org.tikv.common.Version;
54  import org.tikv.common.exception.GrpcException;
55  import org.tikv.common.exception.KeyException;
56  import org.tikv.common.exception.RawCASConflictException;
57  import org.tikv.common.exception.RegionException;
58  import org.tikv.common.exception.SelectException;
59  import org.tikv.common.exception.TiClientInternalException;
60  import org.tikv.common.exception.TiKVException;
61  import org.tikv.common.log.SlowLogEmptyImpl;
62  import org.tikv.common.operation.KVErrorHandler;
63  import org.tikv.common.operation.RegionErrorHandler;
64  import org.tikv.common.streaming.StreamingResponse;
65  import org.tikv.common.util.BackOffFunction;
66  import org.tikv.common.util.BackOffer;
67  import org.tikv.common.util.Batch;
68  import org.tikv.common.util.ChannelFactory;
69  import org.tikv.common.util.ConcreteBackOffer;
70  import org.tikv.common.util.HistogramUtils;
71  import org.tikv.common.util.Pair;
72  import org.tikv.common.util.RangeSplitter;
73  import org.tikv.kvproto.Coprocessor;
74  import org.tikv.kvproto.Errorpb;
75  import org.tikv.kvproto.Kvrpcpb.BatchGetRequest;
76  import org.tikv.kvproto.Kvrpcpb.BatchGetResponse;
77  import org.tikv.kvproto.Kvrpcpb.CommitRequest;
78  import org.tikv.kvproto.Kvrpcpb.CommitResponse;
79  import org.tikv.kvproto.Kvrpcpb.GetRequest;
80  import org.tikv.kvproto.Kvrpcpb.GetResponse;
81  import org.tikv.kvproto.Kvrpcpb.KeyError;
82  import org.tikv.kvproto.Kvrpcpb.KvPair;
83  import org.tikv.kvproto.Kvrpcpb.Mutation;
84  import org.tikv.kvproto.Kvrpcpb.PrewriteRequest;
85  import org.tikv.kvproto.Kvrpcpb.PrewriteResponse;
86  import org.tikv.kvproto.Kvrpcpb.RawBatchDeleteRequest;
87  import org.tikv.kvproto.Kvrpcpb.RawBatchDeleteResponse;
88  import org.tikv.kvproto.Kvrpcpb.RawBatchGetRequest;
89  import org.tikv.kvproto.Kvrpcpb.RawBatchGetResponse;
90  import org.tikv.kvproto.Kvrpcpb.RawBatchPutRequest;
91  import org.tikv.kvproto.Kvrpcpb.RawBatchPutResponse;
92  import org.tikv.kvproto.Kvrpcpb.RawCASRequest;
93  import org.tikv.kvproto.Kvrpcpb.RawCASResponse;
94  import org.tikv.kvproto.Kvrpcpb.RawDeleteRangeRequest;
95  import org.tikv.kvproto.Kvrpcpb.RawDeleteRangeResponse;
96  import org.tikv.kvproto.Kvrpcpb.RawDeleteRequest;
97  import org.tikv.kvproto.Kvrpcpb.RawDeleteResponse;
98  import org.tikv.kvproto.Kvrpcpb.RawGetKeyTTLRequest;
99  import org.tikv.kvproto.Kvrpcpb.RawGetKeyTTLResponse;
100 import org.tikv.kvproto.Kvrpcpb.RawGetRequest;
101 import org.tikv.kvproto.Kvrpcpb.RawGetResponse;
102 import org.tikv.kvproto.Kvrpcpb.RawPutRequest;
103 import org.tikv.kvproto.Kvrpcpb.RawPutResponse;
104 import org.tikv.kvproto.Kvrpcpb.RawScanRequest;
105 import org.tikv.kvproto.Kvrpcpb.RawScanResponse;
106 import org.tikv.kvproto.Kvrpcpb.ScanRequest;
107 import org.tikv.kvproto.Kvrpcpb.ScanResponse;
108 import org.tikv.kvproto.Kvrpcpb.SplitRegionRequest;
109 import org.tikv.kvproto.Kvrpcpb.SplitRegionResponse;
110 import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatRequest;
111 import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatResponse;
112 import org.tikv.kvproto.Metapb;
113 import org.tikv.kvproto.TikvGrpc;
114 import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
115 import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
116 import org.tikv.txn.AbstractLockResolverClient;
117 import org.tikv.txn.Lock;
118 import org.tikv.txn.ResolveLockResult;
119 import org.tikv.txn.exception.LockException;
120 
121 // TODO:
122 //  1. RegionStoreClient will be inaccessible directly.
123 //  2. All apis of RegionStoreClient would not provide retry aside from callWithRetry,
124 //  if a request needs to be retried because of an un-retryable cause, e.g., keys
125 //  need to be re-split across regions/stores, region info outdated, e.t.c., you
126 //  should retry it in an upper client logic (KVClient, TxnClient, e.t.c.)
127 
128 /** Note that RegionStoreClient itself is not thread-safe */
129 public class RegionStoreClient extends AbstractRegionStoreClient {
130   private static final Logger logger = LoggerFactory.getLogger(RegionStoreClient.class);
131   @VisibleForTesting public final AbstractLockResolverClient lockResolverClient;
132   private final TiStoreType storeType;
133   /** startTS -> List(locks) */
134   private final Map<Long, Set<Long>> resolvedLocks = new HashMap<>();
135 
136   private final PDClient pdClient;
137   private Boolean isV4 = null;
138 
139   public static final Histogram GRPC_RAW_REQUEST_LATENCY =
140       HistogramUtils.buildDuration()
141           .name("client_java_grpc_raw_requests_latency")
142           .help("grpc raw request latency.")
143           .labelNames("type", "cluster")
144           .register();
145 
146   private synchronized Boolean getIsV4() {
147     if (isV4 == null) {
148       isV4 = StoreVersion.minTiKVVersion(Version.RESOLVE_LOCK_V4, pdClient);
149     }
150     return isV4;
151   }
152 
153   private RegionStoreClient(
154       TiConfiguration conf,
155       TiRegion region,
156       TiStore store,
157       TiStoreType storeType,
158       ChannelFactory channelFactory,
159       TikvBlockingStub blockingStub,
160       TikvFutureStub asyncStub,
161       RegionManager regionManager,
162       PDClient pdClient,
163       RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
164     super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
165     this.storeType = storeType;
166 
167     if (this.storeType == TiStoreType.TiKV) {
168       this.lockResolverClient =
169           AbstractLockResolverClient.getInstance(
170               conf,
171               region,
172               store,
173               this.blockingStub,
174               this.asyncStub,
175               channelFactory,
176               regionManager,
177               pdClient,
178               clientBuilder);
179 
180     } else {
181       TiStore tikvStore =
182           regionManager.getRegionStorePairByKey(region.getStartKey(), TiStoreType.TiKV).second;
183 
184       String addressStr = tikvStore.getStore().getAddress();
185       if (logger.isDebugEnabled()) {
186         logger.debug(String.format("Create region store client on address %s", addressStr));
187       }
188       ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
189 
190       TikvBlockingStub tikvBlockingStub = TikvGrpc.newBlockingStub(channel);
191       TikvGrpc.TikvFutureStub tikvAsyncStub = TikvGrpc.newFutureStub(channel);
192 
193       this.lockResolverClient =
194           AbstractLockResolverClient.getInstance(
195               conf,
196               region,
197               tikvStore,
198               tikvBlockingStub,
199               tikvAsyncStub,
200               channelFactory,
201               regionManager,
202               pdClient,
203               clientBuilder);
204     }
205     this.pdClient = pdClient;
206   }
207 
208   public synchronized boolean addResolvedLocks(Long version, Set<Long> locks) {
209     Set<Long> oldList = resolvedLocks.get(version);
210     if (oldList != null) {
211       oldList.addAll(locks);
212     } else {
213       resolvedLocks.put(version, new HashSet<>(locks));
214     }
215     return true;
216   }
217 
218   public synchronized Set<Long> getResolvedLocks(Long version) {
219     return resolvedLocks.getOrDefault(version, java.util.Collections.emptySet());
220   }
221 
222   /**
223    * Fetch a value according to a key
224    *
225    * @param backOffer backOffer
226    * @param key key to fetch
227    * @param version key version
228    * @return value
229    * @throws TiClientInternalException TiSpark Client exception, unexpected
230    * @throws KeyException Key may be locked
231    */
232   public ByteString get(BackOffer backOffer, ByteString key, long version)
233       throws TiClientInternalException, KeyException {
234     boolean forWrite = false;
235     Supplier<GetRequest> factory =
236         () ->
237             GetRequest.newBuilder()
238                 .setContext(
239                     makeContext(getResolvedLocks(version), this.storeType, backOffer.getSlowLog()))
240                 .setKey(codec.encodeKey(key))
241                 .setVersion(version)
242                 .build();
243 
244     KVErrorHandler<GetResponse> handler =
245         new KVErrorHandler<>(
246             regionManager,
247             this,
248             lockResolverClient,
249             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
250             resp -> resp.hasError() ? resp.getError() : null,
251             resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
252             version,
253             forWrite);
254 
255     GetResponse resp = callWithRetry(backOffer, TikvGrpc.getKvGetMethod(), factory, handler);
256 
257     handleGetResponse(resp);
258     return resp.getValue();
259   }
260 
261   /**
262    * @param resp GetResponse
263    * @throws TiClientInternalException TiSpark Client exception, unexpected
264    * @throws KeyException Key may be locked
265    */
266   private void handleGetResponse(GetResponse resp) throws TiClientInternalException, KeyException {
267     if (resp == null) {
268       this.regionManager.onRequestFail(region);
269       throw new TiClientInternalException("GetResponse failed without a cause");
270     }
271     if (resp.hasRegionError()) {
272       throw new RegionException(resp.getRegionError());
273     }
274     if (resp.hasError()) {
275       throw new KeyException(resp.getError());
276     }
277   }
278 
279   public List<KvPair> batchGet(BackOffer backOffer, List<ByteString> keys, long version) {
280     boolean forWrite = false;
281     Supplier<BatchGetRequest> request =
282         () ->
283             BatchGetRequest.newBuilder()
284                 .setContext(
285                     makeContext(getResolvedLocks(version), this.storeType, backOffer.getSlowLog()))
286                 .addAllKeys(codec.encodeKeys(keys))
287                 .setVersion(version)
288                 .build();
289     KVErrorHandler<BatchGetResponse> handler =
290         new KVErrorHandler<>(
291             regionManager,
292             this,
293             lockResolverClient,
294             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
295             resp -> null,
296             resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
297             version,
298             forWrite);
299     BatchGetResponse resp =
300         callWithRetry(backOffer, TikvGrpc.getKvBatchGetMethod(), request, handler);
301     return handleBatchGetResponse(backOffer, resp, version);
302   }
303 
304   private List<KvPair> handleBatchGetResponse(
305       BackOffer backOffer, BatchGetResponse resp, long version) {
306     boolean forWrite = false;
307     if (resp == null) {
308       this.regionManager.onRequestFail(region);
309       throw new TiClientInternalException("BatchGetResponse failed without a cause");
310     }
311     if (resp.hasRegionError()) {
312       throw new RegionException(resp.getRegionError());
313     }
314     List<Lock> locks = new ArrayList<>();
315 
316     for (KvPair pair : resp.getPairsList()) {
317       if (pair.hasError()) {
318         if (pair.getError().hasLocked()) {
319           Lock lock = new Lock(pair.getError().getLocked(), codec);
320           locks.add(lock);
321         } else {
322           throw new KeyException(pair.getError());
323         }
324       }
325     }
326 
327     if (!locks.isEmpty()) {
328       ResolveLockResult resolveLockResult =
329           lockResolverClient.resolveLocks(backOffer, version, locks, forWrite);
330       addResolvedLocks(version, resolveLockResult.getResolvedLocks());
331       // resolveLocks already retried, just throw error to upper logic.
332       throw new TiKVException("locks not resolved, retry");
333     }
334 
335     return codec.decodeKvPairs(resp.getPairsList());
336   }
337 
338   public List<KvPair> scan(
339       BackOffer backOffer, ByteString startKey, long version, boolean keyOnly) {
340     boolean forWrite = false;
341     while (true) {
342       Supplier<ScanRequest> request =
343           () ->
344               ScanRequest.newBuilder()
345                   .setContext(
346                       makeContext(
347                           getResolvedLocks(version), this.storeType, backOffer.getSlowLog()))
348                   .setStartKey(codec.encodeKey(startKey))
349                   .setVersion(version)
350                   .setKeyOnly(keyOnly)
351                   .setLimit(getConf().getScanBatchSize())
352                   .build();
353 
354       KVErrorHandler<ScanResponse> handler =
355           new KVErrorHandler<>(
356               regionManager,
357               this,
358               lockResolverClient,
359               resp -> resp.hasRegionError() ? resp.getRegionError() : null,
360               resp -> resp.hasError() ? resp.getError() : null,
361               resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
362               version,
363               forWrite);
364       ScanResponse resp = callWithRetry(backOffer, TikvGrpc.getKvScanMethod(), request, handler);
365       // retry may refresh region info
366       // we need to update region after retry
367       region = regionManager.getRegionByKey(startKey, backOffer);
368 
369       if (handleScanResponse(backOffer, resp, version, forWrite)) {
370         return resp.getPairsList();
371       }
372     }
373   }
374 
375   private boolean handleScanResponse(
376       BackOffer backOffer, ScanResponse resp, long version, boolean forWrite) {
377     if (resp == null) {
378       this.regionManager.onRequestFail(region);
379       throw new TiClientInternalException("ScanResponse failed without a cause");
380     }
381     if (resp.hasRegionError()) {
382       backOffer.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
383       return false;
384     }
385 
386     // Resolve locks
387     // Note: Memory lock conflict is returned by both `ScanResponse.error` &
388     // `ScanResponse.pairs[0].error`, while other key errors are returned by
389     // `ScanResponse.pairs.error`
390     // See https://github.com/pingcap/kvproto/pull/697
391     List<Lock> locks = new ArrayList<>();
392     for (KvPair kvPair : resp.getPairsList()) {
393       if (kvPair.hasError()) {
394         Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(kvPair.getError(), codec);
395         locks.add(lock);
396       }
397     }
398     if (!locks.isEmpty()) {
399       ResolveLockResult resolveLockResult =
400           lockResolverClient.resolveLocks(backOffer, version, locks, forWrite);
401       addResolvedLocks(version, resolveLockResult.getResolvedLocks());
402 
403       long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
404       if (msBeforeExpired > 0) {
405         // if not resolve all locks, we wait and retry
406         backOffer.doBackOffWithMaxSleep(
407             BoTxnLockFast, msBeforeExpired, new KeyException(locks.toString()));
408       }
409 
410       return false;
411     }
412 
413     return true;
414   }
415 
416   public List<KvPair> scan(BackOffer backOffer, ByteString startKey, long version) {
417     return scan(backOffer, startKey, version, false);
418   }
419 
420   /**
421    * Prewrite batch keys
422    *
423    * @param backOffer backOffer
424    * @param primary primary lock of keys
425    * @param mutations batch key-values as mutations
426    * @param startTs startTs of prewrite
427    * @param lockTTL lock ttl
428    * @throws TiClientInternalException TiSpark Client exception, unexpected
429    * @throws KeyException Key may be locked
430    * @throws RegionException region error occurs
431    */
432   public void prewrite(
433       BackOffer backOffer, ByteString primary, List<Mutation> mutations, long startTs, long lockTTL)
434       throws TiClientInternalException, KeyException, RegionException {
435     this.prewrite(backOffer, primary, mutations, startTs, lockTTL, false);
436   }
437 
438   /**
439    * Prewrite batch keys
440    *
441    * @param skipConstraintCheck whether to skip constraint check
442    */
443   public void prewrite(
444       BackOffer bo,
445       ByteString primaryLock,
446       List<Mutation> mutations,
447       long startTs,
448       long ttl,
449       boolean skipConstraintCheck)
450       throws TiClientInternalException, KeyException, RegionException {
451     boolean forWrite = true;
452     while (true) {
453       Supplier<PrewriteRequest> factory =
454           () ->
455               getIsV4()
456                   ? PrewriteRequest.newBuilder()
457                       .setContext(makeContext(storeType, bo.getSlowLog()))
458                       .setStartVersion(startTs)
459                       .setPrimaryLock(codec.encodeKey(primaryLock))
460                       .addAllMutations(codec.encodeMutations(mutations))
461                       .setLockTtl(ttl)
462                       .setSkipConstraintCheck(skipConstraintCheck)
463                       .setMinCommitTs(startTs)
464                       .setTxnSize(16)
465                       .build()
466                   : PrewriteRequest.newBuilder()
467                       .setContext(makeContext(storeType, bo.getSlowLog()))
468                       .setStartVersion(startTs)
469                       .setPrimaryLock(primaryLock)
470                       .addAllMutations(mutations)
471                       .setLockTtl(ttl)
472                       .setSkipConstraintCheck(skipConstraintCheck)
473                       // v3 does not support setMinCommitTs(startTs)
474                       .setTxnSize(16)
475                       .build();
476       KVErrorHandler<PrewriteResponse> handler =
477           new KVErrorHandler<>(
478               regionManager,
479               this,
480               lockResolverClient,
481               resp -> resp.hasRegionError() ? resp.getRegionError() : null,
482               resp -> null,
483               resolveLockResult -> null,
484               startTs,
485               forWrite);
486       PrewriteResponse resp = callWithRetry(bo, TikvGrpc.getKvPrewriteMethod(), factory, handler);
487       if (isPrewriteSuccess(bo, resp, startTs)) {
488         return;
489       }
490     }
491   }
492 
493   /**
494    * @param backOffer backOffer
495    * @param resp response
496    * @return Return true means the rpc call success. Return false means the rpc call fail,
497    *     RegionStoreClient should retry. Throw an Exception means the rpc call fail,
498    *     RegionStoreClient cannot handle this kind of error
499    * @throws TiClientInternalException
500    * @throws RegionException
501    * @throws KeyException
502    */
503   private boolean isPrewriteSuccess(BackOffer backOffer, PrewriteResponse resp, long startTs)
504       throws TiClientInternalException, KeyException, RegionException {
505     boolean forWrite = true;
506     if (resp == null) {
507       this.regionManager.onRequestFail(region);
508       throw new TiClientInternalException("Prewrite Response failed without a cause");
509     }
510     if (resp.hasRegionError()) {
511       throw new RegionException(resp.getRegionError());
512     }
513 
514     boolean isSuccess = true;
515     List<Lock> locks = new ArrayList<>();
516     for (KeyError err : resp.getErrorsList()) {
517       if (err.hasLocked()) {
518         isSuccess = false;
519         Lock lock = new Lock(err.getLocked(), codec);
520         locks.add(lock);
521       } else {
522         throw new KeyException(err.toString());
523       }
524     }
525     if (isSuccess) {
526       return true;
527     }
528 
529     ResolveLockResult resolveLockResult =
530         lockResolverClient.resolveLocks(backOffer, startTs, locks, forWrite);
531     addResolvedLocks(startTs, resolveLockResult.getResolvedLocks());
532     long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
533     if (msBeforeExpired > 0) {
534       backOffer.doBackOffWithMaxSleep(
535           BoTxnLock, msBeforeExpired, new KeyException(resp.getErrorsList().get(0)));
536     }
537     return false;
538   }
539 
540   /** TXN Heart Beat: update primary key ttl */
541   public void txnHeartBeat(BackOffer bo, ByteString primaryLock, long startTs, long ttl) {
542     boolean forWrite = false;
543     while (true) {
544       Supplier<TxnHeartBeatRequest> factory =
545           () ->
546               TxnHeartBeatRequest.newBuilder()
547                   .setContext(makeContext(storeType, bo.getSlowLog()))
548                   .setStartVersion(startTs)
549                   .setPrimaryLock(primaryLock)
550                   .setAdviseLockTtl(ttl)
551                   .build();
552       KVErrorHandler<TxnHeartBeatResponse> handler =
553           new KVErrorHandler<>(
554               regionManager,
555               this,
556               lockResolverClient,
557               resp -> resp.hasRegionError() ? resp.getRegionError() : null,
558               resp -> resp.hasError() ? resp.getError() : null,
559               resolveLockResult -> null,
560               startTs,
561               forWrite);
562       TxnHeartBeatResponse resp =
563           callWithRetry(bo, TikvGrpc.getKvTxnHeartBeatMethod(), factory, handler);
564       if (isTxnHeartBeatSuccess(resp)) {
565         return;
566       }
567     }
568   }
569 
570   private boolean isTxnHeartBeatSuccess(TxnHeartBeatResponse resp)
571       throws TiClientInternalException, RegionException {
572     if (resp == null) {
573       this.regionManager.onRequestFail(region);
574       throw new TiClientInternalException("TxnHeartBeat Response failed without a cause");
575     }
576 
577     if (resp.hasRegionError()) {
578       throw new RegionException(resp.getRegionError());
579     }
580 
581     if (resp.hasError()) {
582       throw new TiClientInternalException("TxnHeartBeat fail, " + resp.getError().getAbort());
583     }
584 
585     return true;
586   }
587 
588   /**
589    * Commit batch keys
590    *
591    * @param backOffer backOffer
592    * @param keys keys to commit
593    * @param startTs start version
594    * @param commitTs commit version
595    */
596   public void commit(BackOffer backOffer, Iterable<ByteString> keys, long startTs, long commitTs)
597       throws KeyException {
598     boolean forWrite = true;
599     Supplier<CommitRequest> factory =
600         () ->
601             CommitRequest.newBuilder()
602                 .setStartVersion(startTs)
603                 .setCommitVersion(commitTs)
604                 .addAllKeys(
605                     StreamSupport.stream(keys.spliterator(), false)
606                         .map(codec::encodeKey)
607                         .collect(Collectors.toList()))
608                 .setContext(makeContext(storeType, backOffer.getSlowLog()))
609                 .build();
610     KVErrorHandler<CommitResponse> handler =
611         new KVErrorHandler<>(
612             regionManager,
613             this,
614             lockResolverClient,
615             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
616             resp -> resp.hasError() ? resp.getError() : null,
617             resolveLockResult -> null,
618             startTs,
619             forWrite);
620     CommitResponse resp = callWithRetry(backOffer, TikvGrpc.getKvCommitMethod(), factory, handler);
621     handleCommitResponse(resp);
622   }
623 
624   /**
625    * @param resp CommitResponse
626    * @throws TiClientInternalException
627    * @throws RegionException
628    * @throws KeyException
629    */
630   private void handleCommitResponse(CommitResponse resp)
631       throws TiClientInternalException, RegionException, KeyException {
632     if (resp == null) {
633       this.regionManager.onRequestFail(region);
634       throw new TiClientInternalException("CommitResponse failed without a cause");
635     }
636     if (resp.hasRegionError()) {
637       // bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
638       // return false;
639       // Caller method should restart commit
640       throw new RegionException(resp.getRegionError());
641     }
642     // If we find locks, we first resolve and let its caller retry.
643     if (resp.hasError()) {
644       throw new KeyException(resp.getError());
645     }
646   }
647 
648   /**
649    * Execute and retrieve the response from TiKV server.
650    *
651    * @param req Select request to process
652    * @param ranges Key range list
653    * @return Remaining tasks of this request, if task split happens, null otherwise
654    */
655   public List<RangeSplitter.RegionTask> coprocess(
656       BackOffer backOffer,
657       DAGRequest req,
658       List<Coprocessor.KeyRange> ranges,
659       Queue<SelectResponse> responseQueue,
660       long startTs) {
661     boolean forWrite = false;
662     if (req == null || ranges == null || req.getExecutorsCount() < 1) {
663       throw new IllegalArgumentException("Invalid coprocessor argument!");
664     }
665 
666     Supplier<Coprocessor.Request> reqToSend =
667         () ->
668             Coprocessor.Request.newBuilder()
669                 .setContext(
670                     makeContext(getResolvedLocks(startTs), this.storeType, backOffer.getSlowLog()))
671                 .setTp(REQ_TYPE_DAG.getValue())
672                 .setStartTs(startTs)
673                 .setData(req.toByteString())
674                 .addAllRanges(ranges)
675                 .build();
676 
677     // we should handle the region error ourselves
678     KVErrorHandler<Coprocessor.Response> handler =
679         new KVErrorHandler<>(
680             regionManager,
681             this,
682             lockResolverClient,
683             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
684             resp -> null,
685             resolveLockResult -> addResolvedLocks(startTs, resolveLockResult.getResolvedLocks()),
686             startTs,
687             forWrite);
688     Coprocessor.Response resp =
689         callWithRetry(backOffer, TikvGrpc.getCoprocessorMethod(), reqToSend, handler);
690     return handleCopResponse(backOffer, resp, ranges, responseQueue, startTs);
691   }
692 
693   // handleCopResponse checks coprocessor Response for region split and lock,
694   // returns more tasks when that happens, or handles the response if no error.
695   // if we're handling streaming coprocessor response, lastRange is the range of last
696   // successful response, otherwise it's nil.
697   private List<RangeSplitter.RegionTask> handleCopResponse(
698       BackOffer backOffer,
699       Coprocessor.Response response,
700       List<Coprocessor.KeyRange> ranges,
701       Queue<SelectResponse> responseQueue,
702       long startTs) {
703     boolean forWrite = false;
704     if (response == null) {
705       // Send request failed, reasons may:
706       // 1. TiKV down
707       // 2. Network partition
708       backOffer.doBackOff(
709           BackOffFunction.BackOffFuncType.BoRegionMiss,
710           new GrpcException("TiKV down or Network partition"));
711       logger.warn("Re-splitting region task due to region error: TiKV down or Network partition");
712       // Split ranges
713       return RangeSplitter.newSplitter(this.regionManager).splitRangeByRegion(ranges, storeType);
714     }
715 
716     if (response.hasRegionError()) {
717       Errorpb.Error regionError = response.getRegionError();
718       backOffer.doBackOff(
719           BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(regionError.toString()));
720       logger.warn("Re-splitting region task due to region error:" + regionError.getMessage());
721       // Split ranges
722       return RangeSplitter.newSplitter(this.regionManager).splitRangeByRegion(ranges, storeType);
723     }
724 
725     if (response.hasLocked()) {
726       Lock lock = new Lock(response.getLocked(), codec);
727       logger.debug(String.format("coprocessor encounters locks: %s", lock));
728       ResolveLockResult resolveLockResult =
729           lockResolverClient.resolveLocks(
730               backOffer, startTs, Collections.singletonList(lock), forWrite);
731       addResolvedLocks(startTs, resolveLockResult.getResolvedLocks());
732       long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
733       if (msBeforeExpired > 0) {
734         backOffer.doBackOffWithMaxSleep(BoTxnLockFast, msBeforeExpired, new LockException(lock));
735       }
736       // Split ranges
737       return RangeSplitter.newSplitter(this.regionManager).splitRangeByRegion(ranges, storeType);
738     }
739 
740     String otherError = response.getOtherError();
741     if (!otherError.isEmpty()) {
742       logger.warn(String.format("Other error occurred, message: %s", otherError));
743       throw new GrpcException(otherError);
744     }
745 
746     responseQueue.offer(doCoprocessor(response));
747     return null;
748   }
749 
750   private Iterator<SelectResponse> doCoprocessor(StreamingResponse response) {
751     Iterator<Coprocessor.Response> responseIterator = response.iterator();
752     // If we got nothing to handle, return null
753     if (!responseIterator.hasNext()) {
754       return null;
755     }
756 
757     // Simply wrap it
758     return new Iterator<SelectResponse>() {
759       @Override
760       public boolean hasNext() {
761         return responseIterator.hasNext();
762       }
763 
764       @Override
765       public SelectResponse next() {
766         return doCoprocessor(responseIterator.next());
767       }
768     };
769   }
770 
771   private SelectResponse doCoprocessor(Coprocessor.Response resp) {
772     try {
773       SelectResponse selectResp = SelectResponse.parseFrom(resp.getData());
774       if (selectResp.hasError()) {
775         throw new SelectException(selectResp.getError(), selectResp.getError().getMsg());
776       }
777       return selectResp;
778     } catch (InvalidProtocolBufferException e) {
779       throw new TiClientInternalException("Error parsing protobuf for coprocessor response.", e);
780     }
781   }
782 
783   // TODO: wait for future fix
784   // coprocessStreaming doesn't handle split error
785   // future work should handle it and do the resolve
786   // locks correspondingly
787   public Iterator<SelectResponse> coprocessStreaming(
788       DAGRequest req, List<Coprocessor.KeyRange> ranges, long startTs) {
789     boolean forWrite = false;
790     Supplier<Coprocessor.Request> reqToSend =
791         () ->
792             Coprocessor.Request.newBuilder()
793                 .setContext(
794                     makeContext(
795                         getResolvedLocks(startTs), this.storeType, SlowLogEmptyImpl.INSTANCE))
796                 // TODO: If no executors...?
797                 .setTp(REQ_TYPE_DAG.getValue())
798                 .setData(req.toByteString())
799                 .addAllRanges(ranges)
800                 .build();
801 
802     KVErrorHandler<StreamingResponse> handler =
803         new KVErrorHandler<>(
804             regionManager,
805             this,
806             lockResolverClient,
807             StreamingResponse::getFirstRegionError, // TODO: handle all errors in streaming response
808             resp -> null,
809             resolveLockResult -> addResolvedLocks(startTs, resolveLockResult.getResolvedLocks()),
810             startTs,
811             forWrite);
812 
813     StreamingResponse responseIterator =
814         this.callServerStreamingWithRetry(
815             ConcreteBackOffer.newCopNextMaxBackOff(pdClient.getClusterId()),
816             TikvGrpc.getCoprocessorStreamMethod(),
817             reqToSend,
818             handler);
819     return doCoprocessor(responseIterator);
820   }
821 
822   /**
823    * Send SplitRegion request to tikv split a region at splitKey. splitKey must between current
824    * region's start key and end key.
825    *
826    * @param splitKeys is the split points for a specific region.
827    * @return a split region info.
828    */
829   public List<Metapb.Region> splitRegion(List<ByteString> splitKeys) {
830     Supplier<SplitRegionRequest> request =
831         () ->
832             SplitRegionRequest.newBuilder()
833                 .setContext(makeContext(storeType, SlowLogEmptyImpl.INSTANCE))
834                 .addAllSplitKeys(codec.encodeKeys(splitKeys))
835                 .setIsRawKv(conf.isRawKVMode())
836                 .build();
837 
838     KVErrorHandler<SplitRegionResponse> handler =
839         new KVErrorHandler<>(
840             regionManager,
841             this,
842             null,
843             resp -> resp.hasRegionError() ? resp.getRegionError() : null,
844             resp -> null,
845             resolveLockResult -> null,
846             0L,
847             false);
848 
849     SplitRegionResponse resp =
850         callWithRetry(
851             ConcreteBackOffer.newGetBackOff(pdClient.getClusterId()),
852             TikvGrpc.getSplitRegionMethod(),
853             request,
854             handler);
855 
856     if (resp == null) {
857       this.regionManager.onRequestFail(region);
858       throw new TiClientInternalException("SplitRegion Response failed without a cause");
859     }
860 
861     if (resp.hasRegionError()) {
862       throw new TiClientInternalException(
863           String.format(
864               "failed to split region %d because %s", region.getId(), resp.getRegionError()));
865     }
866 
867     if (conf.getApiVersion().isV1()) {
868       return resp.getRegionsList();
869     }
870     return resp.getRegionsList().stream().map(codec::decodeRegion).collect(Collectors.toList());
871   }
872 
873   // APIs for Raw Scan/Put/Get/Delete
874 
875   public Optional<ByteString> rawGet(BackOffer backOffer, ByteString key) {
876     Long clusterId = pdClient.getClusterId();
877     Histogram.Timer requestTimer =
878         GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get", clusterId.toString()).startTimer();
879     try {
880       Supplier<RawGetRequest> factory =
881           () ->
882               RawGetRequest.newBuilder()
883                   .setContext(makeContext(storeType, backOffer.getSlowLog()))
884                   .setKey(codec.encodeKey(key))
885                   .build();
886       RegionErrorHandler<RawGetResponse> handler =
887           new RegionErrorHandler<RawGetResponse>(
888               regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
889       RawGetResponse resp = callWithRetry(backOffer, TikvGrpc.getRawGetMethod(), factory, handler);
890       return rawGetHelper(resp);
891     } finally {
892       requestTimer.observeDuration();
893     }
894   }
895 
896   private Optional<ByteString> rawGetHelper(RawGetResponse resp) {
897     if (resp == null) {
898       this.regionManager.onRequestFail(region);
899       throw new TiClientInternalException("RawGetResponse failed without a cause");
900     }
901     String error = resp.getError();
902     if (!error.isEmpty()) {
903       throw new KeyException(resp.getError());
904     }
905     if (resp.hasRegionError()) {
906       throw new RegionException(resp.getRegionError());
907     }
908     if (resp.getNotFound()) {
909       return Optional.empty();
910     } else {
911       return Optional.of(resp.getValue());
912     }
913   }
914 
915   public Optional<Long> rawGetKeyTTL(BackOffer backOffer, ByteString key) {
916     Long clusterId = pdClient.getClusterId();
917     Histogram.Timer requestTimer =
918         GRPC_RAW_REQUEST_LATENCY
919             .labels("client_grpc_raw_get_key_ttl", clusterId.toString())
920             .startTimer();
921     try {
922       Supplier<RawGetKeyTTLRequest> factory =
923           () ->
924               RawGetKeyTTLRequest.newBuilder()
925                   .setContext(makeContext(storeType, backOffer.getSlowLog()))
926                   .setKey(codec.encodeKey(key))
927                   .build();
928       RegionErrorHandler<RawGetKeyTTLResponse> handler =
929           new RegionErrorHandler<RawGetKeyTTLResponse>(
930               regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
931       RawGetKeyTTLResponse resp =
932           callWithRetry(backOffer, TikvGrpc.getRawGetKeyTTLMethod(), factory, handler);
933       return rawGetKeyTTLHelper(resp);
934     } finally {
935       requestTimer.observeDuration();
936     }
937   }
938 
939   private Optional<Long> rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) {
940     if (resp == null) {
941       this.regionManager.onRequestFail(region);
942       throw new TiClientInternalException("RawGetResponse failed without a cause");
943     }
944     String error = resp.getError();
945     if (!error.isEmpty()) {
946       throw new KeyException(resp.getError());
947     }
948     if (resp.hasRegionError()) {
949       throw new RegionException(resp.getRegionError());
950     }
951     if (resp.getNotFound()) {
952       return Optional.empty();
953     }
954     return Optional.of(resp.getTtl());
955   }
956 
957   public void rawDelete(BackOffer backOffer, ByteString key, boolean atomicForCAS) {
958     Long clusterId = pdClient.getClusterId();
959     Histogram.Timer requestTimer =
960         GRPC_RAW_REQUEST_LATENCY
961             .labels("client_grpc_raw_delete", clusterId.toString())
962             .startTimer();
963     try {
964       Supplier<RawDeleteRequest> factory =
965           () ->
966               RawDeleteRequest.newBuilder()
967                   .setContext(makeContext(storeType, backOffer.getSlowLog()))
968                   .setKey(codec.encodeKey(key))
969                   .setForCas(atomicForCAS)
970                   .build();
971 
972       RegionErrorHandler<RawDeleteResponse> handler =
973           new RegionErrorHandler<RawDeleteResponse>(
974               regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
975       RawDeleteResponse resp =
976           callWithRetry(backOffer, TikvGrpc.getRawDeleteMethod(), factory, handler);
977       rawDeleteHelper(resp, region);
978     } finally {
979       requestTimer.observeDuration();
980     }
981   }
982 
983   private void rawDeleteHelper(RawDeleteResponse resp, TiRegion region) {
984     if (resp == null) {
985       this.regionManager.onRequestFail(region);
986       throw new TiClientInternalException("RawDeleteResponse failed without a cause");
987     }
988     String error = resp.getError();
989     if (!error.isEmpty()) {
990       throw new KeyException(resp.getError());
991     }
992     if (resp.hasRegionError()) {
993       throw new RegionException(resp.getRegionError());
994     }
995   }
996 
997   public void rawPut(
998       BackOffer backOffer, ByteString key, ByteString value, long ttl, boolean atomicForCAS) {
999     Long clusterId = pdClient.getClusterId();
1000     Histogram.Timer requestTimer =
1001         GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put", clusterId.toString()).startTimer();
1002     try {
1003       Supplier<RawPutRequest> factory =
1004           () ->
1005               RawPutRequest.newBuilder()
1006                   .setContext(makeContext(storeType, backOffer.getSlowLog()))
1007                   .setKey(codec.encodeKey(key))
1008                   .setValue(value)
1009                   .setTtl(ttl)
1010                   .setForCas(atomicForCAS)
1011                   .build();
1012 
1013       RegionErrorHandler<RawPutResponse> handler =
1014           new RegionErrorHandler<RawPutResponse>(
1015               regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1016       RawPutResponse resp = callWithRetry(backOffer, TikvGrpc.getRawPutMethod(), factory, handler);
1017       rawPutHelper(resp);
1018     } finally {
1019       requestTimer.observeDuration();
1020     }
1021   }
1022 
1023   private void rawPutHelper(RawPutResponse resp) {
1024     if (resp == null) {
1025       this.regionManager.onRequestFail(region);
1026       throw new TiClientInternalException("RawPutResponse failed without a cause");
1027     }
1028     String error = resp.getError();
1029     if (!error.isEmpty()) {
1030       throw new KeyException(resp.getError());
1031     }
1032     if (resp.hasRegionError()) {
1033       throw new RegionException(resp.getRegionError());
1034     }
1035   }
1036 
1037   public void rawCompareAndSet(
1038       BackOffer backOffer,
1039       ByteString key,
1040       Optional<ByteString> prevValue,
1041       ByteString value,
1042       long ttl)
1043       throws RawCASConflictException {
1044     Long clusterId = pdClient.getClusterId();
1045     Histogram.Timer requestTimer =
1046         GRPC_RAW_REQUEST_LATENCY
1047             .labels("client_grpc_raw_put_if_absent", clusterId.toString())
1048             .startTimer();
1049     try {
1050       Supplier<RawCASRequest> factory =
1051           () ->
1052               RawCASRequest.newBuilder()
1053                   .setContext(makeContext(storeType, backOffer.getSlowLog()))
1054                   .setKey(codec.encodeKey(key))
1055                   .setValue(value)
1056                   .setPreviousValue(prevValue.orElse(ByteString.EMPTY))
1057                   .setPreviousNotExist(!prevValue.isPresent())
1058                   .setTtl(ttl)
1059                   .build();
1060 
1061       RegionErrorHandler<RawCASResponse> handler =
1062           new RegionErrorHandler<RawCASResponse>(
1063               regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1064       RawCASResponse resp =
1065           callWithRetry(backOffer, TikvGrpc.getRawCompareAndSwapMethod(), factory, handler);
1066       rawCompareAndSetHelper(key, prevValue, resp);
1067     } finally {
1068       requestTimer.observeDuration();
1069     }
1070   }
1071 
1072   private void rawCompareAndSetHelper(
1073       ByteString key, Optional<ByteString> expectedPrevValue, RawCASResponse resp)
1074       throws RawCASConflictException {
1075     if (resp == null) {
1076       this.regionManager.onRequestFail(region);
1077       throw new TiClientInternalException("RawCASResponse failed without a cause");
1078     }
1079     String error = resp.getError();
1080     if (!error.isEmpty()) {
1081       throw new KeyException(resp.getError());
1082     }
1083     if (resp.hasRegionError()) {
1084       throw new RegionException(resp.getRegionError());
1085     }
1086     if (!resp.getSucceed()) {
1087       if (resp.getPreviousNotExist()) {
1088         throw new RawCASConflictException(key, expectedPrevValue, Optional.empty());
1089       } else {
1090         throw new RawCASConflictException(
1091             key, expectedPrevValue, Optional.of(resp.getPreviousValue()));
1092       }
1093     }
1094   }
1095 
1096   public List<KvPair> rawBatchGet(BackOffer backoffer, List<ByteString> keys) {
1097     Long clusterId = pdClient.getClusterId();
1098     Histogram.Timer requestTimer =
1099         GRPC_RAW_REQUEST_LATENCY
1100             .labels("client_grpc_raw_batch_get", clusterId.toString())
1101             .startTimer();
1102     try {
1103       if (keys.isEmpty()) {
1104         return new ArrayList<>();
1105       }
1106       Supplier<RawBatchGetRequest> factory =
1107           () ->
1108               RawBatchGetRequest.newBuilder()
1109                   .setContext(makeContext(storeType, backoffer.getSlowLog()))
1110                   .addAllKeys(codec.encodeKeys(keys))
1111                   .build();
1112       RegionErrorHandler<RawBatchGetResponse> handler =
1113           new RegionErrorHandler<RawBatchGetResponse>(
1114               regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1115       RawBatchGetResponse resp =
1116           callWithRetry(backoffer, TikvGrpc.getRawBatchGetMethod(), factory, handler);
1117       return handleRawBatchGet(resp);
1118     } finally {
1119       requestTimer.observeDuration();
1120     }
1121   }
1122 
1123   private List<KvPair> handleRawBatchGet(RawBatchGetResponse resp) {
1124     if (resp == null) {
1125       this.regionManager.onRequestFail(region);
1126       throw new TiClientInternalException("RawBatchPutResponse failed without a cause");
1127     }
1128     if (resp.hasRegionError()) {
1129       throw new RegionException(resp.getRegionError());
1130     }
1131 
1132     return codec.decodeKvPairs(resp.getPairsList());
1133   }
1134 
1135   public void rawBatchPut(
1136       BackOffer backOffer, List<KvPair> kvPairs, long ttl, boolean atomicForCAS) {
1137     Long clusterId = pdClient.getClusterId();
1138     Histogram.Timer requestTimer =
1139         GRPC_RAW_REQUEST_LATENCY
1140             .labels("client_grpc_raw_batch_put", clusterId.toString())
1141             .startTimer();
1142     try {
1143       if (kvPairs.isEmpty()) {
1144         return;
1145       }
1146       Supplier<RawBatchPutRequest> factory =
1147           () ->
1148               RawBatchPutRequest.newBuilder()
1149                   .setContext(makeContext(storeType, backOffer.getSlowLog()))
1150                   .addAllPairs(kvPairs)
1151                   .setTtl(ttl)
1152                   .addTtls(ttl)
1153                   .setForCas(atomicForCAS)
1154                   .build();
1155       RegionErrorHandler<RawBatchPutResponse> handler =
1156           new RegionErrorHandler<RawBatchPutResponse>(
1157               regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1158       RawBatchPutResponse resp =
1159           callWithRetry(backOffer, TikvGrpc.getRawBatchPutMethod(), factory, handler);
1160       handleRawBatchPut(resp);
1161     } finally {
1162       requestTimer.observeDuration();
1163     }
1164   }
1165 
1166   public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl, boolean atomicForCAS) {
1167     List<KvPair> pairs = new ArrayList<>();
1168     for (int i = 0; i < batch.getKeys().size(); i++) {
1169       pairs.add(
1170           KvPair.newBuilder()
1171               .setKey(codec.encodeKey(batch.getKeys().get(i)))
1172               .setValue(batch.getValues().get(i))
1173               .build());
1174     }
1175     rawBatchPut(backOffer, pairs, ttl, atomicForCAS);
1176   }
1177 
1178   private void handleRawBatchPut(RawBatchPutResponse resp) {
1179     if (resp == null) {
1180       this.regionManager.onRequestFail(region);
1181       throw new TiClientInternalException("RawBatchPutResponse failed without a cause");
1182     }
1183     String error = resp.getError();
1184     if (!error.isEmpty()) {
1185       throw new KeyException(resp.getError());
1186     }
1187     if (resp.hasRegionError()) {
1188       throw new RegionException(resp.getRegionError());
1189     }
1190   }
1191 
1192   public void rawBatchDelete(BackOffer backoffer, List<ByteString> keys, boolean atomicForCAS) {
1193     Long clusterId = pdClient.getClusterId();
1194     Histogram.Timer requestTimer =
1195         GRPC_RAW_REQUEST_LATENCY
1196             .labels("client_grpc_raw_batch_delete", clusterId.toString())
1197             .startTimer();
1198     try {
1199       if (keys.isEmpty()) {
1200         return;
1201       }
1202       Supplier<RawBatchDeleteRequest> factory =
1203           () ->
1204               RawBatchDeleteRequest.newBuilder()
1205                   .setContext(makeContext(storeType, backoffer.getSlowLog()))
1206                   .addAllKeys(codec.encodeKeys(keys))
1207                   .setForCas(atomicForCAS)
1208                   .build();
1209       RegionErrorHandler<RawBatchDeleteResponse> handler =
1210           new RegionErrorHandler<RawBatchDeleteResponse>(
1211               regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1212       RawBatchDeleteResponse resp =
1213           callWithRetry(backoffer, TikvGrpc.getRawBatchDeleteMethod(), factory, handler);
1214       handleRawBatchDelete(resp);
1215     } finally {
1216       requestTimer.observeDuration();
1217     }
1218   }
1219 
1220   private void handleRawBatchDelete(RawBatchDeleteResponse resp) {
1221     if (resp == null) {
1222       this.regionManager.onRequestFail(region);
1223       throw new TiClientInternalException("RawBatchDeleteResponse failed without a cause");
1224     }
1225     String error = resp.getError();
1226     if (!error.isEmpty()) {
1227       throw new KeyException(resp.getError());
1228     }
1229     if (resp.hasRegionError()) {
1230       throw new RegionException(resp.getRegionError());
1231     }
1232   }
1233 
1234   /**
1235    * Return a batch KvPair list containing limited key-value pairs starting from `key`, which are in
1236    * the same region
1237    *
1238    * @param backOffer BackOffer
1239    * @param key startKey
1240    * @param keyOnly true if value of KvPair is not needed
1241    * @return KvPair list
1242    */
1243   public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) {
1244     Long clusterId = pdClient.getClusterId();
1245     Histogram.Timer requestTimer =
1246         GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan", clusterId.toString()).startTimer();
1247     try {
1248       Supplier<RawScanRequest> factory =
1249           () -> {
1250             Pair<ByteString, ByteString> range = codec.encodeRange(key, ByteString.EMPTY);
1251             return RawScanRequest.newBuilder()
1252                 .setContext(makeContext(storeType, backOffer.getSlowLog()))
1253                 .setStartKey(range.first)
1254                 .setEndKey(range.second)
1255                 .setKeyOnly(keyOnly)
1256                 .setLimit(limit)
1257                 .build();
1258           };
1259 
1260       RegionErrorHandler<RawScanResponse> handler =
1261           new RegionErrorHandler<RawScanResponse>(
1262               regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1263       RawScanResponse resp =
1264           callWithRetry(backOffer, TikvGrpc.getRawScanMethod(), factory, handler);
1265       // RegionErrorHandler may refresh region cache due to outdated region info,
1266       // This region need to get newest info from cache.
1267       region = regionManager.getRegionByKey(key, backOffer);
1268       return rawScanHelper(resp);
1269     } finally {
1270       requestTimer.observeDuration();
1271     }
1272   }
1273 
1274   public List<KvPair> rawScan(BackOffer backOffer, ByteString key, boolean keyOnly) {
1275     return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly);
1276   }
1277 
1278   private List<KvPair> rawScanHelper(RawScanResponse resp) {
1279     if (resp == null) {
1280       this.regionManager.onRequestFail(region);
1281       throw new TiClientInternalException("RawScanResponse failed without a cause");
1282     }
1283     if (resp.hasRegionError()) {
1284       throw new RegionException(resp.getRegionError());
1285     }
1286     return codec.decodeKvPairs(resp.getKvsList());
1287   }
1288 
1289   /**
1290    * Delete raw keys in the range of [startKey, endKey)
1291    *
1292    * @param backOffer BackOffer
1293    * @param startKey startKey
1294    * @param endKey endKey
1295    */
1296   public void rawDeleteRange(BackOffer backOffer, ByteString startKey, ByteString endKey) {
1297     Long clusterId = pdClient.getClusterId();
1298     Histogram.Timer requestTimer =
1299         GRPC_RAW_REQUEST_LATENCY
1300             .labels("client_grpc_raw_delete_range", clusterId.toString())
1301             .startTimer();
1302     try {
1303       Supplier<RawDeleteRangeRequest> factory =
1304           () -> {
1305             Pair<ByteString, ByteString> range = codec.encodeRange(startKey, endKey);
1306             return RawDeleteRangeRequest.newBuilder()
1307                 .setContext(makeContext(storeType, backOffer.getSlowLog()))
1308                 .setStartKey(range.first)
1309                 .setEndKey(range.second)
1310                 .build();
1311           };
1312 
1313       RegionErrorHandler<RawDeleteRangeResponse> handler =
1314           new RegionErrorHandler<RawDeleteRangeResponse>(
1315               regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
1316       RawDeleteRangeResponse resp =
1317           callWithRetry(backOffer, TikvGrpc.getRawDeleteRangeMethod(), factory, handler);
1318       rawDeleteRangeHelper(resp);
1319     } finally {
1320       requestTimer.observeDuration();
1321     }
1322   }
1323 
1324   private void rawDeleteRangeHelper(RawDeleteRangeResponse resp) {
1325     if (resp == null) {
1326       this.regionManager.onRequestFail(region);
1327       throw new TiClientInternalException("RawDeleteRangeResponse failed without a cause");
1328     }
1329     String error = resp.getError();
1330     if (!error.isEmpty()) {
1331       throw new KeyException(resp.getError());
1332     }
1333     if (resp.hasRegionError()) {
1334       throw new RegionException(resp.getRegionError());
1335     }
1336   }
1337 
1338   public enum RequestTypes {
1339     REQ_TYPE_SELECT(101),
1340     REQ_TYPE_INDEX(102),
1341     REQ_TYPE_DAG(103),
1342     REQ_TYPE_ANALYZE(104),
1343     BATCH_ROW_COUNT(64);
1344 
1345     private final int value;
1346 
1347     RequestTypes(int value) {
1348       this.value = value;
1349     }
1350 
1351     public int getValue() {
1352       return value;
1353     }
1354   }
1355 
1356   public static class RegionStoreClientBuilder {
1357 
1358     private final TiConfiguration conf;
1359     private final ChannelFactory channelFactory;
1360     private final RegionManager regionManager;
1361     private final PDClient pdClient;
1362 
1363     public RegionStoreClientBuilder(
1364         TiConfiguration conf,
1365         ChannelFactory channelFactory,
1366         RegionManager regionManager,
1367         PDClient pdClient) {
1368       Objects.requireNonNull(conf, "conf is null");
1369       Objects.requireNonNull(channelFactory, "channelFactory is null");
1370       Objects.requireNonNull(regionManager, "regionManager is null");
1371       this.conf = conf;
1372       this.channelFactory = channelFactory;
1373       this.regionManager = regionManager;
1374       this.pdClient = pdClient;
1375     }
1376 
1377     public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType storeType)
1378         throws GrpcException {
1379       Objects.requireNonNull(region, "region is null");
1380       Objects.requireNonNull(store, "store is null");
1381       Objects.requireNonNull(storeType, "storeType is null");
1382 
1383       String addressStr = store.getStore().getAddress();
1384       if (logger.isDebugEnabled()) {
1385         logger.debug(String.format("Create region store client on address %s", addressStr));
1386       }
1387       ManagedChannel channel = null;
1388 
1389       TikvBlockingStub blockingStub = null;
1390       TikvFutureStub asyncStub = null;
1391 
1392       if (conf.getEnableGrpcForward() && store.getProxyStore() != null && !store.isReachable()) {
1393         addressStr = store.getProxyStore().getAddress();
1394         channel =
1395             channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
1396         Metadata header = new Metadata();
1397         header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
1398         blockingStub =
1399             TikvGrpc.newBlockingStub(channel)
1400                 .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
1401         asyncStub =
1402             TikvGrpc.newFutureStub(channel)
1403                 .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
1404       } else {
1405         channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
1406         blockingStub = TikvGrpc.newBlockingStub(channel);
1407         asyncStub = TikvGrpc.newFutureStub(channel);
1408       }
1409 
1410       return new RegionStoreClient(
1411           conf,
1412           region,
1413           store,
1414           storeType,
1415           channelFactory,
1416           blockingStub,
1417           asyncStub,
1418           regionManager,
1419           pdClient,
1420           this);
1421     }
1422 
1423     public RegionStoreClient build(TiRegion region, TiStore store) throws GrpcException {
1424       return build(region, store, TiStoreType.TiKV);
1425     }
1426 
1427     public RegionStoreClient build(ByteString key) throws GrpcException {
1428       return build(key, TiStoreType.TiKV);
1429     }
1430 
1431     public RegionStoreClient build(ByteString key, BackOffer backOffer) throws GrpcException {
1432       return build(key, TiStoreType.TiKV, backOffer);
1433     }
1434 
1435     public RegionStoreClient build(ByteString key, TiStoreType storeType) throws GrpcException {
1436       return build(key, storeType, defaultBackOff());
1437     }
1438 
1439     public RegionStoreClient build(ByteString key, TiStoreType storeType, BackOffer backOffer)
1440         throws GrpcException {
1441       Pair<TiRegion, TiStore> pair =
1442           regionManager.getRegionStorePairByKey(key, storeType, backOffer);
1443       return build(pair.first, pair.second, storeType);
1444     }
1445 
1446     public RegionStoreClient build(TiRegion region) throws GrpcException {
1447       return build(region, defaultBackOff());
1448     }
1449 
1450     public RegionStoreClient build(TiRegion region, BackOffer backOffer) throws GrpcException {
1451       TiStore store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
1452       return build(region, store, TiStoreType.TiKV);
1453     }
1454 
1455     public RegionManager getRegionManager() {
1456       return regionManager;
1457     }
1458 
1459     private BackOffer defaultBackOff() {
1460       BackOffer backoffer =
1461           ConcreteBackOffer.newCustomBackOff(
1462               conf.getRawKVDefaultBackoffInMS(), pdClient.getClusterId());
1463       return backoffer;
1464     }
1465   }
1466 }