1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.region;
19
20 import static com.google.common.base.Preconditions.checkArgument;
21 import static com.google.common.base.Preconditions.checkNotNull;
22
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.protobuf.ByteString;
25 import io.grpc.ManagedChannel;
26 import io.grpc.Metadata;
27 import io.grpc.stub.MetadataUtils;
28 import io.prometheus.client.Histogram;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Set;
32 import java.util.concurrent.TimeUnit;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import org.tikv.common.AbstractGRPCClient;
36 import org.tikv.common.TiConfiguration;
37 import org.tikv.common.apiversion.RequestKeyCodec;
38 import org.tikv.common.exception.GrpcException;
39 import org.tikv.common.log.SlowLog;
40 import org.tikv.common.log.SlowLogSpan;
41 import org.tikv.common.util.BackOffer;
42 import org.tikv.common.util.ChannelFactory;
43 import org.tikv.common.util.HistogramUtils;
44 import org.tikv.kvproto.Kvrpcpb;
45 import org.tikv.kvproto.Metapb;
46 import org.tikv.kvproto.TikvGrpc;
47 import org.tikv.kvproto.Tracepb;
48
49 public abstract class AbstractRegionStoreClient
50 extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvFutureStub>
51 implements RegionErrorReceiver {
52
53 private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class);
54
55 public static final Histogram SEEK_LEADER_STORE_DURATION =
56 HistogramUtils.buildDuration()
57 .name("client_java_seek_leader_store_duration")
58 .help("seek leader store duration.")
59 .labelNames("cluster")
60 .register();
61
62 public static final Histogram SEEK_PROXY_STORE_DURATION =
63 HistogramUtils.buildDuration()
64 .name("client_java_seek_proxy_store_duration")
65 .help("seek proxy store duration.")
66 .labelNames("cluster")
67 .register();
68
69 protected final RegionManager regionManager;
70 protected final RequestKeyCodec codec;
71 protected TiRegion region;
72 protected TiStore store;
73
/**
 * Creates a client bound to {@code region} and {@code store}.
 *
 * <p>If {@code store} carries a proxy store, the client timeout is replaced by the configured
 * forward timeout.
 *
 * @param conf client configuration
 * @param region region to target; must be non-null and have a non-null leader peer
 * @param store store the supplied stubs talk to; must be non-null
 * @param channelFactory factory used later to open channels when the stubs are rebuilt
 * @param blockingStub initial blocking stub
 * @param asyncStub initial future stub
 * @param regionManager region manager; must be non-null
 * @throws NullPointerException if {@code region}, its leader, {@code store} or {@code
 *     regionManager} is null
 */
