View Javadoc
1   /*
2    * Copyright 2018 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.raw;
19  
20  import static org.tikv.common.util.ClientUtils.appendBatches;
21  import static org.tikv.common.util.ClientUtils.genUUID;
22  import static org.tikv.common.util.ClientUtils.getBatches;
23  import static org.tikv.common.util.ClientUtils.getTasks;
24  import static org.tikv.common.util.ClientUtils.getTasksWithOutput;
25  import static org.tikv.common.util.ClientUtils.groupKeysByRegion;
26  
27  import com.google.protobuf.ByteString;
28  import io.prometheus.client.Counter;
29  import io.prometheus.client.Histogram;
30  import java.net.URI;
31  import java.util.ArrayList;
32  import java.util.HashMap;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Objects;
38  import java.util.Optional;
39  import java.util.Queue;
40  import java.util.concurrent.ExecutionException;
41  import java.util.concurrent.ExecutorCompletionService;
42  import java.util.concurrent.ExecutorService;
43  import java.util.concurrent.Future;
44  import java.util.concurrent.TimeUnit;
45  import java.util.stream.Collectors;
46  import org.slf4j.Logger;
47  import org.slf4j.LoggerFactory;
48  import org.tikv.common.TiConfiguration;
49  import org.tikv.common.TiSession;
50  import org.tikv.common.codec.KeyUtils;
51  import org.tikv.common.exception.GrpcException;
52  import org.tikv.common.exception.RawCASConflictException;
53  import org.tikv.common.exception.TiKVException;
54  import org.tikv.common.importer.ImporterClient;
55  import org.tikv.common.importer.SwitchTiKVModeClient;
56  import org.tikv.common.key.Key;
57  import org.tikv.common.log.SlowLog;
58  import org.tikv.common.log.SlowLogEmptyImpl;
59  import org.tikv.common.log.SlowLogImpl;
60  import org.tikv.common.log.SlowLogSpan;
61  import org.tikv.common.operation.iterator.RawScanIterator;
62  import org.tikv.common.region.RegionStoreClient;
63  import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
64  import org.tikv.common.region.TiRegion;
65  import org.tikv.common.util.BackOffFunction;
66  import org.tikv.common.util.BackOffer;
67  import org.tikv.common.util.Batch;
68  import org.tikv.common.util.ConcreteBackOffer;
69  import org.tikv.common.util.DeleteRange;
70  import org.tikv.common.util.HistogramUtils;
71  import org.tikv.common.util.Pair;
72  import org.tikv.common.util.ScanOption;
73  import org.tikv.kvproto.Kvrpcpb.KvPair;
74  
75  public class RawKVClient implements RawKVClientBase {
  // Cluster id obtained from the PD client; used as the "cluster" metric label and slow-log field.
  private final Long clusterId;
  // PD endpoints obtained from the PD client; attached to every slow log as "pd_addresses".
  private final List<URI> pdAddresses;
  private final TiSession tiSession;
  private final RegionStoreClientBuilder clientBuilder;
  private final TiConfiguration conf;
  // Snapshot of conf.isEnableAtomicForCAS(). Forwarded to the raw put/delete/batch-put calls and
  // must be true for compareAndSet/putIfAbsent to be accepted.
  private final boolean atomicForCAS;
  // Executors are taken from the TiSession in the constructor; close() does not shut them down.
  private final ExecutorService batchGetThreadPool;
  private final ExecutorService batchPutThreadPool;
  private final ExecutorService batchDeleteThreadPool;
  private final ExecutorService batchScanThreadPool;
  private final ExecutorService deleteRangeThreadPool;
  private static final Logger logger = LoggerFactory.getLogger(RawKVClient.class);

  /** Latency of raw requests, labelled by request type and cluster id. */
  public static final Histogram RAW_REQUEST_LATENCY =
      HistogramUtils.buildDuration()
          .name("client_java_raw_requests_latency")
          .help("client raw request latency.")
          .labelNames("type", "cluster")
          .register();

  /** Number of raw requests that completed successfully, by request type and cluster id. */
  public static final Counter RAW_REQUEST_SUCCESS =
      Counter.build()
          .name("client_java_raw_requests_success")
          .help("client raw request success.")
          .labelNames("type", "cluster")
          .register();

  /** Number of raw requests that ended with an exception, by request type and cluster id. */
  public static final Counter RAW_REQUEST_FAILURE =
      Counter.build()
          .name("client_java_raw_requests_failure")
          .help("client raw request failure.")
          .labelNames("type", "cluster")
          .register();

  // NOTE(review): not thrown in the part of this file shown here; presumably raised by the scan
  // path when a limit exceeds MAX_RAW_SCAN_LIMIT — confirm. Because it is one static instance
  // created at class initialization, its stack trace never points at the failing call.
  private static final TiKVException ERR_MAX_SCAN_LIMIT_EXCEEDED =
      new TiKVException("limit should be less than MAX_RAW_SCAN_LIMIT");
