View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.operation.iterator;
19  
20  import static java.util.Objects.requireNonNull;
21  
22  import com.google.protobuf.ByteString;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  import org.tikv.common.TiConfiguration;
26  import org.tikv.common.exception.GrpcException;
27  import org.tikv.common.exception.KeyException;
28  import org.tikv.common.key.Key;
29  import org.tikv.common.region.RegionStoreClient;
30  import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
31  import org.tikv.common.region.TiRegion;
32  import org.tikv.common.region.TiStore;
33  import org.tikv.common.util.BackOffer;
34  import org.tikv.common.util.ConcreteBackOffer;
35  import org.tikv.common.util.Pair;
36  import org.tikv.kvproto.Kvrpcpb;
37  
38  public class ConcreteScanIterator extends ScanIterator {
39    private final long version;
40    private final Logger logger = LoggerFactory.getLogger(ConcreteScanIterator.class);
41  
42    public ConcreteScanIterator(
43        TiConfiguration conf,
44        RegionStoreClientBuilder builder,
45        ByteString startKey,
46        long version,
47        int limit) {
48      // Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
49      this(conf, builder, startKey, ByteString.EMPTY, version, limit);
50    }
51  
52    public ConcreteScanIterator(
53        TiConfiguration conf,
54        RegionStoreClientBuilder builder,
55        ByteString startKey,
56        ByteString endKey,
57        long version) {
58      // Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
59      this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE);
60    }
61  
62    private ConcreteScanIterator(
63        TiConfiguration conf,
64        RegionStoreClientBuilder builder,
65        ByteString startKey,
66        ByteString endKey,
67        long version,
68        int limit) {
69      super(conf, builder, startKey, endKey, limit, false);
70      this.version = version;
71    }
72  
73    @Override
74    TiRegion loadCurrentRegionToCache() throws GrpcException {
75      TiRegion region;
76      try (RegionStoreClient client = builder.build(startKey)) {
77        client.setTimeout(conf.getScanTimeout());
78        BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
79        currentCache = client.scan(backOffer, startKey, version);
80        // If we get region before scan, we will use region from cache which
81        // may have wrong end key. This may miss some regions that split from old region.
82        // Client will get the newest region during scan. So we need to
83        // update region after scan.
84        region = client.getRegion();
85        return region;
86      }
87    }
88  
89    private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) {
90      logger.warn(String.format("resolve current key error %s", current.getError().toString()));
91      Pair<TiRegion, TiStore> pair =
92          builder.getRegionManager().getRegionStorePairByKey(current.getKey());
93      TiRegion region = pair.first;
94      TiStore store = pair.second;
95      BackOffer backOffer =
96          ConcreteBackOffer.newGetBackOff(builder.getRegionManager().getPDClient().getClusterId());
97      try (RegionStoreClient client = builder.build(region, store)) {
98        return client.get(backOffer, current.getKey(), version);
99      } catch (Exception e) {
100       throw new KeyException(current.getError());
101     }
102   }
103 
104   @Override
105   public boolean hasNext() {
106     Kvrpcpb.KvPair current;
107     // continue when cache is empty but not null
108     do {
109       current = getCurrent();
110       if (isCacheDrained() && cacheLoadFails()) {
111         endOfScan = true;
112         return false;
113       }
114     } while (currentCache != null && current == null);
115     // for last batch to be processed, we have to check if
116     return !processingLastBatch
117         || current == null
118         || (hasEndKey && Key.toRawKey(current.getKey()).compareTo(endKey) < 0);
119   }
120 
121   @Override
122   public Kvrpcpb.KvPair next() {
123     --limit;
124     Kvrpcpb.KvPair current = currentCache.get(index++);
125 
126     requireNonNull(current, "current kv pair cannot be null");
127     if (current.hasError()) {
128       ByteString val = resolveCurrentLock(current);
129       current = Kvrpcpb.KvPair.newBuilder().setKey(current.getKey()).setValue(val).build();
130     }
131 
132     return current;
133   }
134 
135   /**
136    * Cache is drained when - no data extracted - scan limit was not defined - have read the last
137    * index of cache - index not initialized
138    *
139    * @return whether cache is drained
140    */
141   private boolean isCacheDrained() {
142     return currentCache == null || limit <= 0 || index >= currentCache.size() || index == -1;
143   }
144 
145   private Kvrpcpb.KvPair getCurrent() {
146     if (isCacheDrained()) {
147       return null;
148     }
149     return currentCache.get(index);
150   }
151 }