1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.policy;
19
20 import com.google.common.collect.ImmutableSet;
21 import io.grpc.Status;
22 import io.prometheus.client.Counter;
23 import io.prometheus.client.Histogram;
24 import java.util.concurrent.Callable;
25 import org.tikv.common.exception.GrpcException;
26 import org.tikv.common.log.SlowLogSpan;
27 import org.tikv.common.operation.ErrorHandler;
28 import org.tikv.common.util.BackOffer;
29 import org.tikv.common.util.ConcreteBackOffer;
30 import org.tikv.common.util.HistogramUtils;
31
32 public abstract class RetryPolicy<RespT> {
33 BackOffer backOffer = ConcreteBackOffer.newCopNextMaxBackOff();
34 public static final Histogram GRPC_SINGLE_REQUEST_LATENCY =
35 HistogramUtils.buildDuration()
36 .name("client_java_grpc_single_requests_latency")
37 .help("grpc request latency.")
38 .labelNames("type", "cluster")
39 .register();
40 public static final Histogram CALL_WITH_RETRY_DURATION =
41 HistogramUtils.buildDuration()
42 .name("client_java_call_with_retry_duration")
43 .help("callWithRetry duration.")
44 .labelNames("type", "cluster")
45 .register();
46 public static final Counter GRPC_REQUEST_RETRY_NUM =
47 Counter.build()
48 .name("client_java_grpc_requests_retry_num")
49 .help("grpc request retry num.")
50 .labelNames("type", "cluster")
51 .register();
52
53
54 private final ErrorHandler<RespT> handler;
55
56 private final ImmutableSet<Status.Code> unrecoverableStatus =
57 ImmutableSet.of(
58 Status.Code.ALREADY_EXISTS, Status.Code.PERMISSION_DENIED,
59 Status.Code.INVALID_ARGUMENT, Status.Code.NOT_FOUND,
60 Status.Code.UNIMPLEMENTED, Status.Code.OUT_OF_RANGE,
61 Status.Code.UNAUTHENTICATED, Status.Code.CANCELLED);
62
63 RetryPolicy(ErrorHandler<RespT> handler) {
64 this.handler = handler;
65 }
66
67 private void rethrowNotRecoverableException(Exception e) {
68 Status status = Status.fromThrowable(e);
69 if (unrecoverableStatus.contains(status.getCode())) {
70 throw new GrpcException(e);
71 }
72 }
73
74 public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer backOffer) {
75 String[] labels = new String[] {methodName, backOffer.getClusterId().toString()};
76 Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(labels).startTimer();
77 SlowLogSpan callWithRetrySlowLogSpan = backOffer.getSlowLog().start("callWithRetry");
78 callWithRetrySlowLogSpan.addProperty("method", methodName);
79 try {
80 while (true) {
81 RespT result = null;
82 try {
83
84 Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(labels).startTimer();
85 SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC");
86 slowLogSpan.addProperty("method", methodName);
87 try {
88 result = proc.call();
89 } finally {
90 slowLogSpan.end();
91 requestTimer.observeDuration();
92 }
93 } catch (Exception e) {
94 rethrowNotRecoverableException(e);
95
96 backOffer.checkTimeout();
97 boolean retry = handler.handleRequestError(backOffer, e);
98 if (retry) {
99 GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
100 continue;
101 } else {
102 return result;
103 }
104 }
105
106
107 if (handler != null) {
108 boolean retry = handler.handleResponseError(backOffer, result);
109 if (retry) {
110 GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
111 continue;
112 }
113 }
114 return result;
115 }
116 } finally {
117 callWithRetryTimer.observeDuration();
118 callWithRetrySlowLogSpan.end();
119 }
120 }
121
122 public interface Builder<T> {
123 RetryPolicy<T> create(ErrorHandler<T> handler);
124 }
125 }