1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.importer;
19
20 import static org.tikv.common.operation.RegionErrorHandler.NO_LEADER_STORE_ID;
21
22 import com.google.protobuf.ByteString;
23 import io.grpc.Status;
24 import io.grpc.StatusRuntimeException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.tikv.common.TiConfiguration;
31 import org.tikv.common.TiSession;
32 import org.tikv.common.apiversion.RequestKeyCodec;
33 import org.tikv.common.exception.GrpcException;
34 import org.tikv.common.exception.RegionException;
35 import org.tikv.common.exception.TiKVException;
36 import org.tikv.common.key.Key;
37 import org.tikv.common.region.TiRegion;
38 import org.tikv.common.region.TiStore;
39 import org.tikv.common.util.BackOffFunction;
40 import org.tikv.common.util.BackOffer;
41 import org.tikv.common.util.ConcreteBackOffer;
42 import org.tikv.common.util.Pair;
43 import org.tikv.kvproto.Errorpb.Error;
44 import org.tikv.kvproto.ImportSstpb;
45 import org.tikv.kvproto.ImportSstpb.RawWriteBatch;
46 import org.tikv.kvproto.Metapb;
47
48 public class ImporterClient {
49 private static final Logger logger = LoggerFactory.getLogger(ImporterClient.class);
50
51 private final TiConfiguration tiConf;
52 private final TiSession tiSession;
53 private final ByteString uuid;
54 private final Key minKey;
55 private final Key maxKey;
56 private TiRegion region;
57 private final Long ttl;
58
59 private boolean deduplicate = false;
60
61 private boolean streamOpened = false;
62 private ImportSstpb.SSTMeta sstMeta;
63 private List<ImporterStoreClient> clientList;
64 private ImporterStoreClient clientLeader;
65
66 private final RequestKeyCodec codec;
67
68 public ImporterClient(
69 TiSession tiSession, ByteString uuid, Key minKey, Key maxKey, TiRegion region, Long ttl) {
70 this.uuid = uuid;
71 this.tiConf = tiSession.getConf();
72 this.tiSession = tiSession;
73 this.minKey = minKey;
74 this.maxKey = maxKey;
75 this.region = region;
76 this.ttl = ttl;
77 this.codec = tiSession.getPDClient().getCodec();
78 }
79
80 public boolean isDeduplicate() {
81 return deduplicate;
82 }
83
84 public void setDeduplicate(boolean deduplicate) {
85 this.deduplicate = deduplicate;
86 }
87
88
89
90
91
92
93 public void write(Iterator<Pair<ByteString, ByteString>> iterator) throws TiKVException {
94
95 streamOpened = false;
96
97 int maxKVBatchSize = tiConf.getImporterMaxKVBatchSize();
98 int maxKVBatchBytes = tiConf.getImporterMaxKVBatchBytes();
99 int totalBytes = 0;
100 ByteString preKey = null;
101 while (iterator.hasNext()) {
102 ArrayList<ImportSstpb.Pair> pairs = new ArrayList<>(maxKVBatchSize);
103 for (int i = 0; i < maxKVBatchSize; i++) {
104 if (iterator.hasNext()) {
105 Pair<ByteString, ByteString> pair = iterator.next();
106 if (preKey != null && preKey.equals(pair.first)) {
107 if (deduplicate) {
108 logger.info("skip duplicate key: {}", preKey.toStringUtf8());
109 } else {
110 throw new TiKVException(
111 String.format("duplicate key found, key = %s", preKey.toStringUtf8()));
112 }
113 } else {
114 ByteString key = codec.encodeKey(pair.first);
115 pairs.add(ImportSstpb.Pair.newBuilder().setKey(key).setValue(pair.second).build());
116 totalBytes += (key.size() + pair.second.size());
117 preKey = pair.first;
118 }
119 }
120 if (totalBytes > maxKVBatchBytes || !iterator.hasNext()) {
121 break;
122 }
123 }
124 if (!streamOpened) {
125 init();
126 startWrite();
127 writeMeta();
128 streamOpened = true;
129 }
130 writeBatch(pairs);
131 totalBytes = 0;
132 }
133
134 if (streamOpened) {
135 finishWrite();
136 ingest();
137 }
138 }
139
140 private void init() {
141 long regionId = region.getId();
142 Metapb.RegionEpoch regionEpoch = region.getRegionEpoch();
143 Pair<ByteString, ByteString> keyRange =
144 codec.encodePdQueryRange(minKey.toByteString(), maxKey.toByteString());
145
146 ImportSstpb.Range range =
147 ImportSstpb.Range.newBuilder().setStart(keyRange.first).setEnd(keyRange.second).build();
148
149 sstMeta =
150 ImportSstpb.SSTMeta.newBuilder()
151 .setApiVersion(tiConf.getApiVersion().toPb())
152 .setUuid(uuid)
153 .setRegionId(regionId)
154 .setRegionEpoch(regionEpoch)
155 .setRange(range)
156 .build();
157
158 clientList = new ArrayList<>();
159 for (Metapb.Peer peer : region.getPeersList()) {
160 long storeId = peer.getStoreId();
161 TiStore store = tiSession.getRegionManager().getStoreById(storeId);
162 ImporterStoreClient importerStoreClient =
163 tiSession.getImporterRegionStoreClientBuilder().build(store);
164 clientList.add(importerStoreClient);
165
166 if (region.getLeader().getStoreId() == storeId) {
167 clientLeader = importerStoreClient;
168 }
169 }
170 }
171
172 private void startWrite() {
173 for (ImporterStoreClient client : clientList) {
174 client.startWrite();
175 }
176 }
177
178 private void writeMeta() {
179 if (tiConf.isTxnKVMode()) {
180 ImportSstpb.WriteRequest request =
181 ImportSstpb.WriteRequest.newBuilder().setMeta(sstMeta).build();
182 for (ImporterStoreClient client : clientList) {
183 client.writeBatch(request);
184 }
185 } else {
186 ImportSstpb.RawWriteRequest request =
187 ImportSstpb.RawWriteRequest.newBuilder().setMeta(sstMeta).build();
188 for (ImporterStoreClient client : clientList) {
189 client.writeBatch(request);
190 }
191 }
192 }
193
194 private void writeBatch(List<ImportSstpb.Pair> pairs) {
195 if (tiConf.isTxnKVMode()) {
196 ImportSstpb.WriteBatch batch;
197
198 batch =
199 ImportSstpb.WriteBatch.newBuilder()
200 .addAllPairs(pairs)
201 .setCommitTs(tiSession.getTimestamp().getVersion())
202 .build();
203
204 ImportSstpb.WriteRequest request =
205 ImportSstpb.WriteRequest.newBuilder().setBatch(batch).build();
206 for (ImporterStoreClient client : clientList) {
207 client.writeBatch(request);
208 }
209 } else {
210 ImportSstpb.RawWriteBatch batch;
211
212 RawWriteBatch.Builder batchBuilder = RawWriteBatch.newBuilder().addAllPairs(pairs);
213 if (ttl != null && ttl > 0) {
214 batchBuilder.setTtl(ttl);
215 }
216 if (tiConf.getApiVersion().isV2()) {
217 batchBuilder.setTs(tiSession.getTimestamp().getVersion());
218 }
219 batch = batchBuilder.build();
220
221 ImportSstpb.RawWriteRequest request =
222 ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build();
223 for (ImporterStoreClient client : clientList) {
224 client.writeBatch(request);
225 }
226 }
227 }
228
229 private void finishWrite() {
230 for (ImporterStoreClient client : clientList) {
231 client.finishWrite();
232 }
233 }
234
235 private void ingest() throws GrpcException {
236 List<ImporterStoreClient> workingClients = new ArrayList<>(clientList);
237 while (!workingClients.isEmpty()) {
238 Iterator<ImporterStoreClient> itor = workingClients.iterator();
239 while (itor.hasNext()) {
240 ImporterStoreClient client = itor.next();
241 if (client.isWriteResponseReceived()) {
242 itor.remove();
243 } else if (client.hasWriteResponseError()) {
244 throw new GrpcException(client.getWriteError());
245 }
246 }
247
248 if (!workingClients.isEmpty()) {
249 try {
250 Thread.sleep(1000);
251 } catch (InterruptedException e) {
252 e.printStackTrace();
253 }
254 }
255 }
256
257 Object writeResponse = clientLeader.getWriteResponse();
258 BackOffer backOffer =
259 ConcreteBackOffer.newCustomBackOff(
260 BackOffer.INGEST_BACKOFF, tiSession.getPDClient().getClusterId());
261 ingestWithRetry(writeResponse, backOffer);
262 }
263
264 private void ingestWithRetry(Object writeResponse, BackOffer backOffer) {
265 try {
266 clientLeader.multiIngest(region.getLeaderContext(), writeResponse);
267 } catch (RegionException e) {
268 logger.warn("ingest failed.", e);
269 boolean retry = false;
270 Error error = e.getRegionErr();
271 if (error != null) {
272 if (error.hasNotLeader()) {
273 retry = true;
274 long newStoreId = error.getNotLeader().getLeader().getStoreId();
275
276
277 logger.warn(
278 String.format(
279 "NotLeader Error with region id %d and store id %d, new store id %d",
280 region.getId(), region.getLeader().getStoreId(), newStoreId));
281
282 BackOffFunction.BackOffFuncType backOffFuncType;
283 if (newStoreId != NO_LEADER_STORE_ID) {
284 long regionId = region.getId();
285 region = tiSession.getRegionManager().updateLeader(region, newStoreId);
286 if (region == null) {
287
288 region = tiSession.getRegionManager().getRegionById(regionId);
289 }
290 backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
291 } else {
292 logger.info(
293 String.format(
294 "Received zero store id, from region %d try next time", region.getId()));
295 tiSession.getRegionManager().invalidateRegion(region);
296 region = tiSession.getRegionManager().getRegionById(region.getId());
297 backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss;
298 }
299
300 backOffer.doBackOff(backOffFuncType, e);
301 init();
302 } else if (error.hasServerIsBusy()) {
303 retry = true;
304
305
306 logger.warn(
307 String.format(
308 "Server is busy for region [%s], reason: %s",
309 region, error.getServerIsBusy().getReason()));
310 backOffer.doBackOff(
311 BackOffFunction.BackOffFuncType.BoServerBusy,
312 new StatusRuntimeException(
313 Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())));
314 } else {
315 tiSession.getRegionManager().invalidateRegion(region);
316 }
317 }
318
319 if (retry) {
320 ingestWithRetry(writeResponse, backOffer);
321 } else {
322 throw e;
323 }
324 }
325 }
326 }