View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.importer;
19  
20  import static org.tikv.common.operation.RegionErrorHandler.NO_LEADER_STORE_ID;
21  
22  import com.google.protobuf.ByteString;
23  import io.grpc.Status;
24  import io.grpc.StatusRuntimeException;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  import java.util.List;
28  import org.slf4j.Logger;
29  import org.slf4j.LoggerFactory;
30  import org.tikv.common.TiConfiguration;
31  import org.tikv.common.TiSession;
32  import org.tikv.common.apiversion.RequestKeyCodec;
33  import org.tikv.common.exception.GrpcException;
34  import org.tikv.common.exception.RegionException;
35  import org.tikv.common.exception.TiKVException;
36  import org.tikv.common.key.Key;
37  import org.tikv.common.region.TiRegion;
38  import org.tikv.common.region.TiStore;
39  import org.tikv.common.util.BackOffFunction;
40  import org.tikv.common.util.BackOffer;
41  import org.tikv.common.util.ConcreteBackOffer;
42  import org.tikv.common.util.Pair;
43  import org.tikv.kvproto.Errorpb.Error;
44  import org.tikv.kvproto.ImportSstpb;
45  import org.tikv.kvproto.ImportSstpb.RawWriteBatch;
46  import org.tikv.kvproto.Metapb;
47  
48  public class ImporterClient {
49    private static final Logger logger = LoggerFactory.getLogger(ImporterClient.class);
50  
51    private final TiConfiguration tiConf;
52    private final TiSession tiSession;
53    private final ByteString uuid;
54    private final Key minKey;
55    private final Key maxKey;
56    private TiRegion region;
57    private final Long ttl;
58  
59    private boolean deduplicate = false;
60  
61    private boolean streamOpened = false;
62    private ImportSstpb.SSTMeta sstMeta;
63    private List<ImporterStoreClient> clientList;
64    private ImporterStoreClient clientLeader;
65  
66    private final RequestKeyCodec codec;
67  
68    public ImporterClient(
69        TiSession tiSession, ByteString uuid, Key minKey, Key maxKey, TiRegion region, Long ttl) {
70      this.uuid = uuid;
71      this.tiConf = tiSession.getConf();
72      this.tiSession = tiSession;
73      this.minKey = minKey;
74      this.maxKey = maxKey;
75      this.region = region;
76      this.ttl = ttl;
77      this.codec = tiSession.getPDClient().getCodec();
78    }
79  
80    public boolean isDeduplicate() {
81      return deduplicate;
82    }
83  
84    public void setDeduplicate(boolean deduplicate) {
85      this.deduplicate = deduplicate;
86    }
87  
88    /**
89     * write KV pairs to RawKV/Txn using KVStream interface
90     *
91     * @param iterator
92     */
93    public void write(Iterator<Pair<ByteString, ByteString>> iterator) throws TiKVException {
94  
95      streamOpened = false;
96  
97      int maxKVBatchSize = tiConf.getImporterMaxKVBatchSize();
98      int maxKVBatchBytes = tiConf.getImporterMaxKVBatchBytes();
99      int totalBytes = 0;
100     ByteString preKey = null;
101     while (iterator.hasNext()) {
102       ArrayList<ImportSstpb.Pair> pairs = new ArrayList<>(maxKVBatchSize);
103       for (int i = 0; i < maxKVBatchSize; i++) {
104         if (iterator.hasNext()) {
105           Pair<ByteString, ByteString> pair = iterator.next();
106           if (preKey != null && preKey.equals(pair.first)) {
107             if (deduplicate) {
108               logger.info("skip duplicate key: {}", preKey.toStringUtf8());
109             } else {
110               throw new TiKVException(
111                   String.format("duplicate key found, key = %s", preKey.toStringUtf8()));
112             }
113           } else {
114             ByteString key = codec.encodeKey(pair.first);
115             pairs.add(ImportSstpb.Pair.newBuilder().setKey(key).setValue(pair.second).build());
116             totalBytes += (key.size() + pair.second.size());
117             preKey = pair.first;
118           }
119         }
120         if (totalBytes > maxKVBatchBytes || !iterator.hasNext()) {
121           break;
122         }
123       }
124       if (!streamOpened) {
125         init();
126         startWrite();
127         writeMeta();
128         streamOpened = true;
129       }
130       writeBatch(pairs);
131       totalBytes = 0;
132     }
133 
134     if (streamOpened) {
135       finishWrite();
136       ingest();
137     }
138   }
139 
140   private void init() {
141     long regionId = region.getId();
142     Metapb.RegionEpoch regionEpoch = region.getRegionEpoch();
143     Pair<ByteString, ByteString> keyRange =
144         codec.encodePdQueryRange(minKey.toByteString(), maxKey.toByteString());
145 
146     ImportSstpb.Range range =
147         ImportSstpb.Range.newBuilder().setStart(keyRange.first).setEnd(keyRange.second).build();
148 
149     sstMeta =
150         ImportSstpb.SSTMeta.newBuilder()
151             .setApiVersion(tiConf.getApiVersion().toPb())
152             .setUuid(uuid)
153             .setRegionId(regionId)
154             .setRegionEpoch(regionEpoch)
155             .setRange(range)
156             .build();
157 
158     clientList = new ArrayList<>();
159     for (Metapb.Peer peer : region.getPeersList()) {
160       long storeId = peer.getStoreId();
161       TiStore store = tiSession.getRegionManager().getStoreById(storeId);
162       ImporterStoreClient importerStoreClient =
163           tiSession.getImporterRegionStoreClientBuilder().build(store);
164       clientList.add(importerStoreClient);
165 
166       if (region.getLeader().getStoreId() == storeId) {
167         clientLeader = importerStoreClient;
168       }
169     }
170   }
171 
172   private void startWrite() {
173     for (ImporterStoreClient client : clientList) {
174       client.startWrite();
175     }
176   }
177 
178   private void writeMeta() {
179     if (tiConf.isTxnKVMode()) {
180       ImportSstpb.WriteRequest request =
181           ImportSstpb.WriteRequest.newBuilder().setMeta(sstMeta).build();
182       for (ImporterStoreClient client : clientList) {
183         client.writeBatch(request);
184       }
185     } else {
186       ImportSstpb.RawWriteRequest request =
187           ImportSstpb.RawWriteRequest.newBuilder().setMeta(sstMeta).build();
188       for (ImporterStoreClient client : clientList) {
189         client.writeBatch(request);
190       }
191     }
192   }
193 
194   private void writeBatch(List<ImportSstpb.Pair> pairs) {
195     if (tiConf.isTxnKVMode()) {
196       ImportSstpb.WriteBatch batch;
197 
198       batch =
199           ImportSstpb.WriteBatch.newBuilder()
200               .addAllPairs(pairs)
201               .setCommitTs(tiSession.getTimestamp().getVersion())
202               .build();
203 
204       ImportSstpb.WriteRequest request =
205           ImportSstpb.WriteRequest.newBuilder().setBatch(batch).build();
206       for (ImporterStoreClient client : clientList) {
207         client.writeBatch(request);
208       }
209     } else {
210       ImportSstpb.RawWriteBatch batch;
211 
212       RawWriteBatch.Builder batchBuilder = RawWriteBatch.newBuilder().addAllPairs(pairs);
213       if (ttl != null && ttl > 0) {
214         batchBuilder.setTtl(ttl);
215       }
216       if (tiConf.getApiVersion().isV2()) {
217         batchBuilder.setTs(tiSession.getTimestamp().getVersion());
218       }
219       batch = batchBuilder.build();
220 
221       ImportSstpb.RawWriteRequest request =
222           ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build();
223       for (ImporterStoreClient client : clientList) {
224         client.writeBatch(request);
225       }
226     }
227   }
228 
229   private void finishWrite() {
230     for (ImporterStoreClient client : clientList) {
231       client.finishWrite();
232     }
233   }
234 
235   private void ingest() throws GrpcException {
236     List<ImporterStoreClient> workingClients = new ArrayList<>(clientList);
237     while (!workingClients.isEmpty()) {
238       Iterator<ImporterStoreClient> itor = workingClients.iterator();
239       while (itor.hasNext()) {
240         ImporterStoreClient client = itor.next();
241         if (client.isWriteResponseReceived()) {
242           itor.remove();
243         } else if (client.hasWriteResponseError()) {
244           throw new GrpcException(client.getWriteError());
245         }
246       }
247 
248       if (!workingClients.isEmpty()) {
249         try {
250           Thread.sleep(1000);
251         } catch (InterruptedException e) {
252           e.printStackTrace();
253         }
254       }
255     }
256 
257     Object writeResponse = clientLeader.getWriteResponse();
258     BackOffer backOffer =
259         ConcreteBackOffer.newCustomBackOff(
260             BackOffer.INGEST_BACKOFF, tiSession.getPDClient().getClusterId());
261     ingestWithRetry(writeResponse, backOffer);
262   }
263 
264   private void ingestWithRetry(Object writeResponse, BackOffer backOffer) {
265     try {
266       clientLeader.multiIngest(region.getLeaderContext(), writeResponse);
267     } catch (RegionException e) {
268       logger.warn("ingest failed.", e);
269       boolean retry = false;
270       Error error = e.getRegionErr();
271       if (error != null) {
272         if (error.hasNotLeader()) {
273           retry = true;
274           long newStoreId = error.getNotLeader().getLeader().getStoreId();
275 
276           // update Leader here
277           logger.warn(
278               String.format(
279                   "NotLeader Error with region id %d and store id %d, new store id %d",
280                   region.getId(), region.getLeader().getStoreId(), newStoreId));
281 
282           BackOffFunction.BackOffFuncType backOffFuncType;
283           if (newStoreId != NO_LEADER_STORE_ID) {
284             long regionId = region.getId();
285             region = tiSession.getRegionManager().updateLeader(region, newStoreId);
286             if (region == null) {
287               // epoch is not changed, getRegionById is faster than getRegionByKey
288               region = tiSession.getRegionManager().getRegionById(regionId);
289             }
290             backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
291           } else {
292             logger.info(
293                 String.format(
294                     "Received zero store id, from region %d try next time", region.getId()));
295             tiSession.getRegionManager().invalidateRegion(region);
296             region = tiSession.getRegionManager().getRegionById(region.getId());
297             backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss;
298           }
299 
300           backOffer.doBackOff(backOffFuncType, e);
301           init();
302         } else if (error.hasServerIsBusy()) {
303           retry = true;
304           // this error is reported from kv:
305           // will occur when write pressure is high. Please try later.
306           logger.warn(
307               String.format(
308                   "Server is busy for region [%s], reason: %s",
309                   region, error.getServerIsBusy().getReason()));
310           backOffer.doBackOff(
311               BackOffFunction.BackOffFuncType.BoServerBusy,
312               new StatusRuntimeException(
313                   Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())));
314         } else {
315           tiSession.getRegionManager().invalidateRegion(region);
316         }
317       }
318 
319       if (retry) {
320         ingestWithRetry(writeResponse, backOffer);
321       } else {
322         throw e;
323       }
324     }
325   }
326 }