1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common;
19
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static org.tikv.common.operation.PDErrorHandler.getRegionResponseErrorExtractor;
22 import static org.tikv.common.pd.PDError.buildFromPdpbError;
23 import static org.tikv.common.pd.PDUtils.addrToUri;
24 import static org.tikv.common.pd.PDUtils.uriToAddr;
25
26 import com.fasterxml.jackson.core.type.TypeReference;
27 import com.fasterxml.jackson.databind.ObjectMapper;
28 import com.fasterxml.jackson.databind.json.JsonMapper;
29 import com.google.common.annotations.VisibleForTesting;
30 import com.google.common.util.concurrent.ThreadFactoryBuilder;
31 import com.google.protobuf.ByteString;
32 import io.etcd.jetcd.ByteSequence;
33 import io.etcd.jetcd.Client;
34 import io.etcd.jetcd.KeyValue;
35 import io.etcd.jetcd.kv.GetResponse;
36 import io.etcd.jetcd.options.GetOption;
37 import io.grpc.ManagedChannel;
38 import io.grpc.Metadata;
39 import io.grpc.stub.MetadataUtils;
40 import io.prometheus.client.Histogram;
41 import java.net.URI;
42 import java.net.URL;
43 import java.nio.charset.StandardCharsets;
44 import java.util.ArrayList;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
48 import java.util.Optional;
49 import java.util.concurrent.CompletableFuture;
50 import java.util.concurrent.ConcurrentHashMap;
51 import java.util.concurrent.ConcurrentMap;
52 import java.util.concurrent.ExecutionException;
53 import java.util.concurrent.ExecutorService;
54 import java.util.concurrent.Executors;
55 import java.util.concurrent.RejectedExecutionException;
56 import java.util.concurrent.ScheduledExecutorService;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.atomic.AtomicBoolean;
59 import java.util.function.Supplier;
60 import java.util.stream.Collectors;
61 import org.apache.http.client.methods.CloseableHttpResponse;
62 import org.apache.http.client.methods.HttpPost;
63 import org.apache.http.entity.ByteArrayEntity;
64 import org.apache.http.impl.client.CloseableHttpClient;
65 import org.apache.http.impl.client.HttpClients;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68 import org.tikv.common.apiversion.RequestKeyCodec;
69 import org.tikv.common.codec.KeyUtils;
70 import org.tikv.common.exception.GrpcException;
71 import org.tikv.common.exception.TiClientInternalException;
72 import org.tikv.common.meta.TiTimestamp;
73 import org.tikv.common.operation.NoopHandler;
74 import org.tikv.common.operation.PDErrorHandler;
75 import org.tikv.common.util.BackOffFunction.BackOffFuncType;
76 import org.tikv.common.util.BackOffer;
77 import org.tikv.common.util.ChannelFactory;
78 import org.tikv.common.util.ConcreteBackOffer;
79 import org.tikv.common.util.HistogramUtils;
80 import org.tikv.common.util.Pair;
81 import org.tikv.kvproto.Metapb;
82 import org.tikv.kvproto.Metapb.Store;
83 import org.tikv.kvproto.PDGrpc;
84 import org.tikv.kvproto.PDGrpc.PDBlockingStub;
85 import org.tikv.kvproto.PDGrpc.PDFutureStub;
86 import org.tikv.kvproto.Pdpb;
87 import org.tikv.kvproto.Pdpb.Error;
88 import org.tikv.kvproto.Pdpb.ErrorType;
89 import org.tikv.kvproto.Pdpb.GetAllStoresRequest;
90 import org.tikv.kvproto.Pdpb.GetMembersRequest;
91 import org.tikv.kvproto.Pdpb.GetMembersResponse;
92 import org.tikv.kvproto.Pdpb.GetOperatorRequest;
93 import org.tikv.kvproto.Pdpb.GetOperatorResponse;
94 import org.tikv.kvproto.Pdpb.GetRegionByIDRequest;
95 import org.tikv.kvproto.Pdpb.GetRegionRequest;
96 import org.tikv.kvproto.Pdpb.GetRegionResponse;
97 import org.tikv.kvproto.Pdpb.GetStoreRequest;
98 import org.tikv.kvproto.Pdpb.GetStoreResponse;
99 import org.tikv.kvproto.Pdpb.OperatorStatus;
100 import org.tikv.kvproto.Pdpb.RequestHeader;
101 import org.tikv.kvproto.Pdpb.ResponseHeader;
102 import org.tikv.kvproto.Pdpb.ScatterRegionRequest;
103 import org.tikv.kvproto.Pdpb.ScatterRegionResponse;
104 import org.tikv.kvproto.Pdpb.Timestamp;
105 import org.tikv.kvproto.Pdpb.TsoRequest;
106 import org.tikv.kvproto.Pdpb.TsoResponse;
107 import org.tikv.kvproto.Pdpb.UpdateServiceGCSafePointRequest;
108
109 public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
110 implements ReadOnlyPDClient {
111 private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync";
112 private static final long MIN_TRY_UPDATE_DURATION = 50;
113 private static final int PAUSE_CHECKER_TIMEOUT = 300;
114 private static final int KEEP_CHECKER_PAUSE_PERIOD = PAUSE_CHECKER_TIMEOUT / 5;
115 private static final Logger logger = LoggerFactory.getLogger(PDClient.class);
116
117 private final RequestKeyCodec codec;
118 private RequestHeader header;
119 private TsoRequest tsoReq;
120 private volatile PDClientWrapper pdClientWrapper;
121 private ScheduledExecutorService service;
122 private ScheduledExecutorService tiflashReplicaService;
123 private final HashMap<PDChecker, ScheduledExecutorService> pauseCheckerService = new HashMap<>();
124 private List<URI> pdAddrs;
125 private Client etcdClient;
126 private ConcurrentMap<Long, Double> tiflashReplicaMap;
127 private HostMapping hostMapping;
128 private long lastUpdateLeaderTime;
129 private final ExecutorService updateLeaderService = Executors.newSingleThreadExecutor();
130 private final AtomicBoolean updateLeaderNotify = new AtomicBoolean();
131
132 public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY =
133 HistogramUtils.buildDuration()
134 .name("client_java_pd_get_region_by_requests_latency")
135 .help("pd getRegionByKey request latency.")
136 .labelNames("cluster")
137 .register();
138
/**
 * Builds a client, discovers the PD cluster and prepares the gRPC stubs.
 *
 * <p>{@code initCluster()} must run before the stubs are fetched because the stub getters throw
 * while no PD client wrapper has been created.
 */
private PDClient(TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) {
  super(conf, channelFactory);
  this.codec = codec;
  initCluster();
  this.blockingStub = getBlockingStub();
  this.asyncStub = getAsyncStub();
}
146
147 public static ReadOnlyPDClient create(
148 TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) {
149 return createRaw(conf, codec, channelFactory);
150 }
151
152 static PDClient createRaw(
153 TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) {
154 return new PDClient(conf, codec, channelFactory);
155 }
156
157 public HostMapping getHostMapping() {
158 return hostMapping;
159 }
160
161 @Override
162 public TiTimestamp getTimestamp(BackOffer backOffer) {
163 Supplier<TsoRequest> request = () -> tsoReq;
164
165 PDErrorHandler<TsoResponse> handler =
166 new PDErrorHandler<>(
167 r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
168 this);
169
170 TsoResponse resp = callWithRetry(backOffer, PDGrpc.getTsoMethod(), request, handler);
171 Timestamp timestamp = resp.getTimestamp();
172 return new TiTimestamp(timestamp.getPhysical(), timestamp.getLogical());
173 }
174
175 public synchronized void keepPauseChecker(PDChecker checker) {
176 if (!this.pauseCheckerService.containsKey(checker)) {
177 ScheduledExecutorService newService =
178 Executors.newSingleThreadScheduledExecutor(
179 new ThreadFactoryBuilder()
180 .setNameFormat(String.format("PDClient-pause-%s-pool-%%d", checker.name()))
181 .setDaemon(true)
182 .build());
183 newService.scheduleAtFixedRate(
184 () -> pauseChecker(checker, PAUSE_CHECKER_TIMEOUT),
185 0,
186 KEEP_CHECKER_PAUSE_PERIOD,
187 TimeUnit.SECONDS);
188 this.pauseCheckerService.put(checker, newService);
189 }
190 }
191
192 public synchronized void stopKeepPauseChecker(PDChecker checker) {
193 if (this.pauseCheckerService.containsKey(checker)) {
194 this.pauseCheckerService.get(checker).shutdown();
195 this.pauseCheckerService.remove(checker);
196 }
197 }
198
199 public void resumeChecker(PDChecker checker) {
200 pauseChecker(checker, 0);
201 }
202
203 private void pauseChecker(PDChecker checker, int timeout) {
204 String verb = timeout == 0 ? "resume" : "pause";
205 URI url = pdAddrs.get(0);
206 String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName();
207 HashMap<String, Integer> arguments = new HashMap<>();
208 arguments.put("delay", timeout);
209 try (CloseableHttpClient client = HttpClients.createDefault()) {
210 JsonMapper jsonMapper = new JsonMapper();
211 byte[] body = jsonMapper.writeValueAsBytes(arguments);
212 HttpPost post = new HttpPost(api);
213 post.setEntity(new ByteArrayEntity(body));
214 try (CloseableHttpResponse resp = client.execute(post)) {
215 if (resp.getStatusLine().getStatusCode() != 200) {
216 logger.error("failed to {} checker.", verb);
217 }
218 logger.info("checker {} {}d", checker.apiName(), verb);
219 }
220 } catch (Exception e) {
221 logger.error(String.format("failed to %s checker.", verb), e);
222 }
223 }
224
225 public Boolean isCheckerPaused(PDChecker checker) {
226 URI url = pdAddrs.get(0);
227 String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName();
228 try {
229 ObjectMapper mapper = new ObjectMapper();
230 HashMap<String, Boolean> status =
231 mapper.readValue(new URL(api), new TypeReference<HashMap<String, Boolean>>() {});
232 return status.get("paused");
233 } catch (Exception e) {
234 logger.error(String.format("failed to get %s checker status.", checker.apiName()), e);
235 return null;
236 }
237 }
238
239
240
241
242
243
244 void scatterRegion(Metapb.Region region, BackOffer backOffer) {
245 Supplier<ScatterRegionRequest> request =
246 () ->
247 ScatterRegionRequest.newBuilder().setHeader(header).setRegionId(region.getId()).build();
248
249 PDErrorHandler<ScatterRegionResponse> handler =
250 new PDErrorHandler<>(
251 r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
252 this);
253
254 ScatterRegionResponse resp =
255 callWithRetry(backOffer, PDGrpc.getScatterRegionMethod(), request, handler);
256
257 if (resp.hasHeader() && resp.getHeader().hasError()) {
258 throw new TiClientInternalException(
259 String.format("failed to scatter region because %s", resp.getHeader().getError()));
260 }
261 }
262
263
264
265
266
267
268 void waitScatterRegionFinish(Metapb.Region region, BackOffer backOffer) {
269 for (; ; ) {
270 GetOperatorResponse resp = getOperator(region.getId());
271 if (resp != null) {
272 if (isScatterRegionFinish(resp)) {
273 logger.info(String.format("wait scatter region on %d is finished", region.getId()));
274 return;
275 } else {
276 backOffer.doBackOff(
277 BackOffFuncType.BoRegionMiss, new GrpcException("waiting scatter region"));
278 logger.info(
279 String.format(
280 "wait scatter region %d at key %s is %s",
281 region.getId(),
282 KeyUtils.formatBytes(resp.getDesc().toByteArray()),
283 resp.getStatus()));
284 }
285 }
286 }
287 }
288
289 private GetOperatorResponse getOperator(long regionId) {
290 Supplier<GetOperatorRequest> request =
291 () -> GetOperatorRequest.newBuilder().setHeader(header).setRegionId(regionId).build();
292
293 return callWithRetry(
294 ConcreteBackOffer.newCustomBackOff(0, getClusterId()),
295 PDGrpc.getGetOperatorMethod(),
296 request,
297 new NoopHandler<>());
298 }
299
300 private boolean isScatterRegionFinish(GetOperatorResponse resp) {
301
302
303 boolean finished =
304 !resp.getDesc().equals(ByteString.copyFromUtf8("scatter-region"))
305 || resp.getStatus() != OperatorStatus.RUNNING;
306
307 if (resp.hasHeader()) {
308 ResponseHeader header = resp.getHeader();
309 if (header.hasError()) {
310 Error error = header.getError();
311
312 if (error.getType() == ErrorType.REGION_NOT_FOUND) {
313 finished = true;
314 }
315 }
316 }
317 return finished;
318 }
319
320 @Override
321 public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
322 Histogram.Timer requestTimer =
323 PD_GET_REGION_BY_KEY_REQUEST_LATENCY.labels(getClusterId().toString()).startTimer();
324 try {
325 Supplier<GetRegionRequest> request =
326 () ->
327 GetRegionRequest.newBuilder()
328 .setHeader(header)
329 .setRegionKey(codec.encodePdQuery(key))
330 .build();
331
332 PDErrorHandler<GetRegionResponse> handler =
333 new PDErrorHandler<>(getRegionResponseErrorExtractor, this);
334
335 GetRegionResponse resp =
336 callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler);
337 return new Pair<>(codec.decodeRegion(resp.getRegion()), resp.getLeader());
338 } finally {
339 requestTimer.observeDuration();
340 }
341 }
342
343 @Override
344 public Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long id) {
345 Supplier<GetRegionByIDRequest> request =
346 () -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
347 PDErrorHandler<GetRegionResponse> handler =
348 new PDErrorHandler<>(getRegionResponseErrorExtractor, this);
349
350 GetRegionResponse resp =
351 callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), request, handler);
352 return new Pair<Metapb.Region, Metapb.Peer>(
353 codec.decodeRegion(resp.getRegion()), resp.getLeader());
354 }
355
356 @Override
357 public List<Pdpb.Region> scanRegions(
358 BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) {
359
360
361 PDGrpc.PDBlockingStub stub =
362 getBlockingStub().withDeadlineAfter(conf.getWarmUpTimeout(), TimeUnit.MILLISECONDS);
363 Pair<ByteString, ByteString> range = codec.encodePdQueryRange(startKey, endKey);
364 Pdpb.ScanRegionsRequest request =
365 Pdpb.ScanRegionsRequest.newBuilder()
366 .setHeader(header)
367 .setStartKey(range.first)
368 .setEndKey(range.second)
369 .setLimit(limit)
370 .build();
371 Pdpb.ScanRegionsResponse resp = stub.scanRegions(request);
372 if (resp == null) {
373 return null;
374 }
375
376 return codec.decodePdRegions(resp.getRegionsList());
377 }
378
379 private Supplier<GetStoreRequest> buildGetStoreReq(long storeId) {
380 return () -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build();
381 }
382
383 private Supplier<GetAllStoresRequest> buildGetAllStoresReq() {
384 return () -> GetAllStoresRequest.newBuilder().setHeader(header).build();
385 }
386
387 private Supplier<UpdateServiceGCSafePointRequest> buildUpdateServiceGCSafePointRequest(
388 ByteString serviceId, long ttl, long safePoint) {
389 return () ->
390 UpdateServiceGCSafePointRequest.newBuilder()
391 .setHeader(header)
392 .setSafePoint(safePoint)
393 .setServiceId(serviceId)
394 .setTTL(ttl)
395 .build();
396 }
397
398 private <T> PDErrorHandler<GetStoreResponse> buildPDErrorHandler() {
399 return new PDErrorHandler<>(
400 r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this);
401 }
402
403 @Override
404 public Store getStore(BackOffer backOffer, long storeId) {
405 GetStoreResponse resp =
406 callWithRetry(
407 backOffer,
408 PDGrpc.getGetStoreMethod(),
409 buildGetStoreReq(storeId),
410 buildPDErrorHandler());
411 if (resp != null) {
412 return resp.getStore();
413 }
414 return null;
415 }
416
417 @Override
418 public List<Store> getAllStores(BackOffer backOffer) {
419 return callWithRetry(
420 backOffer,
421 PDGrpc.getGetAllStoresMethod(),
422 buildGetAllStoresReq(),
423 new PDErrorHandler<>(
424 r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
425 this))
426 .getStoresList();
427 }
428
429 @Override
430 public TiConfiguration.ReplicaRead getReplicaRead() {
431 return conf.getReplicaRead();
432 }
433
434 @Override
435 public Long updateServiceGCSafePoint(
436 String serviceId, long ttl, long safePoint, BackOffer backOffer) {
437 return callWithRetry(
438 backOffer,
439 PDGrpc.getUpdateServiceGCSafePointMethod(),
440 buildUpdateServiceGCSafePointRequest(
441 ByteString.copyFromUtf8(serviceId), ttl, safePoint),
442 new PDErrorHandler<>(
443 r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
444 this))
445 .getMinSafePoint();
446 }
447
448 @Override
449 public void close() throws InterruptedException {
450 etcdClient.close();
451 if (service != null) {
452 service.shutdownNow();
453 }
454 if (tiflashReplicaService != null) {
455 tiflashReplicaService.shutdownNow();
456 }
457 if (channelFactory != null) {
458 channelFactory.close();
459 }
460
461 updateLeaderService.shutdownNow();
462 }
463
464 @VisibleForTesting
465 RequestHeader getHeader() {
466 return header;
467 }
468
469 @VisibleForTesting
470 PDClientWrapper getPdClientWrapper() {
471 return pdClientWrapper;
472 }
473
474 private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) {
475 while (true) {
476 backOffer.checkTimeout();
477
478 try {
479 ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping);
480 PDGrpc.PDBlockingStub stub =
481 PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
482 GetMembersRequest request =
483 GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build();
484 GetMembersResponse resp = stub.getMembers(request);
485
486 if (resp != null && resp.getLeader().getMemberId() == 0) {
487 return null;
488 }
489 return resp;
490 } catch (Exception e) {
491 logger.warn(
492 "failed to get member from pd server from {}, caused by: {}", uri, e.getMessage());
493 backOffer.doBackOff(BackOffFuncType.BoPDRPC, e);
494 }
495 }
496 }
497
498 private GetMembersResponse getMembers(BackOffer backOffer, URI uri) {
499 try {
500 return doGetMembers(backOffer, uri);
501 } catch (Exception e) {
502 return null;
503 }
504 }
505
506
507 synchronized boolean trySwitchLeader(String leaderUrlStr) {
508 if (pdClientWrapper != null) {
509 if (leaderUrlStr.equals(pdClientWrapper.getLeaderInfo())) {
510
511 if (leaderUrlStr.equals(pdClientWrapper.getStoreAddress())) {
512 return true;
513 }
514 }
515
516 }
517
518 return createLeaderClientWrapper(leaderUrlStr);
519 }
520
521 private synchronized boolean createLeaderClientWrapper(String leaderUrlStr) {
522 try {
523
524 ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr, hostMapping);
525 pdClientWrapper =
526 new PDClientWrapper(leaderUrlStr, leaderUrlStr, clientChannel, System.nanoTime());
527 timeout = conf.getTimeout();
528 } catch (IllegalArgumentException e) {
529 return false;
530 }
531 logger.info(String.format("Switched to new leader: %s", pdClientWrapper));
532 return true;
533 }
534
535 synchronized boolean createFollowerClientWrapper(
536 BackOffer backOffer, String followerUrlStr, String leaderUrls) {
537
538
539 try {
540 if (!checkHealth(backOffer, followerUrlStr, hostMapping)) {
541 return false;
542 }
543
544
545 ManagedChannel channel = channelFactory.getChannel(followerUrlStr, hostMapping);
546 pdClientWrapper = new PDClientWrapper(leaderUrls, followerUrlStr, channel, System.nanoTime());
547 timeout = conf.getForwardTimeout();
548 } catch (IllegalArgumentException e) {
549 return false;
550 }
551 logger.info(String.format("Switched to new leader by follower forward: %s", pdClientWrapper));
552 return true;
553 }
554
555 public void tryUpdateLeaderOrForwardFollower() {
556 if (updateLeaderNotify.compareAndSet(false, true)) {
557 try {
558 updateLeaderService.submit(
559 () -> {
560 try {
561 updateLeaderOrForwardFollower();
562 } catch (Exception e) {
563 logger.info("update leader or forward follower failed", e);
564 throw e;
565 } finally {
566 updateLeaderNotify.set(false);
567 logger.info("updating leader finish");
568 }
569 });
570 } catch (RejectedExecutionException e) {
571 logger.error("PDClient is shutdown", e);
572 updateLeaderNotify.set(false);
573 }
574 }
575 }
576
577 private synchronized void updateLeaderOrForwardFollower() {
578 logger.warn("updating leader or forward follower");
579 if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) {
580 return;
581 }
582 for (URI url : this.pdAddrs) {
583 BackOffer backOffer = this.probeBackOffer();
584
585 GetMembersResponse resp = getMembers(backOffer, url);
586 if (resp == null) {
587 continue;
588 }
589 if (resp.getLeader().getClientUrlsList().isEmpty()) {
590 continue;
591 }
592
593 String leaderUrlStr = resp.getLeader().getClientUrlsList().get(0);
594 leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr));
595
596
597 if (checkHealth(backOffer, leaderUrlStr, hostMapping)
598 && createLeaderClientWrapper(leaderUrlStr)) {
599 lastUpdateLeaderTime = System.currentTimeMillis();
600 return;
601 }
602
603 if (!conf.getEnableGrpcForward()) {
604 continue;
605 }
606
607 logger.info(String.format("can not switch to new leader, try follower forward"));
608 List<Pdpb.Member> members = resp.getMembersList();
609
610
611 boolean hasReachNextMember =
612 pdClientWrapper != null && pdClientWrapper.getStoreAddress().equals(leaderUrlStr);
613
614 for (int i = 0; i < members.size() * 2; i++) {
615 Pdpb.Member member = members.get(i % members.size());
616 if (member.getMemberId() == resp.getLeader().getMemberId()) {
617 continue;
618 }
619 String followerUrlStr = member.getClientUrlsList().get(0);
620 followerUrlStr = uriToAddr(addrToUri(followerUrlStr));
621 if (pdClientWrapper != null && pdClientWrapper.getStoreAddress().equals(followerUrlStr)) {
622 hasReachNextMember = true;
623 continue;
624 }
625 if (hasReachNextMember
626 && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) {
627 logger.warn(
628 String.format("forward request to pd [%s] by pd [%s]", leaderUrlStr, followerUrlStr));
629 return;
630 }
631 }
632 }
633 lastUpdateLeaderTime = System.currentTimeMillis();
634 if (pdClientWrapper == null) {
635 throw new TiClientInternalException(
636 "already tried all address on file, but not leader found yet.");
637 }
638 }
639
640 public void tryUpdateLeader() {
641 logger.info("try update leader");
642 for (URI url : this.pdAddrs) {
643 BackOffer backOffer = this.probeBackOffer();
644
645 GetMembersResponse resp = getMembers(backOffer, url);
646 if (resp == null) {
647 continue;
648 }
649 List<URI> urls =
650 resp.getMembersList()
651 .stream()
652 .map(mem -> addrToUri(mem.getClientUrls(0)))
653 .collect(Collectors.toList());
654 String leaderUrlStr = resp.getLeader().getClientUrlsList().get(0);
655 leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr));
656
657
658 if (checkHealth(backOffer, leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) {
659 if (!urls.equals(this.pdAddrs)) {
660 tryUpdateMembers(urls);
661 }
662 return;
663 }
664 }
665 lastUpdateLeaderTime = System.currentTimeMillis();
666 if (pdClientWrapper == null) {
667 throw new TiClientInternalException(
668 "already tried all address on file, but not leader found yet.");
669 }
670 }
671
672 private synchronized void tryUpdateMembers(List<URI> members) {
673 this.pdAddrs = members;
674 }
675
676 public void updateTiFlashReplicaStatus() {
677 ByteSequence prefix =
678 ByteSequence.from(TIFLASH_TABLE_SYNC_PROGRESS_PATH, StandardCharsets.UTF_8);
679 for (int i = 0; i < 5; i++) {
680 CompletableFuture<GetResponse> resp;
681 try {
682 resp =
683 etcdClient.getKVClient().get(prefix, GetOption.newBuilder().withPrefix(prefix).build());
684 } catch (Exception e) {
685 logger.info("get tiflash table replica sync progress failed, continue checking.", e);
686 continue;
687 }
688 GetResponse getResp;
689 try {
690 getResp = resp.get();
691 } catch (InterruptedException e) {
692 Thread.currentThread().interrupt();
693 continue;
694 } catch (ExecutionException e) {
695 throw new GrpcException("failed to update tiflash replica", e);
696 }
697 ConcurrentMap<Long, Double> progressMap = new ConcurrentHashMap<>();
698 for (KeyValue kv : getResp.getKvs()) {
699 long tableId;
700 try {
701 tableId =
702 Long.parseLong(
703 kv.getKey().toString().substring(TIFLASH_TABLE_SYNC_PROGRESS_PATH.length()));
704 } catch (Exception e) {
705 logger.info(
706 "invalid tiflash table replica sync progress key. key = " + kv.getKey().toString());
707 continue;
708 }
709 double progress;
710 try {
711 progress = Double.parseDouble(kv.getValue().toString());
712 } catch (Exception e) {
713 logger.info(
714 "invalid tiflash table replica sync progress value. value = "
715 + kv.getValue().toString());
716 continue;
717 }
718 progressMap.put(tableId, progress);
719 }
720 tiflashReplicaMap = progressMap;
721 break;
722 }
723 }
724
725 public double getTiFlashReplicaProgress(long tableId) {
726 return tiflashReplicaMap.getOrDefault(tableId, 0.0);
727 }
728
729 @Override
730 protected PDBlockingStub getBlockingStub() {
731 if (pdClientWrapper == null) {
732 throw new GrpcException("PDClient may not be initialized");
733 }
734 return pdClientWrapper.getBlockingStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
735 }
736
737 @Override
738 protected PDFutureStub getAsyncStub() {
739 if (pdClientWrapper == null) {
740 throw new GrpcException("PDClient may not be initialized");
741 }
742 return pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
743 }
744
745 private void initCluster() {
746 logger.info("init cluster: start");
747 GetMembersResponse resp = null;
748 List<URI> pdAddrs = new ArrayList<>(getConf().getPdAddrs());
749
750 Collections.shuffle(pdAddrs);
751 this.pdAddrs = pdAddrs;
752 this.etcdClient =
753 Client.builder()
754 .endpoints(pdAddrs)
755 .executorService(
756 Executors.newCachedThreadPool(
757 new ThreadFactoryBuilder()
758 .setNameFormat("etcd-conn-manager-pool-%d")
759 .setDaemon(true)
760 .build()))
761 .build();
762 logger.info("init host mapping: start");
763 this.hostMapping =
764 Optional.ofNullable(getConf().getHostMapping())
765 .orElseGet(() -> new DefaultHostMapping(this.etcdClient, conf.getNetworkMappingName()));
766 logger.info("init host mapping: end");
767
768 long originTimeout = this.timeout;
769 this.timeout = conf.getPdFirstGetMemberTimeout();
770 for (URI u : pdAddrs) {
771 logger.info("get members with pd " + u + ": start");
772 resp = getMembers(defaultBackOffer(), u);
773 logger.info("get members with pd " + u + ": end");
774 if (resp != null) {
775 break;
776 }
777 }
778 if (resp == null) {
779 logger.error("Could not get leader member with: " + pdAddrs);
780 }
781
782 this.timeout = originTimeout;
783 checkNotNull(resp, "Failed to init client for PD cluster.");
784 long clusterId = resp.getHeader().getClusterId();
785 header = RequestHeader.newBuilder().setClusterId(clusterId).build();
786 tsoReq = TsoRequest.newBuilder().setHeader(header).setCount(1).build();
787 this.tiflashReplicaMap = new ConcurrentHashMap<>();
788 this.pdAddrs =
789 resp.getMembersList()
790 .stream()
791 .map(mem -> addrToUri(mem.getClientUrls(0)))
792 .collect(Collectors.toList());
793 logger.info("init cluster with address: " + this.pdAddrs);
794
795 String leaderUrlStr = resp.getLeader().getClientUrls(0);
796 leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr));
797 logger.info("createLeaderClientWrapper with leader " + leaderUrlStr + ": start");
798 createLeaderClientWrapper(leaderUrlStr);
799 logger.info("createLeaderClientWrapper with leader " + leaderUrlStr + ": end");
800 service =
801 Executors.newSingleThreadScheduledExecutor(
802 new ThreadFactoryBuilder()
803 .setNameFormat("PDClient-update-leader-pool-%d")
804 .setDaemon(true)
805 .build());
806 service.scheduleAtFixedRate(
807 () -> {
808
809 try {
810 tryUpdateLeader();
811 } catch (Exception e) {
812 logger.warn("Update leader failed", e);
813 }
814 },
815 10,
816 10,
817 TimeUnit.SECONDS);
818 if (conf.isTiFlashEnabled()) {
819 tiflashReplicaService =
820 Executors.newSingleThreadScheduledExecutor(
821 new ThreadFactoryBuilder()
822 .setNameFormat("PDClient-tiflash-replica-pool-%d")
823 .setDaemon(true)
824 .build());
825 tiflashReplicaService.scheduleAtFixedRate(
826 this::updateTiFlashReplicaStatus, 10, 10, TimeUnit.SECONDS);
827 }
828 logger.info("init cluster: finish");
829 }
830
831 static class PDClientWrapper {
832 private final String leaderInfo;
833 private final PDBlockingStub blockingStub;
834 private final PDFutureStub asyncStub;
835 private final long createTime;
836 private final String storeAddress;
837
838 PDClientWrapper(
839 String leaderInfo, String storeAddress, ManagedChannel clientChannel, long createTime) {
840 if (!storeAddress.equals(leaderInfo)) {
841 Metadata header = new Metadata();
842 header.put(TiConfiguration.PD_FORWARD_META_DATA_KEY, addrToUri(leaderInfo).toString());
843 this.blockingStub =
844 PDGrpc.newBlockingStub(clientChannel)
845 .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
846 this.asyncStub =
847 PDGrpc.newFutureStub(clientChannel)
848 .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
849 } else {
850 this.blockingStub = PDGrpc.newBlockingStub(clientChannel);
851 this.asyncStub = PDGrpc.newFutureStub(clientChannel);
852 }
853 this.leaderInfo = leaderInfo;
854 this.storeAddress = storeAddress;
855 this.createTime = createTime;
856 }
857
858 String getLeaderInfo() {
859 return leaderInfo;
860 }
861
862 String getStoreAddress() {
863 return storeAddress;
864 }
865
866 PDBlockingStub getBlockingStub() {
867 return blockingStub;
868 }
869
870 PDFutureStub getAsyncStub() {
871 return asyncStub;
872 }
873
874 long getCreateTime() {
875 return createTime;
876 }
877
878 @Override
879 public String toString() {
880 return "[leaderInfo: " + leaderInfo + ", storeAddress: " + storeAddress + "]";
881 }
882 }
883
884 public Long getClusterId() {
885 return header.getClusterId();
886 }
887
888 public List<URI> getPdAddrs() {
889 return pdAddrs;
890 }
891
892 public RequestKeyCodec getCodec() {
893 return codec;
894 }
895
896 private static BackOffer defaultBackOffer() {
897 return ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
898 }
899
900 private BackOffer probeBackOffer() {
901 int maxSleep = (int) getTimeout() * 2;
902 return ConcreteBackOffer.newCustomBackOff(maxSleep);
903 }
904 }