protected AbstractRegionStoreClient(
    TiConfiguration conf,
    TiRegion region,
    TiStore store,
    ChannelFactory channelFactory,
    TikvGrpc.TikvBlockingStub blockingStub,
    TikvGrpc.TikvFutureStub asyncStub,
    RegionManager regionManager) {
  super(conf, channelFactory, blockingStub, asyncStub);
  checkNotNull(region, "Region is empty");
  checkNotNull(region.getLeader(), "Leader Peer is null");
  // Both are dereferenced below; fail with a descriptive message instead of a bare NPE.
  checkNotNull(store, "Store is null");
  checkNotNull(regionManager, "RegionManager is null");
  this.region = region;
  this.regionManager = regionManager;
  this.store = store;
  this.codec = regionManager.getPDClient().getCodec();
  if (this.store.getProxyStore() != null) {
    // Requests routed through a proxy use the forward timeout.
    this.timeout = conf.getForwardTimeout();
  }
}
94
95 @Override
96 public TiRegion getRegion() {
97 return region;
98 }
99
100 @Override
101 protected TikvGrpc.TikvBlockingStub getBlockingStub() {
102 return blockingStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
103 }
104
105 @Override
106 protected TikvGrpc.TikvFutureStub getAsyncStub() {
107 return asyncStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
108 }
109
110 @Override
111 public void close() throws GrpcException {}
112
113
114
115
116
117
118
119 @Override
120 public boolean onNotLeader(TiRegion newRegion, BackOffer backOffer) {
121 if (logger.isDebugEnabled()) {
122 logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId());
123 }
124
125
126 if (!region.getRegionEpoch().equals(newRegion.getRegionEpoch())) {
127 return false;
128 }
129
130
131 if (region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) {
132 store = null;
133 }
134 region = newRegion;
135 store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
136 updateClientStub();
137 return true;
138 }
139
140 @Override
141 public boolean onStoreUnreachable(BackOffer backOffer) {
142 if (!store.isValid()) {
143 logger.warn(String.format("store [%d] has been invalid", store.getId()));
144 store = regionManager.getStoreById(store.getId(), backOffer);
145 updateClientStub();
146 return true;
147 }
148
149
150 backOffer.checkTimeout();
151 Boolean result = seekLeaderStore(backOffer);
152 if (result != null) {
153 return result;
154 }
155 if (conf.getEnableGrpcForward()) {
156
157 backOffer.checkTimeout();
158 return seekProxyStore(backOffer);
159 }
160 return false;
161 }
162
163 private Kvrpcpb.Context addTraceId(Kvrpcpb.Context context, SlowLog slowLog) {
164 if (slowLog.getThresholdMS() < 0) {
165
166 return context;
167 }
168 long traceId = slowLog.getTraceId();
169 return Kvrpcpb.Context.newBuilder(context)
170 .setTraceContext(
171 Tracepb.TraceContext.newBuilder()
172 .setDurationThresholdMs(
173 (int) (slowLog.getThresholdMS() * conf.getRawKVServerSlowLogFactor()))
174 .addRemoteParentSpans(Tracepb.RemoteParentSpan.newBuilder().setTraceId(traceId)))
175 .build();
176 }
177
178 protected Kvrpcpb.Context makeContext(TiStoreType storeType, SlowLog slowLog) {
179 Kvrpcpb.Context context = region.getReplicaContext(java.util.Collections.emptySet(), storeType);
180 return addTraceId(context, slowLog);
181 }
182
183 protected Kvrpcpb.Context makeContext(
184 Set<Long> resolvedLocks, TiStoreType storeType, SlowLog slowLog) {
185 Kvrpcpb.Context context = region.getReplicaContext(resolvedLocks, storeType);
186 return addTraceId(context, slowLog);
187 }
188
189 protected Kvrpcpb.Context makeContext() {
190 return region.getLeaderContext();
191 }
192
193 protected Kvrpcpb.Context makeContext(Metapb.Peer peer) {
194 return region.getReplicaContext(peer);
195 }
196
197 private void updateClientStub() {
198 String addressStr = store.getStore().getAddress();
199 long deadline = timeout;
200 if (store.getProxyStore() != null) {
201 addressStr = store.getProxyStore().getAddress();
202 deadline = conf.getForwardTimeout();
203 }
204 ManagedChannel channel =
205 channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
206 blockingStub =
207 TikvGrpc.newBlockingStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
208 asyncStub = TikvGrpc.newFutureStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
209 if (store.getProxyStore() != null) {
210 Metadata header = new Metadata();
211 header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
212 blockingStub =
213 blockingStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
214 asyncStub = asyncStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
215 }
216 }
217
218 private Boolean seekLeaderStore(BackOffer backOffer) {
219 Histogram.Timer switchLeaderDurationTimer =
220 SEEK_LEADER_STORE_DURATION
221 .labels(regionManager.getPDClient().getClusterId().toString())
222 .startTimer();
223 SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekLeaderStore");
224 try {
225 List<Metapb.Peer> peers = region.getFollowerList();
226 if (peers.isEmpty()) {
227
228 logger.warn(String.format("no followers of region[%d] available, retry", region.getId()));
229 regionManager.onRequestFail(region);
230 return false;
231 }
232
233 logger.info(String.format("try switch leader: region[%d]", region.getId()));
234
235 Metapb.Peer peer = switchLeaderStore(backOffer);
236 if (peer != null) {
237
238 TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
239 if (currentLeaderStore.isReachable()) {
240 logger.info(
241 String.format(
242 "update leader using switchLeader logic from store[%d] to store[%d]",
243 region.getLeader().getStoreId(), peer.getStoreId()));
244
245 TiRegion result = regionManager.updateLeader(region, peer.getStoreId());
246 if (result != null) {
247 region = result;
248
249 store = currentLeaderStore;
250 updateClientStub();
251 return true;
252 }
253 return false;
254 }
255 } else {
256
257 logger.warn(
258 String.format(
259 "leader for region[%d] is not found, it is possible that network partition occurred",
260 region.getId()));
261 }
262 } finally {
263 switchLeaderDurationTimer.observeDuration();
264 slowLogSpan.end();
265 }
266 return null;
267 }
268
269 private boolean seekProxyStore(BackOffer backOffer) {
270 SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekProxyStore");
271 Histogram.Timer grpcForwardDurationTimer =
272 SEEK_PROXY_STORE_DURATION
273 .labels(regionManager.getPDClient().getClusterId().toString())
274 .startTimer();
275 try {
276 logger.info(String.format("try grpc forward: region[%d]", region.getId()));
277
278 TiStore storeWithProxy = switchProxyStore(backOffer);
279 if (storeWithProxy == null) {
280
281 logger.warn(String.format("No store available, retry: region[%d]", region.getId()));
282 return false;
283 }
284
285 regionManager.updateStore(store, storeWithProxy);
286 store = storeWithProxy;
287 updateClientStub();
288 return true;
289 } finally {
290 grpcForwardDurationTimer.observeDuration();
291 slowLogSpan.end();
292 }
293 }
294
295
296 private Metapb.Peer switchLeaderStore(BackOffer backOffer) {
297 List<SwitchLeaderTask> responses = new LinkedList<>();
298 for (Metapb.Peer peer : region.getFollowerList()) {
299 ByteString key = region.getStartKey();
300 try {
301 TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
302 ManagedChannel channel =
303 channelFactory.getChannel(
304 peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
305 TikvGrpc.TikvFutureStub stub =
306 TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
307 Kvrpcpb.RawGetRequest rawGetRequest =
308 Kvrpcpb.RawGetRequest.newBuilder()
309 .setContext(makeContext(peer))
310 .setKey(codec.encodeKey(key))
311 .build();
312 ListenableFuture<Kvrpcpb.RawGetResponse> task = stub.rawGet(rawGetRequest);
313 responses.add(new SwitchLeaderTask(task, peer));
314 } catch (Exception e) {
315 logger.warn(
316 "switch region[{}] leader store to {} failed: {}",
317 region.getId(),
318 peer.getStoreId(),
319 e);
320 }
321 }
322 while (true) {
323 try {
324 Thread.sleep(2);
325 } catch (InterruptedException e) {
326 throw new GrpcException(e);
327 }
328 List<SwitchLeaderTask> unfinished = new LinkedList<>();
329 for (SwitchLeaderTask task : responses) {
330 if (!task.task.isDone()) {
331 unfinished.add(task);
332 continue;
333 }
334 try {
335 Kvrpcpb.RawGetResponse resp = task.task.get();
336 if (resp != null) {
337 if (!resp.hasRegionError()) {
338
339 logger.info(
340 String.format("rawGet response indicates peer[%d] is leader", task.peer.getId()));
341 return task.peer;
342 }
343 }
344 } catch (Exception ignored) {
345 }
346 }
347 if (unfinished.isEmpty()) {
348 return null;
349 }
350 responses = unfinished;
351 }
352 }
353
354 private TiStore switchProxyStore(BackOffer backOffer) {
355 long forwardTimeout = conf.getForwardTimeout();
356 List<ForwardCheckTask> responses = new LinkedList<>();
357 for (Metapb.Peer peer : region.getFollowerList()) {
358 ByteString key = region.getStartKey();
359 try {
360 TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
361 ManagedChannel channel =
362 channelFactory.getChannel(
363 peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
364 TikvGrpc.TikvFutureStub stub =
365 TikvGrpc.newFutureStub(channel)
366 .withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS);
367 Metadata header = new Metadata();
368 header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
369 Kvrpcpb.RawGetRequest rawGetRequest =
370 Kvrpcpb.RawGetRequest.newBuilder()
371 .setContext(makeContext())
372 .setKey(codec.encodeKey(key))
373 .build();
374 ListenableFuture<Kvrpcpb.RawGetResponse> task =
375 stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header))
376 .rawGet(rawGetRequest);
377 responses.add(new ForwardCheckTask(task, peerStore.getStore()));
378 } catch (Exception e) {
379 logger.warn(
380 "switch region[{}] leader store to {} failed: {}",
381 region.getId(),
382 peer.getStoreId(),
383 e);
384 }
385 }
386 while (true) {
387 try {
388 Thread.sleep(2);
389 } catch (InterruptedException e) {
390 throw new GrpcException(e);
391 }
392 List<ForwardCheckTask> unfinished = new LinkedList<>();
393 for (ForwardCheckTask task : responses) {
394 if (!task.task.isDone()) {
395 unfinished.add(task);
396 continue;
397 }
398 try {
399
400 Kvrpcpb.RawGetResponse resp = task.task.get();
401 logger.info(
402 String.format(
403 "rawGetResponse indicates forward from [%s] to [%s]",
404 task.store.getAddress(), store.getAddress()));
405 return store.withProxy(task.store);
406 } catch (Exception ignored) {
407 }
408 }
409 if (unfinished.isEmpty()) {
410 return null;
411 }
412 responses = unfinished;
413 }
414 }
415
416 private static class SwitchLeaderTask {
417
418 private final ListenableFuture<Kvrpcpb.RawGetResponse> task;
419 private final Metapb.Peer peer;
420
421 private SwitchLeaderTask(ListenableFuture<Kvrpcpb.RawGetResponse> task, Metapb.Peer peer) {
422 this.task = task;
423 this.peer = peer;
424 }
425 }
426
427 private static class ForwardCheckTask {
428
429 private final ListenableFuture<Kvrpcpb.RawGetResponse> task;
430 private final Metapb.Store store;
431
432 private ForwardCheckTask(ListenableFuture<Kvrpcpb.RawGetResponse> task, Metapb.Store store) {
433 this.task = task;
434 this.store = store;
435 }
436 }
437 }