View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common;
19  
20  import static com.google.common.base.Preconditions.checkNotNull;
21  import static org.tikv.common.operation.PDErrorHandler.getRegionResponseErrorExtractor;
22  import static org.tikv.common.pd.PDError.buildFromPdpbError;
23  import static org.tikv.common.pd.PDUtils.addrToUri;
24  import static org.tikv.common.pd.PDUtils.uriToAddr;
25  
26  import com.fasterxml.jackson.core.type.TypeReference;
27  import com.fasterxml.jackson.databind.ObjectMapper;
28  import com.fasterxml.jackson.databind.json.JsonMapper;
29  import com.google.common.annotations.VisibleForTesting;
30  import com.google.common.util.concurrent.ThreadFactoryBuilder;
31  import com.google.protobuf.ByteString;
32  import io.etcd.jetcd.ByteSequence;
33  import io.etcd.jetcd.Client;
34  import io.etcd.jetcd.KeyValue;
35  import io.etcd.jetcd.kv.GetResponse;
36  import io.etcd.jetcd.options.GetOption;
37  import io.grpc.ManagedChannel;
38  import io.grpc.Metadata;
39  import io.grpc.stub.MetadataUtils;
40  import io.prometheus.client.Histogram;
41  import java.net.URI;
42  import java.net.URL;
43  import java.nio.charset.StandardCharsets;
44  import java.util.ArrayList;
45  import java.util.Collections;
46  import java.util.HashMap;
47  import java.util.List;
48  import java.util.Optional;
49  import java.util.concurrent.CompletableFuture;
50  import java.util.concurrent.ConcurrentHashMap;
51  import java.util.concurrent.ConcurrentMap;
52  import java.util.concurrent.ExecutionException;
53  import java.util.concurrent.ExecutorService;
54  import java.util.concurrent.Executors;
55  import java.util.concurrent.RejectedExecutionException;
56  import java.util.concurrent.ScheduledExecutorService;
57  import java.util.concurrent.TimeUnit;
58  import java.util.concurrent.atomic.AtomicBoolean;
59  import java.util.function.Supplier;
60  import java.util.stream.Collectors;
61  import org.apache.http.client.methods.CloseableHttpResponse;
62  import org.apache.http.client.methods.HttpPost;
63  import org.apache.http.entity.ByteArrayEntity;
64  import org.apache.http.impl.client.CloseableHttpClient;
65  import org.apache.http.impl.client.HttpClients;
66  import org.slf4j.Logger;
67  import org.slf4j.LoggerFactory;
68  import org.tikv.common.apiversion.RequestKeyCodec;
69  import org.tikv.common.codec.KeyUtils;
70  import org.tikv.common.exception.GrpcException;
71  import org.tikv.common.exception.TiClientInternalException;
72  import org.tikv.common.meta.TiTimestamp;
73  import org.tikv.common.operation.NoopHandler;
74  import org.tikv.common.operation.PDErrorHandler;
75  import org.tikv.common.util.BackOffFunction.BackOffFuncType;
76  import org.tikv.common.util.BackOffer;
77  import org.tikv.common.util.ChannelFactory;
78  import org.tikv.common.util.ConcreteBackOffer;
79  import org.tikv.common.util.HistogramUtils;
80  import org.tikv.common.util.Pair;
81  import org.tikv.kvproto.Metapb;
82  import org.tikv.kvproto.Metapb.Store;
83  import org.tikv.kvproto.PDGrpc;
84  import org.tikv.kvproto.PDGrpc.PDBlockingStub;
85  import org.tikv.kvproto.PDGrpc.PDFutureStub;
86  import org.tikv.kvproto.Pdpb;
87  import org.tikv.kvproto.Pdpb.Error;
88  import org.tikv.kvproto.Pdpb.ErrorType;
89  import org.tikv.kvproto.Pdpb.GetAllStoresRequest;
90  import org.tikv.kvproto.Pdpb.GetMembersRequest;
91  import org.tikv.kvproto.Pdpb.GetMembersResponse;
92  import org.tikv.kvproto.Pdpb.GetOperatorRequest;
93  import org.tikv.kvproto.Pdpb.GetOperatorResponse;
94  import org.tikv.kvproto.Pdpb.GetRegionByIDRequest;
95  import org.tikv.kvproto.Pdpb.GetRegionRequest;
96  import org.tikv.kvproto.Pdpb.GetRegionResponse;
97  import org.tikv.kvproto.Pdpb.GetStoreRequest;
98  import org.tikv.kvproto.Pdpb.GetStoreResponse;
99  import org.tikv.kvproto.Pdpb.OperatorStatus;
100 import org.tikv.kvproto.Pdpb.RequestHeader;
101 import org.tikv.kvproto.Pdpb.ResponseHeader;
102 import org.tikv.kvproto.Pdpb.ScatterRegionRequest;
103 import org.tikv.kvproto.Pdpb.ScatterRegionResponse;
104 import org.tikv.kvproto.Pdpb.Timestamp;
105 import org.tikv.kvproto.Pdpb.TsoRequest;
106 import org.tikv.kvproto.Pdpb.TsoResponse;
107 import org.tikv.kvproto.Pdpb.UpdateServiceGCSafePointRequest;
108 
109 public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
110     implements ReadOnlyPDClient {
111   private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync";
112   private static final long MIN_TRY_UPDATE_DURATION = 50;
113   private static final int PAUSE_CHECKER_TIMEOUT = 300; // in seconds
114   private static final int KEEP_CHECKER_PAUSE_PERIOD = PAUSE_CHECKER_TIMEOUT / 5; // in seconds
115   private static final Logger logger = LoggerFactory.getLogger(PDClient.class);
116 
117   private final RequestKeyCodec codec;
118   private RequestHeader header;
119   private TsoRequest tsoReq;
120   private volatile PDClientWrapper pdClientWrapper;
121   private ScheduledExecutorService service;
122   private ScheduledExecutorService tiflashReplicaService;
123   private final HashMap<PDChecker, ScheduledExecutorService> pauseCheckerService = new HashMap<>();
124   private List<URI> pdAddrs;
125   private Client etcdClient;
126   private ConcurrentMap<Long, Double> tiflashReplicaMap;
127   private HostMapping hostMapping;
128   private long lastUpdateLeaderTime;
129   private final ExecutorService updateLeaderService = Executors.newSingleThreadExecutor();
130   private final AtomicBoolean updateLeaderNotify = new AtomicBoolean();
131 
132   public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY =
133       HistogramUtils.buildDuration()
134           .name("client_java_pd_get_region_by_requests_latency")
135           .help("pd getRegionByKey request latency.")
136           .labelNames("cluster")
137           .register();
138 
139   private PDClient(TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) {
140     super(conf, channelFactory);
141     initCluster();
142     this.codec = codec;
143     this.blockingStub = getBlockingStub();
144     this.asyncStub = getAsyncStub();
145   }
146 
147   public static ReadOnlyPDClient create(
148       TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) {
149     return createRaw(conf, codec, channelFactory);
150   }
151 
152   static PDClient createRaw(
153       TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) {
154     return new PDClient(conf, codec, channelFactory);
155   }
156 
157   public HostMapping getHostMapping() {
158     return hostMapping;
159   }
160 
161   @Override
162   public TiTimestamp getTimestamp(BackOffer backOffer) {
163     Supplier<TsoRequest> request = () -> tsoReq;
164 
165     PDErrorHandler<TsoResponse> handler =
166         new PDErrorHandler<>(
167             r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
168             this);
169 
170     TsoResponse resp = callWithRetry(backOffer, PDGrpc.getTsoMethod(), request, handler);
171     Timestamp timestamp = resp.getTimestamp();
172     return new TiTimestamp(timestamp.getPhysical(), timestamp.getLogical());
173   }
174 
175   public synchronized void keepPauseChecker(PDChecker checker) {
176     if (!this.pauseCheckerService.containsKey(checker)) {
177       ScheduledExecutorService newService =
178           Executors.newSingleThreadScheduledExecutor(
179               new ThreadFactoryBuilder()
180                   .setNameFormat(String.format("PDClient-pause-%s-pool-%%d", checker.name()))
181                   .setDaemon(true)
182                   .build());
183       newService.scheduleAtFixedRate(
184           () -> pauseChecker(checker, PAUSE_CHECKER_TIMEOUT),
185           0,
186           KEEP_CHECKER_PAUSE_PERIOD,
187           TimeUnit.SECONDS);
188       this.pauseCheckerService.put(checker, newService);
189     }
190   }
191 
192   public synchronized void stopKeepPauseChecker(PDChecker checker) {
193     if (this.pauseCheckerService.containsKey(checker)) {
194       this.pauseCheckerService.get(checker).shutdown();
195       this.pauseCheckerService.remove(checker);
196     }
197   }
198 
199   public void resumeChecker(PDChecker checker) {
200     pauseChecker(checker, 0);
201   }
202 
203   private void pauseChecker(PDChecker checker, int timeout) {
204     String verb = timeout == 0 ? "resume" : "pause";
205     URI url = pdAddrs.get(0);
206     String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName();
207     HashMap<String, Integer> arguments = new HashMap<>();
208     arguments.put("delay", timeout);
209     try (CloseableHttpClient client = HttpClients.createDefault()) {
210       JsonMapper jsonMapper = new JsonMapper();
211       byte[] body = jsonMapper.writeValueAsBytes(arguments);
212       HttpPost post = new HttpPost(api);
213       post.setEntity(new ByteArrayEntity(body));
214       try (CloseableHttpResponse resp = client.execute(post)) {
215         if (resp.getStatusLine().getStatusCode() != 200) {
216           logger.error("failed to {} checker.", verb);
217         }
218         logger.info("checker {} {}d", checker.apiName(), verb);
219       }
220     } catch (Exception e) {
221       logger.error(String.format("failed to %s checker.", verb), e);
222     }
223   }
224 
225   public Boolean isCheckerPaused(PDChecker checker) {
226     URI url = pdAddrs.get(0);
227     String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName();
228     try {
229       ObjectMapper mapper = new ObjectMapper();
230       HashMap<String, Boolean> status =
231           mapper.readValue(new URL(api), new TypeReference<HashMap<String, Boolean>>() {});
232       return status.get("paused");
233     } catch (Exception e) {
234       logger.error(String.format("failed to get %s checker status.", checker.apiName()), e);
235       return null;
236     }
237   }
238 
239   /**
240    * Sends request to pd to scatter region.
241    *
242    * @param region represents a region info
243    */
244   void scatterRegion(Metapb.Region region, BackOffer backOffer) {
245     Supplier<ScatterRegionRequest> request =
246         () ->
247             ScatterRegionRequest.newBuilder().setHeader(header).setRegionId(region.getId()).build();
248 
249     PDErrorHandler<ScatterRegionResponse> handler =
250         new PDErrorHandler<>(
251             r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
252             this);
253 
254     ScatterRegionResponse resp =
255         callWithRetry(backOffer, PDGrpc.getScatterRegionMethod(), request, handler);
256     // TODO: maybe we should retry here, need dig into pd's codebase.
257     if (resp.hasHeader() && resp.getHeader().hasError()) {
258       throw new TiClientInternalException(
259           String.format("failed to scatter region because %s", resp.getHeader().getError()));
260     }
261   }
262 
263   /**
264    * wait scatter region until finish
265    *
266    * @param region
267    */
268   void waitScatterRegionFinish(Metapb.Region region, BackOffer backOffer) {
269     for (; ; ) {
270       GetOperatorResponse resp = getOperator(region.getId());
271       if (resp != null) {
272         if (isScatterRegionFinish(resp)) {
273           logger.info(String.format("wait scatter region on %d is finished", region.getId()));
274           return;
275         } else {
276           backOffer.doBackOff(
277               BackOffFuncType.BoRegionMiss, new GrpcException("waiting scatter region"));
278           logger.info(
279               String.format(
280                   "wait scatter region %d at key %s is %s",
281                   region.getId(),
282                   KeyUtils.formatBytes(resp.getDesc().toByteArray()),
283                   resp.getStatus()));
284         }
285       }
286     }
287   }
288 
289   private GetOperatorResponse getOperator(long regionId) {
290     Supplier<GetOperatorRequest> request =
291         () -> GetOperatorRequest.newBuilder().setHeader(header).setRegionId(regionId).build();
292     // get operator no need to handle error and no need back offer.
293     return callWithRetry(
294         ConcreteBackOffer.newCustomBackOff(0, getClusterId()),
295         PDGrpc.getGetOperatorMethod(),
296         request,
297         new NoopHandler<>());
298   }
299 
300   private boolean isScatterRegionFinish(GetOperatorResponse resp) {
301     // If the current operator of region is not `scatter-region`, we could assume
302     // that `scatter-operator` has finished or timeout.
303     boolean finished =
304         !resp.getDesc().equals(ByteString.copyFromUtf8("scatter-region"))
305             || resp.getStatus() != OperatorStatus.RUNNING;
306 
307     if (resp.hasHeader()) {
308       ResponseHeader header = resp.getHeader();
309       if (header.hasError()) {
310         Error error = header.getError();
311         // heartbeat may not send to PD
312         if (error.getType() == ErrorType.REGION_NOT_FOUND) {
313           finished = true;
314         }
315       }
316     }
317     return finished;
318   }
319 
320   @Override
321   public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
322     Histogram.Timer requestTimer =
323         PD_GET_REGION_BY_KEY_REQUEST_LATENCY.labels(getClusterId().toString()).startTimer();
324     try {
325       Supplier<GetRegionRequest> request =
326           () ->
327               GetRegionRequest.newBuilder()
328                   .setHeader(header)
329                   .setRegionKey(codec.encodePdQuery(key))
330                   .build();
331 
332       PDErrorHandler<GetRegionResponse> handler =
333           new PDErrorHandler<>(getRegionResponseErrorExtractor, this);
334 
335       GetRegionResponse resp =
336           callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler);
337       return new Pair<>(codec.decodeRegion(resp.getRegion()), resp.getLeader());
338     } finally {
339       requestTimer.observeDuration();
340     }
341   }
342 
343   @Override
344   public Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long id) {
345     Supplier<GetRegionByIDRequest> request =
346         () -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
347     PDErrorHandler<GetRegionResponse> handler =
348         new PDErrorHandler<>(getRegionResponseErrorExtractor, this);
349 
350     GetRegionResponse resp =
351         callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), request, handler);
352     return new Pair<Metapb.Region, Metapb.Peer>(
353         codec.decodeRegion(resp.getRegion()), resp.getLeader());
354   }
355 
356   @Override
357   public List<Pdpb.Region> scanRegions(
358       BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) {
359     // no need to backoff because ScanRegions is just for optimization
360     // introduce a warm-up timeout for ScanRegions requests
361     PDGrpc.PDBlockingStub stub =
362         getBlockingStub().withDeadlineAfter(conf.getWarmUpTimeout(), TimeUnit.MILLISECONDS);
363     Pair<ByteString, ByteString> range = codec.encodePdQueryRange(startKey, endKey);
364     Pdpb.ScanRegionsRequest request =
365         Pdpb.ScanRegionsRequest.newBuilder()
366             .setHeader(header)
367             .setStartKey(range.first)
368             .setEndKey(range.second)
369             .setLimit(limit)
370             .build();
371     Pdpb.ScanRegionsResponse resp = stub.scanRegions(request);
372     if (resp == null) {
373       return null;
374     }
375 
376     return codec.decodePdRegions(resp.getRegionsList());
377   }
378 
379   private Supplier<GetStoreRequest> buildGetStoreReq(long storeId) {
380     return () -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build();
381   }
382 
383   private Supplier<GetAllStoresRequest> buildGetAllStoresReq() {
384     return () -> GetAllStoresRequest.newBuilder().setHeader(header).build();
385   }
386 
387   private Supplier<UpdateServiceGCSafePointRequest> buildUpdateServiceGCSafePointRequest(
388       ByteString serviceId, long ttl, long safePoint) {
389     return () ->
390         UpdateServiceGCSafePointRequest.newBuilder()
391             .setHeader(header)
392             .setSafePoint(safePoint)
393             .setServiceId(serviceId)
394             .setTTL(ttl)
395             .build();
396   }
397 
398   private <T> PDErrorHandler<GetStoreResponse> buildPDErrorHandler() {
399     return new PDErrorHandler<>(
400         r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this);
401   }
402 
403   @Override
404   public Store getStore(BackOffer backOffer, long storeId) {
405     GetStoreResponse resp =
406         callWithRetry(
407             backOffer,
408             PDGrpc.getGetStoreMethod(),
409             buildGetStoreReq(storeId),
410             buildPDErrorHandler());
411     if (resp != null) {
412       return resp.getStore();
413     }
414     return null;
415   }
416 
417   @Override
418   public List<Store> getAllStores(BackOffer backOffer) {
419     return callWithRetry(
420             backOffer,
421             PDGrpc.getGetAllStoresMethod(),
422             buildGetAllStoresReq(),
423             new PDErrorHandler<>(
424                 r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
425                 this))
426         .getStoresList();
427   }
428 
429   @Override
430   public TiConfiguration.ReplicaRead getReplicaRead() {
431     return conf.getReplicaRead();
432   }
433 
434   @Override
435   public Long updateServiceGCSafePoint(
436       String serviceId, long ttl, long safePoint, BackOffer backOffer) {
437     return callWithRetry(
438             backOffer,
439             PDGrpc.getUpdateServiceGCSafePointMethod(),
440             buildUpdateServiceGCSafePointRequest(
441                 ByteString.copyFromUtf8(serviceId), ttl, safePoint),
442             new PDErrorHandler<>(
443                 r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
444                 this))
445         .getMinSafePoint();
446   }
447 
448   @Override
449   public void close() throws InterruptedException {
450     etcdClient.close();
451     if (service != null) {
452       service.shutdownNow();
453     }
454     if (tiflashReplicaService != null) {
455       tiflashReplicaService.shutdownNow();
456     }
457     if (channelFactory != null) {
458       channelFactory.close();
459     }
460 
461     updateLeaderService.shutdownNow();
462   }
463 
464   @VisibleForTesting
465   RequestHeader getHeader() {
466     return header;
467   }
468 
469   @VisibleForTesting
470   PDClientWrapper getPdClientWrapper() {
471     return pdClientWrapper;
472   }
473 
474   private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) {
475     while (true) {
476       backOffer.checkTimeout();
477 
478       try {
479         ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping);
480         PDGrpc.PDBlockingStub stub =
481             PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
482         GetMembersRequest request =
483             GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build();
484         GetMembersResponse resp = stub.getMembers(request);
485         // check if the response contains a valid leader
486         if (resp != null && resp.getLeader().getMemberId() == 0) {
487           return null;
488         }
489         return resp;
490       } catch (Exception e) {
491         logger.warn(
492             "failed to get member from pd server from {}, caused by: {}", uri, e.getMessage());
493         backOffer.doBackOff(BackOffFuncType.BoPDRPC, e);
494       }
495     }
496   }
497 
498   private GetMembersResponse getMembers(BackOffer backOffer, URI uri) {
499     try {
500       return doGetMembers(backOffer, uri);
501     } catch (Exception e) {
502       return null;
503     }
504   }
505 
506   // return whether the leader has changed to target address `leaderUrlStr`.
507   synchronized boolean trySwitchLeader(String leaderUrlStr) {
508     if (pdClientWrapper != null) {
509       if (leaderUrlStr.equals(pdClientWrapper.getLeaderInfo())) {
510         // The message to leader is not forwarded by follower.
511         if (leaderUrlStr.equals(pdClientWrapper.getStoreAddress())) {
512           return true;
513         }
514       }
515       // If leader has transferred to another member, we can create another leaderWrapper.
516     }
517     // switch leader
518     return createLeaderClientWrapper(leaderUrlStr);
519   }
520 
521   private synchronized boolean createLeaderClientWrapper(String leaderUrlStr) {
522     try {
523       // create new Leader
524       ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr, hostMapping);
525       pdClientWrapper =
526           new PDClientWrapper(leaderUrlStr, leaderUrlStr, clientChannel, System.nanoTime());
527       timeout = conf.getTimeout();
528     } catch (IllegalArgumentException e) {
529       return false;
530     }
531     logger.info(String.format("Switched to new leader: %s", pdClientWrapper));
532     return true;
533   }
534 
535   synchronized boolean createFollowerClientWrapper(
536       BackOffer backOffer, String followerUrlStr, String leaderUrls) {
537     // TODO: Why not strip protocol info on server side since grpc does not need it
538 
539     try {
540       if (!checkHealth(backOffer, followerUrlStr, hostMapping)) {
541         return false;
542       }
543 
544       // create new Leader
545       ManagedChannel channel = channelFactory.getChannel(followerUrlStr, hostMapping);
546       pdClientWrapper = new PDClientWrapper(leaderUrls, followerUrlStr, channel, System.nanoTime());
547       timeout = conf.getForwardTimeout();
548     } catch (IllegalArgumentException e) {
549       return false;
550     }
551     logger.info(String.format("Switched to new leader by follower forward: %s", pdClientWrapper));
552     return true;
553   }
554 
555   public void tryUpdateLeaderOrForwardFollower() {
556     if (updateLeaderNotify.compareAndSet(false, true)) {
557       try {
558         updateLeaderService.submit(
559             () -> {
560               try {
561                 updateLeaderOrForwardFollower();
562               } catch (Exception e) {
563                 logger.info("update leader or forward follower failed", e);
564                 throw e;
565               } finally {
566                 updateLeaderNotify.set(false);
567                 logger.info("updating leader finish");
568               }
569             });
570       } catch (RejectedExecutionException e) {
571         logger.error("PDClient is shutdown", e);
572         updateLeaderNotify.set(false);
573       }
574     }
575   }
576 
577   private synchronized void updateLeaderOrForwardFollower() {
578     logger.warn("updating leader or forward follower");
579     if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) {
580       return;
581     }
582     for (URI url : this.pdAddrs) {
583       BackOffer backOffer = this.probeBackOffer();
584       // since resp is null, we need update leader's address by walking through all pd server.
585       GetMembersResponse resp = getMembers(backOffer, url);
586       if (resp == null) {
587         continue;
588       }
589       if (resp.getLeader().getClientUrlsList().isEmpty()) {
590         continue;
591       }
592 
593       String leaderUrlStr = resp.getLeader().getClientUrlsList().get(0);
594       leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr));
595 
596       // if leader is switched, just return.
597       if (checkHealth(backOffer, leaderUrlStr, hostMapping)
598           && createLeaderClientWrapper(leaderUrlStr)) {
599         lastUpdateLeaderTime = System.currentTimeMillis();
600         return;
601       }
602 
603       if (!conf.getEnableGrpcForward()) {
604         continue;
605       }
606 
607       logger.info(String.format("can not switch to new leader, try follower forward"));
608       List<Pdpb.Member> members = resp.getMembersList();
609 
610       // If we have not used follower forward, try the first follower.
611       boolean hasReachNextMember =
612           pdClientWrapper != null && pdClientWrapper.getStoreAddress().equals(leaderUrlStr);
613 
614       for (int i = 0; i < members.size() * 2; i++) {
615         Pdpb.Member member = members.get(i % members.size());
616         if (member.getMemberId() == resp.getLeader().getMemberId()) {
617           continue;
618         }
619         String followerUrlStr = member.getClientUrlsList().get(0);
620         followerUrlStr = uriToAddr(addrToUri(followerUrlStr));
621         if (pdClientWrapper != null && pdClientWrapper.getStoreAddress().equals(followerUrlStr)) {
622           hasReachNextMember = true;
623           continue;
624         }
625         if (hasReachNextMember
626             && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) {
627           logger.warn(
628               String.format("forward request to pd [%s] by pd [%s]", leaderUrlStr, followerUrlStr));
629           return;
630         }
631       }
632     }
633     lastUpdateLeaderTime = System.currentTimeMillis();
634     if (pdClientWrapper == null) {
635       throw new TiClientInternalException(
636           "already tried all address on file, but not leader found yet.");
637     }
638   }
639 
640   public void tryUpdateLeader() {
641     logger.info("try update leader");
642     for (URI url : this.pdAddrs) {
643       BackOffer backOffer = this.probeBackOffer();
644       // since resp is null, we need update leader's address by walking through all pd server.
645       GetMembersResponse resp = getMembers(backOffer, url);
646       if (resp == null) {
647         continue;
648       }
649       List<URI> urls =
650           resp.getMembersList()
651               .stream()
652               .map(mem -> addrToUri(mem.getClientUrls(0)))
653               .collect(Collectors.toList());
654       String leaderUrlStr = resp.getLeader().getClientUrlsList().get(0);
655       leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr));
656 
657       // If leader is not change but becomes available, we can cancel follower forward.
658       if (checkHealth(backOffer, leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) {
659         if (!urls.equals(this.pdAddrs)) {
660           tryUpdateMembers(urls);
661         }
662         return;
663       }
664     }
665     lastUpdateLeaderTime = System.currentTimeMillis();
666     if (pdClientWrapper == null) {
667       throw new TiClientInternalException(
668           "already tried all address on file, but not leader found yet.");
669     }
670   }
671 
672   private synchronized void tryUpdateMembers(List<URI> members) {
673     this.pdAddrs = members;
674   }
675 
676   public void updateTiFlashReplicaStatus() {
677     ByteSequence prefix =
678         ByteSequence.from(TIFLASH_TABLE_SYNC_PROGRESS_PATH, StandardCharsets.UTF_8);
679     for (int i = 0; i < 5; i++) {
680       CompletableFuture<GetResponse> resp;
681       try {
682         resp =
683             etcdClient.getKVClient().get(prefix, GetOption.newBuilder().withPrefix(prefix).build());
684       } catch (Exception e) {
685         logger.info("get tiflash table replica sync progress failed, continue checking.", e);
686         continue;
687       }
688       GetResponse getResp;
689       try {
690         getResp = resp.get();
691       } catch (InterruptedException e) {
692         Thread.currentThread().interrupt();
693         continue;
694       } catch (ExecutionException e) {
695         throw new GrpcException("failed to update tiflash replica", e);
696       }
697       ConcurrentMap<Long, Double> progressMap = new ConcurrentHashMap<>();
698       for (KeyValue kv : getResp.getKvs()) {
699         long tableId;
700         try {
701           tableId =
702               Long.parseLong(
703                   kv.getKey().toString().substring(TIFLASH_TABLE_SYNC_PROGRESS_PATH.length()));
704         } catch (Exception e) {
705           logger.info(
706               "invalid tiflash table replica sync progress key. key = " + kv.getKey().toString());
707           continue;
708         }
709         double progress;
710         try {
711           progress = Double.parseDouble(kv.getValue().toString());
712         } catch (Exception e) {
713           logger.info(
714               "invalid tiflash table replica sync progress value. value = "
715                   + kv.getValue().toString());
716           continue;
717         }
718         progressMap.put(tableId, progress);
719       }
720       tiflashReplicaMap = progressMap;
721       break;
722     }
723   }
724 
725   public double getTiFlashReplicaProgress(long tableId) {
726     return tiflashReplicaMap.getOrDefault(tableId, 0.0);
727   }
728 
729   @Override
730   protected PDBlockingStub getBlockingStub() {
731     if (pdClientWrapper == null) {
732       throw new GrpcException("PDClient may not be initialized");
733     }
734     return pdClientWrapper.getBlockingStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
735   }
736 
737   @Override
738   protected PDFutureStub getAsyncStub() {
739     if (pdClientWrapper == null) {
740       throw new GrpcException("PDClient may not be initialized");
741     }
742     return pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
743   }
744 
745   private void initCluster() {
746     logger.info("init cluster: start");
747     GetMembersResponse resp = null;
748     List<URI> pdAddrs = new ArrayList<>(getConf().getPdAddrs());
749     // shuffle PD addresses so that clients call getMembers from different PD
750     Collections.shuffle(pdAddrs);
751     this.pdAddrs = pdAddrs;
752     this.etcdClient =
753         Client.builder()
754             .endpoints(pdAddrs)
755             .executorService(
756                 Executors.newCachedThreadPool(
757                     new ThreadFactoryBuilder()
758                         .setNameFormat("etcd-conn-manager-pool-%d")
759                         .setDaemon(true)
760                         .build()))
761             .build();
762     logger.info("init host mapping: start");
763     this.hostMapping =
764         Optional.ofNullable(getConf().getHostMapping())
765             .orElseGet(() -> new DefaultHostMapping(this.etcdClient, conf.getNetworkMappingName()));
766     logger.info("init host mapping: end");
767     // The first request may cost too much latency
768     long originTimeout = this.timeout;
769     this.timeout = conf.getPdFirstGetMemberTimeout();
770     for (URI u : pdAddrs) {
771       logger.info("get members with pd " + u + ": start");
772       resp = getMembers(defaultBackOffer(), u);
773       logger.info("get members with pd " + u + ": end");
774       if (resp != null) {
775         break;
776       }
777     }
778     if (resp == null) {
779       logger.error("Could not get leader member with: " + pdAddrs);
780     }
781 
782     this.timeout = originTimeout;
783     checkNotNull(resp, "Failed to init client for PD cluster.");
784     long clusterId = resp.getHeader().getClusterId();
785     header = RequestHeader.newBuilder().setClusterId(clusterId).build();
786     tsoReq = TsoRequest.newBuilder().setHeader(header).setCount(1).build();
787     this.tiflashReplicaMap = new ConcurrentHashMap<>();
788     this.pdAddrs =
789         resp.getMembersList()
790             .stream()
791             .map(mem -> addrToUri(mem.getClientUrls(0)))
792             .collect(Collectors.toList());
793     logger.info("init cluster with address: " + this.pdAddrs);
794 
795     String leaderUrlStr = resp.getLeader().getClientUrls(0);
796     leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr));
797     logger.info("createLeaderClientWrapper with leader " + leaderUrlStr + ": start");
798     createLeaderClientWrapper(leaderUrlStr);
799     logger.info("createLeaderClientWrapper with leader " + leaderUrlStr + ": end");
800     service =
801         Executors.newSingleThreadScheduledExecutor(
802             new ThreadFactoryBuilder()
803                 .setNameFormat("PDClient-update-leader-pool-%d")
804                 .setDaemon(true)
805                 .build());
806     service.scheduleAtFixedRate(
807         () -> {
808           // Wrap this with a try catch block in case schedule update fails
809           try {
810             tryUpdateLeader();
811           } catch (Exception e) {
812             logger.warn("Update leader failed", e);
813           }
814         },
815         10,
816         10,
817         TimeUnit.SECONDS);
818     if (conf.isTiFlashEnabled()) {
819       tiflashReplicaService =
820           Executors.newSingleThreadScheduledExecutor(
821               new ThreadFactoryBuilder()
822                   .setNameFormat("PDClient-tiflash-replica-pool-%d")
823                   .setDaemon(true)
824                   .build());
825       tiflashReplicaService.scheduleAtFixedRate(
826           this::updateTiFlashReplicaStatus, 10, 10, TimeUnit.SECONDS);
827     }
828     logger.info("init cluster: finish");
829   }
830 
831   static class PDClientWrapper {
832     private final String leaderInfo;
833     private final PDBlockingStub blockingStub;
834     private final PDFutureStub asyncStub;
835     private final long createTime;
836     private final String storeAddress;
837 
838     PDClientWrapper(
839         String leaderInfo, String storeAddress, ManagedChannel clientChannel, long createTime) {
840       if (!storeAddress.equals(leaderInfo)) {
841         Metadata header = new Metadata();
842         header.put(TiConfiguration.PD_FORWARD_META_DATA_KEY, addrToUri(leaderInfo).toString());
843         this.blockingStub =
844             PDGrpc.newBlockingStub(clientChannel)
845                 .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
846         this.asyncStub =
847             PDGrpc.newFutureStub(clientChannel)
848                 .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
849       } else {
850         this.blockingStub = PDGrpc.newBlockingStub(clientChannel);
851         this.asyncStub = PDGrpc.newFutureStub(clientChannel);
852       }
853       this.leaderInfo = leaderInfo;
854       this.storeAddress = storeAddress;
855       this.createTime = createTime;
856     }
857 
858     String getLeaderInfo() {
859       return leaderInfo;
860     }
861 
862     String getStoreAddress() {
863       return storeAddress;
864     }
865 
866     PDBlockingStub getBlockingStub() {
867       return blockingStub;
868     }
869 
870     PDFutureStub getAsyncStub() {
871       return asyncStub;
872     }
873 
874     long getCreateTime() {
875       return createTime;
876     }
877 
878     @Override
879     public String toString() {
880       return "[leaderInfo: " + leaderInfo + ", storeAddress: " + storeAddress + "]";
881     }
882   }
883 
884   public Long getClusterId() {
885     return header.getClusterId();
886   }
887 
888   public List<URI> getPdAddrs() {
889     return pdAddrs;
890   }
891 
892   public RequestKeyCodec getCodec() {
893     return codec;
894   }
895 
896   private static BackOffer defaultBackOffer() {
897     return ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
898   }
899 
900   private BackOffer probeBackOffer() {
901     int maxSleep = (int) getTimeout() * 2;
902     return ConcreteBackOffer.newCustomBackOff(maxSleep);
903   }
904 }