View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common;
19  
20  import static org.tikv.common.util.ClientUtils.*;
21  
22  import com.google.protobuf.ByteString;
23  import java.util.ArrayList;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Objects;
27  import java.util.concurrent.ExecutorCompletionService;
28  import java.util.concurrent.ExecutorService;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  import org.tikv.common.exception.GrpcException;
32  import org.tikv.common.exception.TiKVException;
33  import org.tikv.common.operation.iterator.ConcreteScanIterator;
34  import org.tikv.common.region.RegionStoreClient;
35  import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
36  import org.tikv.common.region.TiRegion;
37  import org.tikv.common.util.BackOffFunction;
38  import org.tikv.common.util.BackOffer;
39  import org.tikv.common.util.Batch;
40  import org.tikv.common.util.ConcreteBackOffer;
41  import org.tikv.kvproto.Kvrpcpb.KvPair;
42  
43  public class KVClient implements AutoCloseable {
44    private static final Logger logger = LoggerFactory.getLogger(KVClient.class);
45    private static final int MAX_BATCH_LIMIT = 1024;
46    private static final int BATCH_GET_SIZE = 16 * 1024;
47    private final RegionStoreClientBuilder clientBuilder;
48    private final TiConfiguration conf;
49    private final ExecutorService batchGetThreadPool;
50  
51    public KVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
52      Objects.requireNonNull(clientBuilder, "clientBuilder is null");
53      this.conf = session.getConf();
54      this.clientBuilder = clientBuilder;
55      this.batchGetThreadPool = session.getThreadPoolForBatchGet();
56    }
57  
58    @Override
59    public void close() {}
60  
61    /**
62     * Get a key-value pair from TiKV if key exists
63     *
64     * @param key key
65     * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
66     */
67    public ByteString get(ByteString key, long version) throws GrpcException {
68      BackOffer backOffer =
69          ConcreteBackOffer.newGetBackOff(
70              clientBuilder.getRegionManager().getPDClient().getClusterId());
71      while (true) {
72        RegionStoreClient client = clientBuilder.build(key);
73        try {
74          return client.get(backOffer, key, version);
75        } catch (final TiKVException e) {
76          backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
77        }
78      }
79    }
80  
81    /**
82     * Get a set of key-value pair by keys from TiKV
83     *
84     * @param backOffer
85     * @param keys
86     * @param version
87     * @return
88     * @throws GrpcException
89     */
90    public List<KvPair> batchGet(BackOffer backOffer, List<ByteString> keys, long version)
91        throws GrpcException {
92      return doSendBatchGet(backOffer, keys, version);
93    }
94  
95    /**
96     * Scan key-value pairs from TiKV in range [startKey, endKey)
97     *
98     * @param startKey start key, inclusive
99     * @param endKey end key, exclusive
100    * @return list of key-value pairs in range
101    */
102   public List<KvPair> scan(ByteString startKey, ByteString endKey, long version)
103       throws GrpcException {
104     Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, endKey, version);
105     List<KvPair> result = new ArrayList<>();
106     iterator.forEachRemaining(result::add);
107     return result;
108   }
109 
110   /**
111    * Scan key-value pairs from TiKV in range [startKey, ♾), maximum to `limit` pairs
112    *
113    * @param startKey start key, inclusive
114    * @param limit limit of kv pairs
115    * @return list of key-value pairs in range
116    */
117   public List<KvPair> scan(ByteString startKey, long version, int limit) throws GrpcException {
118     Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit);
119     List<KvPair> result = new ArrayList<>();
120     iterator.forEachRemaining(result::add);
121     return result;
122   }
123 
124   public List<KvPair> scan(ByteString startKey, long version) throws GrpcException {
125     return scan(startKey, version, Integer.MAX_VALUE);
126   }
127 
128   private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys, long version) {
129     ExecutorCompletionService<List<KvPair>> completionService =
130         new ExecutorCompletionService<>(batchGetThreadPool);
131 
132     List<Batch> batches =
133         getBatches(backOffer, keys, BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
134 
135     for (Batch batch : batches) {
136       completionService.submit(
137           () -> doSendBatchGetInBatchesWithRetry(batch.getBackOffer(), batch, version));
138     }
139 
140     return getKvPairs(completionService, batches, BackOffer.BATCH_GET_MAX_BACKOFF);
141   }
142 
143   private List<KvPair> doSendBatchGetInBatchesWithRetry(
144       BackOffer backOffer, Batch batch, long version) {
145     TiRegion oldRegion = batch.getRegion();
146     TiRegion currentRegion =
147         clientBuilder.getRegionManager().getRegionByKey(oldRegion.getStartKey());
148 
149     if (oldRegion.equals(currentRegion)) {
150       RegionStoreClient client = clientBuilder.build(batch.getRegion());
151       try {
152         return client.batchGet(backOffer, batch.getKeys(), version);
153       } catch (final TiKVException e) {
154         backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
155         clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
156         logger.warn("ReSplitting ranges for BatchGetRequest", e);
157 
158         // retry
159         return doSendBatchGetWithRefetchRegion(backOffer, batch, version);
160       }
161     } else {
162       return doSendBatchGetWithRefetchRegion(backOffer, batch, version);
163     }
164   }
165 
166   private List<KvPair> doSendBatchGetWithRefetchRegion(
167       BackOffer backOffer, Batch batch, long version) {
168     List<Batch> retryBatches =
169         getBatches(backOffer, batch.getKeys(), BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);
170 
171     ArrayList<KvPair> results = new ArrayList<>();
172     for (Batch retryBatch : retryBatches) {
173       // recursive calls
174       List<KvPair> batchResult =
175           doSendBatchGetInBatchesWithRetry(retryBatch.getBackOffer(), retryBatch, version);
176       results.addAll(batchResult);
177     }
178     return results;
179   }
180 
181   private Iterator<KvPair> scanIterator(
182       TiConfiguration conf,
183       RegionStoreClientBuilder builder,
184       ByteString startKey,
185       ByteString endKey,
186       long version) {
187     return new ConcreteScanIterator(conf, builder, startKey, endKey, version);
188   }
189 
190   private Iterator<KvPair> scanIterator(
191       TiConfiguration conf,
192       RegionStoreClientBuilder builder,
193       ByteString startKey,
194       long version,
195       int limit) {
196     return new ConcreteScanIterator(conf, builder, startKey, version, limit);
197   }
198 }