View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.region;
19  
20  import static com.google.common.base.Preconditions.checkArgument;
21  import static com.google.common.base.Preconditions.checkNotNull;
22  
23  import com.google.common.util.concurrent.ListenableFuture;
24  import com.google.protobuf.ByteString;
25  import io.grpc.ManagedChannel;
26  import io.grpc.Metadata;
27  import io.grpc.stub.MetadataUtils;
28  import io.prometheus.client.Histogram;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Set;
32  import java.util.concurrent.TimeUnit;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  import org.tikv.common.AbstractGRPCClient;
36  import org.tikv.common.TiConfiguration;
37  import org.tikv.common.apiversion.RequestKeyCodec;
38  import org.tikv.common.exception.GrpcException;
39  import org.tikv.common.log.SlowLog;
40  import org.tikv.common.log.SlowLogSpan;
41  import org.tikv.common.util.BackOffer;
42  import org.tikv.common.util.ChannelFactory;
43  import org.tikv.common.util.HistogramUtils;
44  import org.tikv.kvproto.Kvrpcpb;
45  import org.tikv.kvproto.Metapb;
46  import org.tikv.kvproto.TikvGrpc;
47  import org.tikv.kvproto.Tracepb;
48  
49  public abstract class AbstractRegionStoreClient
50      extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvFutureStub>
51      implements RegionErrorReceiver {
52  
53    private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class);
54  
55    public static final Histogram SEEK_LEADER_STORE_DURATION =
56        HistogramUtils.buildDuration()
57            .name("client_java_seek_leader_store_duration")
58            .help("seek leader store duration.")
59            .labelNames("cluster")
60            .register();
61  
62    public static final Histogram SEEK_PROXY_STORE_DURATION =
63        HistogramUtils.buildDuration()
64            .name("client_java_seek_proxy_store_duration")
65            .help("seek proxy store duration.")
66            .labelNames("cluster")
67            .register();
68  
69    protected final RegionManager regionManager;
70    protected final RequestKeyCodec codec;
71    protected TiRegion region;
72    protected TiStore store;
73  
74    protected AbstractRegionStoreClient(
75        TiConfiguration conf,
76        TiRegion region,
77        TiStore store,
78        ChannelFactory channelFactory,
79        TikvGrpc.TikvBlockingStub blockingStub,
80        TikvGrpc.TikvFutureStub asyncStub,
81        RegionManager regionManager) {
82      super(conf, channelFactory, blockingStub, asyncStub);
83      checkNotNull(region, "Region is empty");
84      checkNotNull(region.getLeader(), "Leader Peer is null");
85      checkArgument(region.getLeader() != null, "Leader Peer is null");
86      this.region = region;
87      this.regionManager = regionManager;
88      this.store = store;
89      this.codec = regionManager.getPDClient().getCodec();
90      if (this.store.getProxyStore() != null) {
91        this.timeout = conf.getForwardTimeout();
92      }
93    }
94  
95    @Override
96    public TiRegion getRegion() {
97      return region;
98    }
99  
100   @Override
101   protected TikvGrpc.TikvBlockingStub getBlockingStub() {
102     return blockingStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
103   }
104 
105   @Override
106   protected TikvGrpc.TikvFutureStub getAsyncStub() {
107     return asyncStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
108   }
109 
110   @Override
111   public void close() throws GrpcException {}
112 
113   /**
114    * onNotLeader deals with NotLeaderError and returns whether re-splitting key range is needed
115    *
116    * @param newRegion the new region presented by NotLeader Error
117    * @return false when re-split is needed.
118    */
119   @Override
120   public boolean onNotLeader(TiRegion newRegion, BackOffer backOffer) {
121     if (logger.isDebugEnabled()) {
122       logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId());
123     }
124     // When switch leader fails or the region changed its region epoch,
125     // it would be necessary to re-split task's key range for new region.
126     if (!region.getRegionEpoch().equals(newRegion.getRegionEpoch())) {
127       return false;
128     }
129 
130     // If we try one peer but find the leader has not changed, we do not need to try other peers.
131     if (region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) {
132       store = null;
133     }
134     region = newRegion;
135     store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
136     updateClientStub();
137     return true;
138   }
139 
140   @Override
141   public boolean onStoreUnreachable(BackOffer backOffer) {
142     if (!store.isValid()) {
143       logger.warn(String.format("store [%d] has been invalid", store.getId()));
144       store = regionManager.getStoreById(store.getId(), backOffer);
145       updateClientStub();
146       return true;
147     }
148 
149     // seek an available leader store to send request
150     backOffer.checkTimeout();
151     Boolean result = seekLeaderStore(backOffer);
152     if (result != null) {
153       return result;
154     }
155     if (conf.getEnableGrpcForward()) {
156       // seek an available proxy store to forward request
157       backOffer.checkTimeout();
158       return seekProxyStore(backOffer);
159     }
160     return false;
161   }
162 
163   private Kvrpcpb.Context addTraceId(Kvrpcpb.Context context, SlowLog slowLog) {
164     if (slowLog.getThresholdMS() < 0) {
165       // disable tikv tracing
166       return context;
167     }
168     long traceId = slowLog.getTraceId();
169     return Kvrpcpb.Context.newBuilder(context)
170         .setTraceContext(
171             Tracepb.TraceContext.newBuilder()
172                 .setDurationThresholdMs(
173                     (int) (slowLog.getThresholdMS() * conf.getRawKVServerSlowLogFactor()))
174                 .addRemoteParentSpans(Tracepb.RemoteParentSpan.newBuilder().setTraceId(traceId)))
175         .build();
176   }
177 
178   protected Kvrpcpb.Context makeContext(TiStoreType storeType, SlowLog slowLog) {
179     Kvrpcpb.Context context = region.getReplicaContext(java.util.Collections.emptySet(), storeType);
180     return addTraceId(context, slowLog);
181   }
182 
183   protected Kvrpcpb.Context makeContext(
184       Set<Long> resolvedLocks, TiStoreType storeType, SlowLog slowLog) {
185     Kvrpcpb.Context context = region.getReplicaContext(resolvedLocks, storeType);
186     return addTraceId(context, slowLog);
187   }
188 
189   protected Kvrpcpb.Context makeContext() {
190     return region.getLeaderContext();
191   }
192 
193   protected Kvrpcpb.Context makeContext(Metapb.Peer peer) {
194     return region.getReplicaContext(peer);
195   }
196 
197   private void updateClientStub() {
198     String addressStr = store.getStore().getAddress();
199     long deadline = timeout;
200     if (store.getProxyStore() != null) {
201       addressStr = store.getProxyStore().getAddress();
202       deadline = conf.getForwardTimeout();
203     }
204     ManagedChannel channel =
205         channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
206     blockingStub =
207         TikvGrpc.newBlockingStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
208     asyncStub = TikvGrpc.newFutureStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
209     if (store.getProxyStore() != null) {
210       Metadata header = new Metadata();
211       header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
212       blockingStub =
213           blockingStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
214       asyncStub = asyncStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
215     }
216   }
217 
218   private Boolean seekLeaderStore(BackOffer backOffer) {
219     Histogram.Timer switchLeaderDurationTimer =
220         SEEK_LEADER_STORE_DURATION
221             .labels(regionManager.getPDClient().getClusterId().toString())
222             .startTimer();
223     SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekLeaderStore");
224     try {
225       List<Metapb.Peer> peers = region.getFollowerList();
226       if (peers.isEmpty()) {
227         // no followers available, retry
228         logger.warn(String.format("no followers of region[%d] available, retry", region.getId()));
229         regionManager.onRequestFail(region);
230         return false;
231       }
232 
233       logger.info(String.format("try switch leader: region[%d]", region.getId()));
234 
235       Metapb.Peer peer = switchLeaderStore(backOffer);
236       if (peer != null) {
237         // we found a leader
238         TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
239         if (currentLeaderStore.isReachable()) {
240           logger.info(
241               String.format(
242                   "update leader using switchLeader logic from store[%d] to store[%d]",
243                   region.getLeader().getStoreId(), peer.getStoreId()));
244           // update region cache
245           TiRegion result = regionManager.updateLeader(region, peer.getStoreId());
246           if (result != null) {
247             region = result;
248             // switch to leader store
249             store = currentLeaderStore;
250             updateClientStub();
251             return true;
252           }
253           return false;
254         }
255       } else {
256         // no leader found, some response does not return normally, there may be network partition.
257         logger.warn(
258             String.format(
259                 "leader for region[%d] is not found, it is possible that network partition occurred",
260                 region.getId()));
261       }
262     } finally {
263       switchLeaderDurationTimer.observeDuration();
264       slowLogSpan.end();
265     }
266     return null;
267   }
268 
269   private boolean seekProxyStore(BackOffer backOffer) {
270     SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekProxyStore");
271     Histogram.Timer grpcForwardDurationTimer =
272         SEEK_PROXY_STORE_DURATION
273             .labels(regionManager.getPDClient().getClusterId().toString())
274             .startTimer();
275     try {
276       logger.info(String.format("try grpc forward: region[%d]", region.getId()));
277       // when current leader cannot be reached
278       TiStore storeWithProxy = switchProxyStore(backOffer);
279       if (storeWithProxy == null) {
280         // no store available, retry
281         logger.warn(String.format("No store available, retry: region[%d]", region.getId()));
282         return false;
283       }
284       // use proxy store to forward requests
285       regionManager.updateStore(store, storeWithProxy);
286       store = storeWithProxy;
287       updateClientStub();
288       return true;
289     } finally {
290       grpcForwardDurationTimer.observeDuration();
291       slowLogSpan.end();
292     }
293   }
294 
295   // first: leader peer, second: true if any responses returned with grpc error
296   private Metapb.Peer switchLeaderStore(BackOffer backOffer) {
297     List<SwitchLeaderTask> responses = new LinkedList<>();
298     for (Metapb.Peer peer : region.getFollowerList()) {
299       ByteString key = region.getStartKey();
300       try {
301         TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
302         ManagedChannel channel =
303             channelFactory.getChannel(
304                 peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
305         TikvGrpc.TikvFutureStub stub =
306             TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
307         Kvrpcpb.RawGetRequest rawGetRequest =
308             Kvrpcpb.RawGetRequest.newBuilder()
309                 .setContext(makeContext(peer))
310                 .setKey(codec.encodeKey(key))
311                 .build();
312         ListenableFuture<Kvrpcpb.RawGetResponse> task = stub.rawGet(rawGetRequest);
313         responses.add(new SwitchLeaderTask(task, peer));
314       } catch (Exception e) {
315         logger.warn(
316             "switch region[{}] leader store to {} failed: {}",
317             region.getId(),
318             peer.getStoreId(),
319             e);
320       }
321     }
322     while (true) {
323       try {
324         Thread.sleep(2);
325       } catch (InterruptedException e) {
326         throw new GrpcException(e);
327       }
328       List<SwitchLeaderTask> unfinished = new LinkedList<>();
329       for (SwitchLeaderTask task : responses) {
330         if (!task.task.isDone()) {
331           unfinished.add(task);
332           continue;
333         }
334         try {
335           Kvrpcpb.RawGetResponse resp = task.task.get();
336           if (resp != null) {
337             if (!resp.hasRegionError()) {
338               // the peer is leader
339               logger.info(
340                   String.format("rawGet response indicates peer[%d] is leader", task.peer.getId()));
341               return task.peer;
342             }
343           }
344         } catch (Exception ignored) {
345         }
346       }
347       if (unfinished.isEmpty()) {
348         return null;
349       }
350       responses = unfinished;
351     }
352   }
353 
354   private TiStore switchProxyStore(BackOffer backOffer) {
355     long forwardTimeout = conf.getForwardTimeout();
356     List<ForwardCheckTask> responses = new LinkedList<>();
357     for (Metapb.Peer peer : region.getFollowerList()) {
358       ByteString key = region.getStartKey();
359       try {
360         TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
361         ManagedChannel channel =
362             channelFactory.getChannel(
363                 peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
364         TikvGrpc.TikvFutureStub stub =
365             TikvGrpc.newFutureStub(channel)
366                 .withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS);
367         Metadata header = new Metadata();
368         header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
369         Kvrpcpb.RawGetRequest rawGetRequest =
370             Kvrpcpb.RawGetRequest.newBuilder()
371                 .setContext(makeContext())
372                 .setKey(codec.encodeKey(key))
373                 .build();
374         ListenableFuture<Kvrpcpb.RawGetResponse> task =
375             stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header))
376                 .rawGet(rawGetRequest);
377         responses.add(new ForwardCheckTask(task, peerStore.getStore()));
378       } catch (Exception e) {
379         logger.warn(
380             "switch region[{}] leader store to {} failed: {}",
381             region.getId(),
382             peer.getStoreId(),
383             e);
384       }
385     }
386     while (true) {
387       try {
388         Thread.sleep(2);
389       } catch (InterruptedException e) {
390         throw new GrpcException(e);
391       }
392       List<ForwardCheckTask> unfinished = new LinkedList<>();
393       for (ForwardCheckTask task : responses) {
394         if (!task.task.isDone()) {
395           unfinished.add(task);
396           continue;
397         }
398         try {
399           // any answer will do
400           Kvrpcpb.RawGetResponse resp = task.task.get();
401           logger.info(
402               String.format(
403                   "rawGetResponse indicates forward from [%s] to [%s]",
404                   task.store.getAddress(), store.getAddress()));
405           return store.withProxy(task.store);
406         } catch (Exception ignored) {
407         }
408       }
409       if (unfinished.isEmpty()) {
410         return null;
411       }
412       responses = unfinished;
413     }
414   }
415 
416   private static class SwitchLeaderTask {
417 
418     private final ListenableFuture<Kvrpcpb.RawGetResponse> task;
419     private final Metapb.Peer peer;
420 
421     private SwitchLeaderTask(ListenableFuture<Kvrpcpb.RawGetResponse> task, Metapb.Peer peer) {
422       this.task = task;
423       this.peer = peer;
424     }
425   }
426 
427   private static class ForwardCheckTask {
428 
429     private final ListenableFuture<Kvrpcpb.RawGetResponse> task;
430     private final Metapb.Store store;
431 
432     private ForwardCheckTask(ListenableFuture<Kvrpcpb.RawGetResponse> task, Metapb.Store store) {
433       this.task = task;
434       this.store = store;
435     }
436   }
437 }