View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.operation.iterator;
19  
20  import com.google.protobuf.ByteString;
21  import org.tikv.common.TiConfiguration;
22  import org.tikv.common.exception.GrpcException;
23  import org.tikv.common.exception.TiKVException;
24  import org.tikv.common.key.Key;
25  import org.tikv.common.region.RegionStoreClient;
26  import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
27  import org.tikv.common.region.TiRegion;
28  import org.tikv.common.util.BackOffFunction;
29  import org.tikv.common.util.BackOffer;
30  import org.tikv.kvproto.Kvrpcpb;
31  
32  public class RawScanIterator extends ScanIterator {
33    private final BackOffer scanBackOffer;
34  
35    public RawScanIterator(
36        TiConfiguration conf,
37        RegionStoreClientBuilder builder,
38        ByteString startKey,
39        ByteString endKey,
40        int limit,
41        boolean keyOnly,
42        BackOffer scanBackOffer) {
43      super(conf, builder, startKey, endKey, limit, keyOnly);
44  
45      this.scanBackOffer = scanBackOffer;
46    }
47  
48    @Override
49    TiRegion loadCurrentRegionToCache() throws GrpcException {
50      BackOffer backOffer = scanBackOffer;
51      while (true) {
52        try (RegionStoreClient client = builder.build(startKey, backOffer)) {
53          client.setTimeout(conf.getRawKVScanTimeoutInMS());
54          TiRegion region = client.getRegion();
55          if (limit <= 0) {
56            currentCache = null;
57          } else {
58            try {
59              currentCache = client.rawScan(backOffer, startKey, limit, keyOnly);
60              // Client will get the newest region during scan. So we need to
61              // update region after scan.
62              region = client.getRegion();
63            } catch (final TiKVException e) {
64              backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
65              continue;
66            }
67          }
68          return region;
69        }
70      }
71    }
72  
73    private boolean endOfScan() {
74      if (!processingLastBatch) {
75        return false;
76      }
77      ByteString lastKey = currentCache.get(index).getKey();
78      return !lastKey.isEmpty() && Key.toRawKey(lastKey).compareTo(endKey) >= 0;
79    }
80  
81    boolean isCacheDrained() {
82      return currentCache == null || limit <= 0 || index >= currentCache.size() || index == -1;
83    }
84  
85    @Override
86    public boolean hasNext() {
87      if (isCacheDrained() && cacheLoadFails()) {
88        endOfScan = true;
89        return false;
90      }
91      // continue when cache is empty but not null
92      while (currentCache != null && currentCache.isEmpty()) {
93        if (cacheLoadFails()) {
94          return false;
95        }
96      }
97      return !endOfScan();
98    }
99  
100   private Kvrpcpb.KvPair getCurrent() {
101     --limit;
102     return currentCache.get(index++);
103   }
104 
105   @Override
106   public Kvrpcpb.KvPair next() {
107     return getCurrent();
108   }
109 }