View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.policy;
19  
20  import com.google.common.collect.ImmutableSet;
21  import io.grpc.Status;
22  import io.prometheus.client.Counter;
23  import io.prometheus.client.Histogram;
24  import java.util.concurrent.Callable;
25  import org.tikv.common.exception.GrpcException;
26  import org.tikv.common.log.SlowLogSpan;
27  import org.tikv.common.operation.ErrorHandler;
28  import org.tikv.common.util.BackOffer;
29  import org.tikv.common.util.ConcreteBackOffer;
30  import org.tikv.common.util.HistogramUtils;
31  
32  public abstract class RetryPolicy<RespT> {
33    BackOffer backOffer = ConcreteBackOffer.newCopNextMaxBackOff();
34    public static final Histogram GRPC_SINGLE_REQUEST_LATENCY =
35        HistogramUtils.buildDuration()
36            .name("client_java_grpc_single_requests_latency")
37            .help("grpc request latency.")
38            .labelNames("type", "cluster")
39            .register();
40    public static final Histogram CALL_WITH_RETRY_DURATION =
41        HistogramUtils.buildDuration()
42            .name("client_java_call_with_retry_duration")
43            .help("callWithRetry duration.")
44            .labelNames("type", "cluster")
45            .register();
46    public static final Counter GRPC_REQUEST_RETRY_NUM =
47        Counter.build()
48            .name("client_java_grpc_requests_retry_num")
49            .help("grpc request retry num.")
50            .labelNames("type", "cluster")
51            .register();
52  
53    // handles PD and TiKV's error.
54    private final ErrorHandler<RespT> handler;
55  
56    private final ImmutableSet<Status.Code> unrecoverableStatus =
57        ImmutableSet.of(
58            Status.Code.ALREADY_EXISTS, Status.Code.PERMISSION_DENIED,
59            Status.Code.INVALID_ARGUMENT, Status.Code.NOT_FOUND,
60            Status.Code.UNIMPLEMENTED, Status.Code.OUT_OF_RANGE,
61            Status.Code.UNAUTHENTICATED, Status.Code.CANCELLED);
62  
63    RetryPolicy(ErrorHandler<RespT> handler) {
64      this.handler = handler;
65    }
66  
67    private void rethrowNotRecoverableException(Exception e) {
68      Status status = Status.fromThrowable(e);
69      if (unrecoverableStatus.contains(status.getCode())) {
70        throw new GrpcException(e);
71      }
72    }
73  
74    public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer backOffer) {
75      String[] labels = new String[] {methodName, backOffer.getClusterId().toString()};
76      Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(labels).startTimer();
77      SlowLogSpan callWithRetrySlowLogSpan = backOffer.getSlowLog().start("callWithRetry");
78      callWithRetrySlowLogSpan.addProperty("method", methodName);
79      try {
80        while (true) {
81          RespT result = null;
82          try {
83            // add single request duration histogram
84            Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(labels).startTimer();
85            SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC");
86            slowLogSpan.addProperty("method", methodName);
87            try {
88              result = proc.call();
89            } finally {
90              slowLogSpan.end();
91              requestTimer.observeDuration();
92            }
93          } catch (Exception e) {
94            rethrowNotRecoverableException(e);
95            // Handle request call error
96            backOffer.checkTimeout();
97            boolean retry = handler.handleRequestError(backOffer, e);
98            if (retry) {
99              GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
100             continue;
101           } else {
102             return result;
103           }
104         }
105 
106         // Handle response error
107         if (handler != null) {
108           boolean retry = handler.handleResponseError(backOffer, result);
109           if (retry) {
110             GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
111             continue;
112           }
113         }
114         return result;
115       }
116     } finally {
117       callWithRetryTimer.observeDuration();
118       callWithRetrySlowLogSpan.end();
119     }
120   }
121 
122   public interface Builder<T> {
123     RetryPolicy<T> create(ErrorHandler<T> handler);
124   }
125 }