112 
113   public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
114     Objects.requireNonNull(session, "session is null");
115     Objects.requireNonNull(clientBuilder, "clientBuilder is null");
116     this.conf = session.getConf();
117     this.tiSession = session;
118     this.clientBuilder = clientBuilder;
119     this.batchGetThreadPool = session.getThreadPoolForBatchGet();
120     this.batchPutThreadPool = session.getThreadPoolForBatchPut();
121     this.batchDeleteThreadPool = session.getThreadPoolForBatchDelete();
122     this.batchScanThreadPool = session.getThreadPoolForBatchScan();
123     this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange();
124     this.atomicForCAS = conf.isEnableAtomicForCAS();
125     this.clusterId = session.getPDClient().getClusterId();
126     this.pdAddresses = session.getPDClient().getPdAddrs();
127   }
128 
129   private SlowLog withClusterInfo(SlowLog logger) {
130     return logger.withField("cluster_id", clusterId).withField("pd_addresses", pdAddresses);
131   }
132 
133   private String[] withClusterId(String label) {
134     return new String[] {label, clusterId.toString()};
135   }
136 
137   @Override
138   public void close() {}
139 
140   @Override
141   public void put(ByteString key, ByteString value) {
142     put(key, value, 0);
143   }
144 
145   @Override
146   public void put(ByteString key, ByteString value, long ttl) {
147     String[] labels = withClusterId("client_raw_put");
148     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
149 
150     SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
151     SlowLogSpan span = slowLog.start("put");
152     span.addProperty("key", KeyUtils.formatBytesUTF8(key));
153 
154     ConcreteBackOffer backOffer =
155         ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog, clusterId);
156     try {
157       while (true) {
158         try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
159           span.addProperty("region", client.getRegion().toString());
160           client.rawPut(backOffer, key, value, ttl, atomicForCAS);
161           RAW_REQUEST_SUCCESS.labels(labels).inc();
162           return;
163         } catch (final TiKVException e) {
164           backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
165           logger.warn("Retry for put error", e);
166         }
167       }
168     } catch (Exception e) {
169       RAW_REQUEST_FAILURE.labels(labels).inc();
170       slowLog.setError(e);
171       throw e;
172     } finally {
173       requestTimer.observeDuration();
174       span.end();
175       slowLog.log();
176     }
177   }
178 
179   @Override
180   public Optional<ByteString> putIfAbsent(ByteString key, ByteString value) {
181     return putIfAbsent(key, value, 0L);
182   }
183 
184   @Override
185   public Optional<ByteString> putIfAbsent(ByteString key, ByteString value, long ttl) {
186     try {
187       compareAndSet(key, Optional.empty(), value, ttl);
188       return Optional.empty();
189     } catch (RawCASConflictException e) {
190       return e.getPrevValue();
191     }
192   }
193 
194   @Override
195   public void compareAndSet(ByteString key, Optional<ByteString> prevValue, ByteString value)
196       throws RawCASConflictException {
197     compareAndSet(key, prevValue, value, 0L);
198   }
199 
200   @Override
201   public void compareAndSet(
202       ByteString key, Optional<ByteString> prevValue, ByteString value, long ttl)
203       throws RawCASConflictException {
204     if (!atomicForCAS) {
205       throw new IllegalArgumentException(
206           "To use compareAndSet or putIfAbsent, please enable the config tikv.enable_atomic_for_cas.");
207     }
208 
209     String[] labels = withClusterId("client_raw_compare_and_set");
210     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
211 
212     SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
213     SlowLogSpan span = slowLog.start("putIfAbsent");
214     span.addProperty("key", KeyUtils.formatBytesUTF8(key));
215 
216     ConcreteBackOffer backOffer =
217         ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog, clusterId);
218     try {
219       while (true) {
220         try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
221           span.addProperty("region", client.getRegion().toString());
222           client.rawCompareAndSet(backOffer, key, prevValue, value, ttl);
223           RAW_REQUEST_SUCCESS.labels(labels).inc();
224           return;
225         } catch (final TiKVException e) {
226           backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
227           logger.warn("Retry for putIfAbsent error", e);
228         }
229       }
230     } catch (Exception e) {
231       RAW_REQUEST_FAILURE.labels(labels).inc();
232       slowLog.setError(e);
233       throw e;
234     } finally {
235       requestTimer.observeDuration();
236       span.end();
237       slowLog.log();
238     }
239   }
240 
241   @Override
242   public void batchPut(Map<ByteString, ByteString> kvPairs) {
243     batchPut(kvPairs, 0);
244   }
245 
246   @Override
247   public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
248     String[] labels = withClusterId("client_raw_batch_put");
249     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
250 
251     SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()));
252     SlowLogSpan span = slowLog.start("batchPut");
253     span.addProperty("keySize", String.valueOf(kvPairs.size()));
254 
255     ConcreteBackOffer backOffer =
256         ConcreteBackOffer.newDeadlineBackOff(
257             conf.getRawKVBatchWriteTimeoutInMS(), slowLog, clusterId);
258     try {
259       long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS();
260       doSendBatchPut(backOffer, kvPairs, ttl, deadline);
261       RAW_REQUEST_SUCCESS.labels(labels).inc();
262     } catch (Exception e) {
263       RAW_REQUEST_FAILURE.labels(labels).inc();
264       slowLog.setError(e);
265       throw e;
266     } finally {
267       requestTimer.observeDuration();
268       span.end();
269       slowLog.log();
270     }
271   }
272 
273   @Override
274   public Optional<ByteString> get(ByteString key) {
275     String[] labels = withClusterId("client_raw_get");
276     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
277 
278     SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
279     SlowLogSpan span = slowLog.start("get");
280     span.addProperty("key", KeyUtils.formatBytesUTF8(key));
281 
282     ConcreteBackOffer backOffer =
283         ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog, clusterId);
284     try {
285       while (true) {
286         try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
287           span.addProperty("region", client.getRegion().toString());
288           Optional<ByteString> result = client.rawGet(backOffer, key);
289           RAW_REQUEST_SUCCESS.labels(labels).inc();
290           return result;
291         } catch (final TiKVException e) {
292           backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
293           logger.warn("Retry for get error", e);
294         }
295       }
296     } catch (Exception e) {
297       RAW_REQUEST_FAILURE.labels(labels).inc();
298       slowLog.setError(e);
299       throw e;
300     } finally {
301       requestTimer.observeDuration();
302       span.end();
303       slowLog.log();
304     }
305   }
306 
307   @Override
308   public List<KvPair> batchGet(List<ByteString> keys) {
309     String[] labels = withClusterId("client_raw_batch_get");
310     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
311     SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS()));
312     SlowLogSpan span = slowLog.start("batchGet");
313     span.addProperty("keySize", String.valueOf(keys.size()));
314     ConcreteBackOffer backOffer =
315         ConcreteBackOffer.newDeadlineBackOff(
316             conf.getRawKVBatchReadTimeoutInMS(), slowLog, clusterId);
317     try {
318       long deadline = System.currentTimeMillis() + conf.getRawKVBatchReadTimeoutInMS();
319       List<KvPair> result = doSendBatchGet(backOffer, keys, deadline);
320       RAW_REQUEST_SUCCESS.labels(labels).inc();
321       return result;
322     } catch (Exception e) {
323       RAW_REQUEST_FAILURE.labels(labels).inc();
324       slowLog.setError(e);
325       throw e;
326     } finally {
327       requestTimer.observeDuration();
328       span.end();
329       slowLog.log();
330     }
331   }
332 
333   @Override
334   public void batchDelete(List<ByteString> keys) {
335     String[] labels = withClusterId("client_raw_batch_delete");
336     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
337     SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()));
338     SlowLogSpan span = slowLog.start("batchDelete");
339     span.addProperty("keySize", String.valueOf(keys.size()));
340     ConcreteBackOffer backOffer =
341         ConcreteBackOffer.newDeadlineBackOff(
342             conf.getRawKVBatchWriteTimeoutInMS(), slowLog, clusterId);
343     try {
344       long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS();
345       doSendBatchDelete(backOffer, keys, deadline);
346       RAW_REQUEST_SUCCESS.labels(labels).inc();
347     } catch (Exception e) {
348       RAW_REQUEST_FAILURE.labels(labels).inc();
349       slowLog.setError(e);
350       throw e;
351     } finally {
352       requestTimer.observeDuration();
353       span.end();
354       slowLog.log();
355     }
356   }
357 
358   @Override
359   public Optional<Long> getKeyTTL(ByteString key) {
360     String[] labels = withClusterId("client_raw_get_key_ttl");
361     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
362     SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
363     SlowLogSpan span = slowLog.start("getKeyTTL");
364     span.addProperty("key", KeyUtils.formatBytesUTF8(key));
365     ConcreteBackOffer backOffer =
366         ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog, clusterId);
367     try {
368       while (true) {
369         try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
370           span.addProperty("region", client.getRegion().toString());
371           Optional<Long> result = client.rawGetKeyTTL(backOffer, key);
372           RAW_REQUEST_SUCCESS.labels(labels).inc();
373           return result;
374         } catch (final TiKVException e) {
375           backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
376           logger.warn("Retry for getKeyTTL error", e);
377         }
378       }
379     } catch (Exception e) {
380       RAW_REQUEST_FAILURE.labels(labels).inc();
381       slowLog.setError(e);
382       throw e;
383     } finally {
384       requestTimer.observeDuration();
385       span.end();
386       slowLog.log();
387     }
388   }
389 
390   @Override
391   public List<List<ByteString>> batchScanKeys(
392       List<Pair<ByteString, ByteString>> ranges, int eachLimit) {
393     return batchScan(
394             ranges
395                 .stream()
396                 .map(
397                     range ->
398                         ScanOption.newBuilder()
399                             .setStartKey(range.first)
400                             .setEndKey(range.second)
401                             .setLimit(eachLimit)
402                             .setKeyOnly(true)
403                             .build())
404                 .collect(Collectors.toList()))
405         .stream()
406         .map(kvs -> kvs.stream().map(kv -> kv.getKey()).collect(Collectors.toList()))
407         .collect(Collectors.toList());
408   }
409 
410   @Override
411   public List<List<KvPair>> batchScan(List<ScanOption> ranges) {
412     String[] labels = withClusterId("client_raw_batch_scan");
413     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
414     long deadline = System.currentTimeMillis() + conf.getRawKVScanTimeoutInMS();
415     List<Future<Pair<Integer, List<KvPair>>>> futureList = new ArrayList<>();
416     try {
417       if (ranges.isEmpty()) {
418         return new ArrayList<>();
419       }
420       ExecutorCompletionService<Pair<Integer, List<KvPair>>> completionService =
421           new ExecutorCompletionService<>(batchScanThreadPool);
422       int num = 0;
423       for (ScanOption scanOption : ranges) {
424         int i = num;
425         futureList.add(completionService.submit(() -> Pair.create(i, scan(scanOption))));
426         ++num;
427       }
428       List<List<KvPair>> scanResults = new ArrayList<>();
429       for (int i = 0; i < num; i++) {
430         scanResults.add(new ArrayList<>());
431       }
432       for (int i = 0; i < num; i++) {
433         try {
434           Future<Pair<Integer, List<KvPair>>> future =
435               completionService.poll(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
436           if (future == null) {
437             throw new TiKVException("TimeOut Exceeded for current operation.");
438           }
439           Pair<Integer, List<KvPair>> scanResult = future.get();
440           scanResults.set(scanResult.first, scanResult.second);
441         } catch (InterruptedException e) {
442           Thread.currentThread().interrupt();
443           throw new TiKVException("Current thread interrupted.", e);
444         } catch (ExecutionException e) {
445           throw new TiKVException("Execution exception met.", e);
446         }
447       }
448       RAW_REQUEST_SUCCESS.labels(labels).inc();
449       return scanResults;
450     } catch (Exception e) {
451       RAW_REQUEST_FAILURE.labels(labels).inc();
452       for (Future<Pair<Integer, List<KvPair>>> future : futureList) {
453         future.cancel(true);
454       }
455       throw e;
456     } finally {
457       requestTimer.observeDuration();
458     }
459   }
460 
461   @Override
462   public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
463     return scan(startKey, endKey, limit, false);
464   }
465 
466   @Override
467   public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
468     String[] labels = withClusterId("client_raw_scan");
469     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
470     SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS()));
471     SlowLogSpan span = slowLog.start("scan");
472     span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey));
473     span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey));
474     span.addProperty("limit", String.valueOf(limit));
475     span.addProperty("keyOnly", String.valueOf(keyOnly));
476     ConcreteBackOffer backOffer =
477         ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog, clusterId);
478     try {
479       Iterator<KvPair> iterator =
480           rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, backOffer);
481       List<KvPair> result = new ArrayList<>();
482       iterator.forEachRemaining(result::add);
483       RAW_REQUEST_SUCCESS.labels(labels).inc();
484       return result;
485     } catch (Exception e) {
486       RAW_REQUEST_FAILURE.labels(labels).inc();
487       slowLog.setError(e);
488       throw e;
489     } finally {
490       requestTimer.observeDuration();
491       span.end();
492       slowLog.log();
493     }
494   }
495 
496   @Override
497   public List<KvPair> scan(ByteString startKey, int limit) {
498     return scan(startKey, limit, false);
499   }
500 
501   @Override
502   public List<KvPair> scan(ByteString startKey, int limit, boolean keyOnly) {
503     return scan(startKey, ByteString.EMPTY, limit, keyOnly);
504   }
505 
506   @Override
507   public List<KvPair> scan(ByteString startKey, ByteString endKey) {
508     return scan(startKey, endKey, false);
509   }
510 
511   @Override
512   public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
513     String[] labels = withClusterId("client_raw_scan_without_limit");
514     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
515     SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS()));
516     SlowLogSpan span = slowLog.start("scan");
517     span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey));
518     span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey));
519     span.addProperty("keyOnly", String.valueOf(keyOnly));
520     ConcreteBackOffer backOffer =
521         ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog, clusterId);
522     try {
523       ByteString newStartKey = startKey;
524       List<KvPair> result = new ArrayList<>();
525       while (true) {
526         Iterator<KvPair> iterator =
527             rawScanIterator(
528                 conf,
529                 clientBuilder,
530                 newStartKey,
531                 endKey,
532                 conf.getScanBatchSize(),
533                 keyOnly,
534                 backOffer);
535         if (!iterator.hasNext()) {
536           break;
537         }
538         iterator.forEachRemaining(result::add);
539         newStartKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString();
540       }
541       RAW_REQUEST_SUCCESS.labels(labels).inc();
542       return result;
543     } catch (Exception e) {
544       RAW_REQUEST_FAILURE.labels(labels).inc();
545       slowLog.setError(e);
546       throw e;
547     } finally {
548       requestTimer.observeDuration();
549       span.end();
550       slowLog.log();
551     }
552   }
553 
554   private List<KvPair> scan(ScanOption scanOption) {
555     ByteString startKey = scanOption.getStartKey();
556     ByteString endKey = scanOption.getEndKey();
557     int limit = scanOption.getLimit();
558     boolean keyOnly = scanOption.isKeyOnly();
559     return scan(startKey, endKey, limit, keyOnly);
560   }
561 
562   @Override
563   public List<KvPair> scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) {
564     return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), limit, keyOnly);
565   }
566 
567   @Override
568   public List<KvPair> scanPrefix(ByteString prefixKey) {
569     return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString());
570   }
571 
572   @Override
573   public List<KvPair> scanPrefix(ByteString prefixKey, boolean keyOnly) {
574     return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), keyOnly);
575   }
576 
577   @Override
578   public void delete(ByteString key) {
579     String[] labels = withClusterId("client_raw_delete");
580     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
581     SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
582     SlowLogSpan span = slowLog.start("delete");
583     span.addProperty("key", KeyUtils.formatBytesUTF8(key));
584     ConcreteBackOffer backOffer =
585         ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog, clusterId);
586     try {
587       while (true) {
588         try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
589           span.addProperty("region", client.getRegion().toString());
590           client.rawDelete(backOffer, key, atomicForCAS);
591           RAW_REQUEST_SUCCESS.labels(labels).inc();
592           return;
593         } catch (final TiKVException e) {
594           backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
595           logger.warn("Retry for delete error", e);
596         }
597       }
598     } catch (Exception e) {
599       RAW_REQUEST_FAILURE.labels(labels).inc();
600       slowLog.setError(e);
601       throw e;
602     } finally {
603       requestTimer.observeDuration();
604       span.end();
605       slowLog.log();
606     }
607   }
608 
609   @Override
610   public synchronized void deleteRange(ByteString startKey, ByteString endKey) {
611     String[] labels = withClusterId("client_raw_delete_range");
612     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
613     ConcreteBackOffer backOffer =
614         ConcreteBackOffer.newDeadlineBackOff(
615             conf.getRawKVCleanTimeoutInMS(), SlowLogEmptyImpl.INSTANCE, clusterId);
616     try {
617       long deadline = System.currentTimeMillis() + conf.getRawKVCleanTimeoutInMS();
618       doSendDeleteRange(backOffer, startKey, endKey, deadline);
619       RAW_REQUEST_SUCCESS.labels(labels).inc();
620     } catch (Exception e) {
621       RAW_REQUEST_FAILURE.labels(labels).inc();
622       throw e;
623     } finally {
624       requestTimer.observeDuration();
625     }
626   }
627 
628   @Override
629   public synchronized void deletePrefix(ByteString key) {
630     ByteString endKey = Key.toRawKey(key).nextPrefix().toByteString();
631     deleteRange(key, endKey);
632   }
633 
634   @Override
635   public TiSession getSession() {
636     return tiSession;
637   }
638 
639   /**
640    * Ingest KV pairs to RawKV using StreamKV API.
641    *
642    * @param list
643    */
644   public synchronized void ingest(List<Pair<ByteString, ByteString>> list) {
645     ingest(list, null);
646   }
647 
648   /**
649    * Ingest KV pairs to RawKV using StreamKV API.
650    *
651    * @param list
652    * @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated
653    */
654   public synchronized void ingest(List<Pair<ByteString, ByteString>> list, Long ttl)
655       throws GrpcException {
656     if (list.isEmpty()) {
657       return;
658     }
659 
660     Key min = Key.MAX;
661     Key max = Key.MIN;
662     Map<ByteString, ByteString> map = new HashMap<>(list.size());
663 
664     for (Pair<ByteString, ByteString> pair : list) {
665       map.put(pair.first, pair.second);
666       Key key = Key.toRawKey(pair.first.toByteArray());
667       if (key.compareTo(min) < 0) {
668         min = key;
669       }
670       if (key.compareTo(max) > 0) {
671         max = key;
672       }
673     }
674 
675     SwitchTiKVModeClient switchTiKVModeClient = tiSession.getSwitchTiKVModeClient();
676 
677     try {
678       // switch to normal mode
679       switchTiKVModeClient.switchTiKVToNormalMode();
680 
681       // region split
682       List<byte[]> splitKeys = new ArrayList<>(2);
683       splitKeys.add(min.getBytes());
684       splitKeys.add(max.next().getBytes());
685       tiSession.splitRegionAndScatter(splitKeys);
686       tiSession.getRegionManager().invalidateAll();
687 
688       // switch to import mode
689       switchTiKVModeClient.keepTiKVToImportMode();
690 
691       // group keys by region
692       List<ByteString> keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList());
693       Map<TiRegion, List<ByteString>> groupKeys =
694           groupKeysByRegion(clientBuilder.getRegionManager(), keyList, defaultBackOff());
695 
696       // ingest for each region
697       for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
698         TiRegion region = entry.getKey();
699         List<ByteString> keys = entry.getValue();
700         List<Pair<ByteString, ByteString>> kvs =
701             keys.stream().map(k -> Pair.create(k, map.get(k))).collect(Collectors.toList());
702         doIngest(region, kvs, ttl);
703       }
704     } finally {
705       // swith tikv to normal mode
706       switchTiKVModeClient.stopKeepTiKVToImportMode();
707       switchTiKVModeClient.switchTiKVToNormalMode();
708     }
709   }
710 
711   private void doIngest(TiRegion region, List<Pair<ByteString, ByteString>> sortedList, Long ttl)
712       throws GrpcException {
713     if (sortedList.isEmpty()) {
714       return;
715     }
716 
717     ByteString uuid = ByteString.copyFrom(genUUID());
718     Key minKey = Key.toRawKey(sortedList.get(0).first);
719     Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first);
720     ImporterClient importerClient =
721         new ImporterClient(tiSession, uuid, minKey, maxKey, region, ttl);
722     importerClient.write(sortedList.iterator());
723   }
724 
725   private void doSendBatchPut(
726       BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl, long deadline) {
727     ExecutorCompletionService<List<Batch>> completionService =
728         new ExecutorCompletionService<>(batchPutThreadPool);
729 
730     List<Future<List<Batch>>> futureList = new ArrayList<>();
731 
732     Map<TiRegion, List<ByteString>> groupKeys =
733         groupKeysByRegion(clientBuilder.getRegionManager(), kvPairs.keySet(), backOffer);
734     List<Batch> batches = new ArrayList<>();
735 
736     for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
737       appendBatches(
738           backOffer,
739           batches,
740           entry.getKey(),
741           entry.getValue(),
742           entry.getValue().stream().map(kvPairs::get).collect(Collectors.toList()),
743           RAW_BATCH_PUT_SIZE,
744           MAX_RAW_BATCH_LIMIT);
745     }
746     Queue<List<Batch>> taskQueue = new LinkedList<>();
747     taskQueue.offer(batches);
748 
749     while (!taskQueue.isEmpty()) {
750       List<Batch> task = taskQueue.poll();
751       for (Batch batch : task) {
752         futureList.add(
753             completionService.submit(
754                 () -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl)));
755       }
756 
757       try {
758         getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
759       } catch (Exception e) {
760         for (Future<List<Batch>> future : futureList) {
761           future.cancel(true);
762         }
763         throw e;
764       }
765     }
766   }
767 
768   private List<Batch> doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) {
769     try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) {
770       client.setTimeout(conf.getRawKVBatchWriteTimeoutInMS());
771       client.rawBatchPut(backOffer, batch, ttl, atomicForCAS);
772       return new ArrayList<>();
773     } catch (final TiKVException e) {
774       // TODO: any elegant way to re-split the ranges if fails?
775       backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
776       logger.warn("ReSplitting ranges for BatchPutRequest", e);
777       // retry
778       return doSendBatchPutWithRefetchRegion(backOffer, batch);
779     }
780   }
781 
782   private List<Batch> doSendBatchPutWithRefetchRegion(BackOffer backOffer, Batch batch) {
783     Map<TiRegion, List<ByteString>> groupKeys =
784         groupKeysByRegion(clientBuilder.getRegionManager(), batch.getKeys(), backOffer);
785     List<Batch> retryBatches = new ArrayList<>();
786 
787     for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
788       appendBatches(
789           backOffer,
790           retryBatches,
791           entry.getKey(),
792           entry.getValue(),
793           entry.getValue().stream().map(batch.getMap()::get).collect(Collectors.toList()),
794           RAW_BATCH_PUT_SIZE,
795           MAX_RAW_BATCH_LIMIT);
796     }
797 
798     return retryBatches;
799   }
800 
801   private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys, long deadline) {
802     ExecutorCompletionService<Pair<List<Batch>, List<KvPair>>> completionService =
803         new ExecutorCompletionService<>(batchGetThreadPool);
804 
805     List<Future<Pair<List<Batch>, List<KvPair>>>> futureList = new ArrayList<>();
806 
807     List<Batch> batches =
808         getBatches(backOffer, keys, RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, this.clientBuilder);
809 
810     Queue<List<Batch>> taskQueue = new LinkedList<>();
811     List<KvPair> result = new ArrayList<>();
812     taskQueue.offer(batches);
813 
814     while (!taskQueue.isEmpty()) {
815       List<Batch> task = taskQueue.poll();
816       for (Batch batch : task) {
817         futureList.add(
818             completionService.submit(
819                 () -> doSendBatchGetInBatchesWithRetry(batch.getBackOffer(), batch)));
820       }
821       try {
822         result.addAll(
823             getTasksWithOutput(
824                 completionService, taskQueue, task, deadline - System.currentTimeMillis()));
825       } catch (Exception e) {
826         for (Future<Pair<List<Batch>, List<KvPair>>> future : futureList) {
827           future.cancel(true);
828         }
829         throw e;
830       }
831     }
832 
833     return result;
834   }
835 
836   private Pair<List<Batch>, List<KvPair>> doSendBatchGetInBatchesWithRetry(
837       BackOffer backOffer, Batch batch) {
838 
839     try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) {
840       List<KvPair> partialResult = client.rawBatchGet(backOffer, batch.getKeys());
841       return Pair.create(new ArrayList<>(), partialResult);
842     } catch (final TiKVException e) {
843       backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
844       clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
845       logger.warn("ReSplitting ranges for BatchGetRequest", e);
846 
847       // retry
848       return Pair.create(doSendBatchGetWithRefetchRegion(backOffer, batch), new ArrayList<>());
849     }
850   }
851 
852   private List<Batch> doSendBatchGetWithRefetchRegion(BackOffer backOffer, Batch batch) {
853     return getBatches(
854         backOffer, batch.getKeys(), RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
855   }
856 
857   private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys, long deadline) {
858     ExecutorCompletionService<List<Batch>> completionService =
859         new ExecutorCompletionService<>(batchDeleteThreadPool);
860 
861     List<Future<List<Batch>>> futureList = new ArrayList<>();
862 
863     List<Batch> batches =
864         getBatches(backOffer, keys, RAW_BATCH_DELETE_SIZE, MAX_RAW_BATCH_LIMIT, this.clientBuilder);
865 
866     Queue<List<Batch>> taskQueue = new LinkedList<>();
867     taskQueue.offer(batches);
868 
869     while (!taskQueue.isEmpty()) {
870       List<Batch> task = taskQueue.poll();
871       for (Batch batch : task) {
872         futureList.add(
873             completionService.submit(
874                 () -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch)));
875       }
876       try {
877         getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
878       } catch (Exception e) {
879         for (Future<List<Batch>> future : futureList) {
880           future.cancel(true);
881         }
882         throw e;
883       }
884     }
885   }
886 
887   private List<Batch> doSendBatchDeleteInBatchesWithRetry(BackOffer backOffer, Batch batch) {
888     try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) {
889       client.rawBatchDelete(backOffer, batch.getKeys(), atomicForCAS);
890       return new ArrayList<>();
891     } catch (final TiKVException e) {
892       backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
893       clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
894       logger.warn("ReSplitting ranges for BatchGetRequest", e);
895 
896       // retry
897       return doSendBatchDeleteWithRefetchRegion(backOffer, batch);
898     }
899   }
900 
901   private List<Batch> doSendBatchDeleteWithRefetchRegion(BackOffer backOffer, Batch batch) {
902     return getBatches(
903         backOffer, batch.getKeys(), RAW_BATCH_DELETE_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
904   }
905 
906   private ByteString calcKeyByCondition(boolean condition, ByteString key1, ByteString key2) {
907     if (condition) {
908       return key1;
909     }
910     return key2;
911   }
912 
913   private void doSendDeleteRange(
914       BackOffer backOffer, ByteString startKey, ByteString endKey, long deadline) {
915     ExecutorCompletionService<List<DeleteRange>> completionService =
916         new ExecutorCompletionService<>(deleteRangeThreadPool);
917 
918     List<Future<List<DeleteRange>>> futureList = new ArrayList<>();
919 
920     List<TiRegion> regions = fetchRegionsFromRange(backOffer, startKey, endKey);
921     List<DeleteRange> ranges = new ArrayList<>();
922     for (int i = 0; i < regions.size(); i++) {
923       TiRegion region = regions.get(i);
924       ByteString start = calcKeyByCondition(i == 0, startKey, region.getStartKey());
925       ByteString end = calcKeyByCondition(i == regions.size() - 1, endKey, region.getEndKey());
926       ranges.add(new DeleteRange(backOffer, region, start, end));
927     }
928     Queue<List<DeleteRange>> taskQueue = new LinkedList<>();
929     taskQueue.offer(ranges);
930     while (!taskQueue.isEmpty()) {
931       List<DeleteRange> task = taskQueue.poll();
932       for (DeleteRange range : task) {
933         futureList.add(
934             completionService.submit(
935                 () -> doSendDeleteRangeWithRetry(range.getBackOffer(), range)));
936       }
937       try {
938         getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
939       } catch (Exception e) {
940         for (Future<List<DeleteRange>> future : futureList) {
941           future.cancel(true);
942         }
943         throw e;
944       }
945     }
946   }
947 
948   private List<DeleteRange> doSendDeleteRangeWithRetry(BackOffer backOffer, DeleteRange range) {
949     try (RegionStoreClient client = clientBuilder.build(range.getRegion(), backOffer)) {
950       client.setTimeout(conf.getScanTimeout());
951       client.rawDeleteRange(backOffer, range.getStartKey(), range.getEndKey());
952       return new ArrayList<>();
953     } catch (final TiKVException e) {
954       backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
955       clientBuilder.getRegionManager().invalidateRegion(range.getRegion());
956       logger.warn("ReSplitting ranges for BatchDeleteRangeRequest", e);
957 
958       // retry
959       return doSendDeleteRangeWithRefetchRegion(backOffer, range);
960     }
961   }
962 
963   private List<DeleteRange> doSendDeleteRangeWithRefetchRegion(
964       BackOffer backOffer, DeleteRange range) {
965     List<TiRegion> regions =
966         fetchRegionsFromRange(backOffer, range.getStartKey(), range.getEndKey());
967     List<DeleteRange> retryRanges = new ArrayList<>();
968     for (int i = 0; i < regions.size(); i++) {
969       TiRegion region = regions.get(i);
970       ByteString start = calcKeyByCondition(i == 0, range.getStartKey(), region.getStartKey());
971       ByteString end =
972           calcKeyByCondition(i == regions.size() - 1, range.getEndKey(), region.getEndKey());
973       retryRanges.add(new DeleteRange(backOffer, region, start, end));
974     }
975     return retryRanges;
976   }
977 
978   private static Map<ByteString, ByteString> mapKeysToValues(
979       List<ByteString> keys, List<ByteString> values) {
980     Map<ByteString, ByteString> map = new HashMap<>();
981     for (int i = 0; i < keys.size(); i++) {
982       map.put(keys.get(i), values.get(i));
983     }
984     return map;
985   }
986 
987   private List<TiRegion> fetchRegionsFromRange(
988       BackOffer backOffer, ByteString startKey, ByteString endKey) {
989     List<TiRegion> regions = new ArrayList<>();
990     while (startKey.isEmpty()
991         || endKey.isEmpty()
992         || Key.toRawKey(startKey).compareTo(Key.toRawKey(endKey)) < 0) {
993       TiRegion currentRegion = clientBuilder.getRegionManager().getRegionByKey(startKey, backOffer);
994       regions.add(currentRegion);
995       startKey = currentRegion.getEndKey();
996       if (currentRegion.getEndKey().isEmpty()) {
997         break;
998       }
999     }
1000     return regions;
1001   }
1002 
1003   private Iterator<KvPair> rawScanIterator(
1004       TiConfiguration conf,
1005       RegionStoreClientBuilder builder,
1006       ByteString startKey,
1007       ByteString endKey,
1008       int limit,
1009       boolean keyOnly,
1010       BackOffer backOffer) {
1011     if (limit > MAX_RAW_SCAN_LIMIT) {
1012       throw ERR_MAX_SCAN_LIMIT_EXCEEDED;
1013     }
1014     return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, backOffer);
1015   }
1016 
1017   /**
1018    * Scan raw key-value pairs from TiKV in range [startKey, endKey)
1019    *
1020    * @param startKey raw start key, inclusive
1021    * @param endKey raw end key, exclusive
1022    * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
1023    * @return iterator of key-value pairs in range
1024    */
1025   public Iterator<KvPair> scan0(ByteString startKey, ByteString endKey, int limit) {
1026     return scan0(startKey, endKey, limit, false);
1027   }
1028 
1029   /**
1030    * Scan raw key-value pairs from TiKV in range [startKey, ♾)
1031    *
1032    * @param startKey raw start key, inclusive
1033    * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
1034    * @return iterator of key-value pairs in range
1035    */
1036   public Iterator<KvPair> scan0(ByteString startKey, int limit) {
1037     return scan0(startKey, limit, false);
1038   }
1039 
1040   /**
1041    * Scan raw key-value pairs from TiKV in range [startKey, ♾)
1042    *
1043    * @param startKey raw start key, inclusive
1044    * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
1045    * @param keyOnly whether to scan in key-only mode
1046    * @return iterator of key-value pairs in range
1047    */
1048   public Iterator<KvPair> scan0(ByteString startKey, int limit, boolean keyOnly) {
1049     return scan0(startKey, ByteString.EMPTY, limit, keyOnly);
1050   }
1051 
1052   /**
1053    * Scan raw key-value pairs from TiKV in range [startKey, endKey)
1054    *
1055    * @param startKey raw start key, inclusive
1056    * @param endKey raw end key, exclusive
1057    * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
1058    * @param keyOnly whether to scan in key-only mode
1059    * @return iterator of key-value pairs in range
1060    */
1061   public Iterator<KvPair> scan0(
1062       ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
1063     String[] labels = withClusterId("client_raw_scan");
1064     Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
1065     try {
1066       Iterator<KvPair> iterator =
1067           rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, defaultBackOff());
1068       RAW_REQUEST_SUCCESS.labels(labels).inc();
1069       return iterator;
1070     } catch (Exception e) {
1071       RAW_REQUEST_FAILURE.labels(labels).inc();
1072       throw e;
1073     } finally {
1074       requestTimer.observeDuration();
1075     }
1076   }
1077 
1078   /**
1079    * Scan all raw key-value pairs from TiKV in range [startKey, endKey)
1080    *
1081    * @param startKey raw start key, inclusive
1082    * @param endKey raw end key, exclusive
1083    * @return iterator of key-value pairs in range
1084    */
1085   public Iterator<KvPair> scan0(ByteString startKey, ByteString endKey) {
1086     return scan0(startKey, endKey, false);
1087   }
1088 
1089   private Iterator<KvPair> scan0(ScanOption scanOption) {
1090     ByteString startKey = scanOption.getStartKey();
1091     ByteString endKey = scanOption.getEndKey();
1092     int limit = scanOption.getLimit();
1093     boolean keyOnly = scanOption.isKeyOnly();
1094     return scan0(startKey, endKey, limit, keyOnly);
1095   }
1096 
1097   /**
1098    * Scan keys with prefix
1099    *
1100    * @param prefixKey prefix key
1101    * @param limit limit of keys retrieved
1102    * @param keyOnly whether to scan in keyOnly mode
1103    * @return kvPairs iterator with the specified prefix
1104    */
1105   public Iterator<KvPair> scanPrefix0(ByteString prefixKey, int limit, boolean keyOnly) {
1106     return scan0(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), limit, keyOnly);
1107   }
1108 
1109   public Iterator<KvPair> scanPrefix0(ByteString prefixKey) {
1110     return scan0(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString());
1111   }
1112 
1113   public Iterator<KvPair> scanPrefix0(ByteString prefixKey, boolean keyOnly) {
1114     return scan0(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), keyOnly);
1115   }
1116 
1117   /**
1118    * Scan all raw key-value pairs from TiKV in range [startKey, endKey)
1119    *
1120    * @param startKey raw start key, inclusive
1121    * @param endKey raw end key, exclusive
1122    * @param keyOnly whether to scan in key-only mode
1123    * @return iterator of key-value pairs in range
1124    */
1125   public Iterator<KvPair> scan0(ByteString startKey, ByteString endKey, boolean keyOnly) {
1126     return new TikvIterator(startKey, endKey, keyOnly);
1127   }
1128 
1129   public class TikvIterator implements Iterator<KvPair> {
1130 
1131     private Iterator<KvPair> iterator;
1132 
1133     private final ByteString startKey;
1134     private final ByteString endKey;
1135     private final boolean keyOnly;
1136 
1137     private KvPair last;
1138 
1139     public TikvIterator(ByteString startKey, ByteString endKey, boolean keyOnly) {
1140       this.startKey = startKey;
1141       this.endKey = endKey;
1142       this.keyOnly = keyOnly;
1143 
1144       this.iterator =
1145           rawScanIterator(
1146               conf,
1147               clientBuilder,
1148               this.startKey,
1149               this.endKey,
1150               conf.getScanBatchSize(),
1151               keyOnly,
1152               defaultBackOff());
1153     }
1154 
1155     @Override
1156     public boolean hasNext() {
1157       if (this.iterator.hasNext()) {
1158         return true;
1159       }
1160       if (this.last == null) {
1161         return false;
1162       }
1163       ByteString startKey = Key.toRawKey(this.last.getKey()).next().toByteString();
1164       this.iterator =
1165           rawScanIterator(
1166               conf,
1167               clientBuilder,
1168               startKey,
1169               endKey,
1170               conf.getScanBatchSize(),
1171               keyOnly,
1172               defaultBackOff());
1173       this.last = null;
1174       return this.iterator.hasNext();
1175     }
1176 
1177     @Override
1178     public KvPair next() {
1179       KvPair next = this.iterator.next();
1180       this.last = next;
1181       return next;
1182     }
1183   }
1184 
1185   private BackOffer defaultBackOff() {
1186     return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS(), clusterId);
1187   }
1188 }