View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.util;
19  
20  import static org.tikv.common.ConfigUtils.TIKV_BO_REGION_MISS_BASE_IN_MS;
21  
22  import com.google.common.annotations.VisibleForTesting;
23  import com.google.common.base.Preconditions;
24  import io.prometheus.client.Histogram;
25  import java.util.ArrayList;
26  import java.util.Collections;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.ConcurrentHashMap;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  import org.tikv.common.TiConfiguration;
33  import org.tikv.common.exception.GrpcException;
34  import org.tikv.common.exception.TiKVException;
35  import org.tikv.common.log.SlowLog;
36  import org.tikv.common.log.SlowLogEmptyImpl;
37  import org.tikv.common.log.SlowLogSpan;
38  
39  public class ConcreteBackOffer implements BackOffer {
40    private static final Logger logger = LoggerFactory.getLogger(ConcreteBackOffer.class);
41    private final int maxSleep;
42    private final Long clusterId;
43  
44    @VisibleForTesting
45    public final Map<BackOffFunction.BackOffFuncType, BackOffFunction> backOffFunctionMap;
46  
47    @VisibleForTesting public final List<Exception> errors;
48    private int totalSleep;
49    private final long deadline;
50    private final SlowLog slowLog;
51  
52    public static final Histogram BACKOFF_DURATION =
53        HistogramUtils.buildDuration()
54            .name("client_java_backoff_duration")
55            .help("backoff duration.")
56            .labelNames("type", "cluster")
57            .register();
58  
59    private ConcreteBackOffer(int maxSleep, long deadline, SlowLog slowLog, long clusterId) {
60      Preconditions.checkArgument(
61          maxSleep == 0 || deadline == 0, "Max sleep time should be 0 or Deadline should be 0.");
62      Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0.");
63      Preconditions.checkArgument(deadline >= 0, "Deadline cannot be less than 0.");
64      this.clusterId = clusterId;
65      this.maxSleep = maxSleep;
66      this.errors = Collections.synchronizedList(new ArrayList<>());
67      this.backOffFunctionMap = new ConcurrentHashMap<>();
68      this.deadline = deadline;
69      this.slowLog = slowLog;
70    }
71  
72    private ConcreteBackOffer(ConcreteBackOffer source) {
73      this.clusterId = source.clusterId;
74      this.maxSleep = source.maxSleep;
75      this.totalSleep = source.totalSleep;
76      this.errors = source.errors;
77      this.backOffFunctionMap = source.backOffFunctionMap;
78      this.deadline = source.deadline;
79      this.slowLog = source.slowLog;
80    }
81  
82    public static ConcreteBackOffer newDeadlineBackOff(
83        int timeoutInMs, SlowLog slowLog, long clusterId) {
84      long deadline = System.currentTimeMillis() + timeoutInMs;
85      return new ConcreteBackOffer(0, deadline, slowLog, clusterId);
86    }
87  
88    public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs, SlowLog slowLog) {
89      return newDeadlineBackOff(timeoutInMs, slowLog, 0);
90    }
91  
92    public static ConcreteBackOffer newCustomBackOff(int maxSleep, long clusterId) {
93      return new ConcreteBackOffer(maxSleep, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
94    }
95  
96    public static ConcreteBackOffer newCustomBackOff(int maxSleep) {
97      return newCustomBackOff(maxSleep, 0);
98    }
99  
100   public static ConcreteBackOffer newScannerNextMaxBackOff() {
101     return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, 0);
102   }
103 
104   public static ConcreteBackOffer newBatchGetMaxBackOff() {
105     return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, 0);
106   }
107 
108   public static ConcreteBackOffer newCopNextMaxBackOff() {
109     return newCopNextMaxBackOff(0);
110   }
111 
112   public static ConcreteBackOffer newCopNextMaxBackOff(long clusterId) {
113     return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
114   }
115 
116   public static ConcreteBackOffer newGetBackOff(long clusterId) {
117     return new ConcreteBackOffer(GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
118   }
119 
120   public static ConcreteBackOffer newRawKVBackOff(long clusterId) {
121     return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
122   }
123 
124   public static ConcreteBackOffer newRawKVBackOff() {
125     return newRawKVBackOff(0);
126   }
127 
128   public static ConcreteBackOffer newTsoBackOff(long clusterId) {
129     return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
130   }
131 
132   public static ConcreteBackOffer create(BackOffer source) {
133     return new ConcreteBackOffer(((ConcreteBackOffer) source));
134   }
135 
136   /**
137    * Creates a back off func which implements exponential back off with optional jitters according
138    * to different back off strategies. See http://www.awsarchitectureblog.com/2015/03/backoff.html
139    */
140   private BackOffFunction createBackOffFunc(BackOffFunction.BackOffFuncType funcType) {
141     BackOffFunction backOffFunction = null;
142     switch (funcType) {
143       case BoUpdateLeader:
144         backOffFunction = BackOffFunction.create(1, 10, BackOffStrategy.NoJitter);
145         break;
146       case BoTxnLockFast:
147         backOffFunction = BackOffFunction.create(100, 3000, BackOffStrategy.EqualJitter);
148         break;
149       case BoServerBusy:
150         backOffFunction = BackOffFunction.create(2000, 10000, BackOffStrategy.EqualJitter);
151         break;
152       case BoRegionMiss:
153         backOffFunction =
154             BackOffFunction.create(
155                 TiConfiguration.getInt(TIKV_BO_REGION_MISS_BASE_IN_MS),
156                 500,
157                 BackOffStrategy.NoJitter);
158         break;
159       case BoTxnLock:
160         backOffFunction = BackOffFunction.create(200, 3000, BackOffStrategy.EqualJitter);
161         break;
162       case BoPDRPC:
163         backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter);
164         break;
165       case BoTiKVRPC:
166         backOffFunction = BackOffFunction.create(10, 400, BackOffStrategy.EqualJitter);
167         break;
168       case BoTxnNotFound:
169         backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter);
170         break;
171       case BoCheckTimeout:
172         backOffFunction = BackOffFunction.create(0, 0, BackOffStrategy.NoJitter);
173         break;
174       case BoCheckHealth:
175         backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter);
176         break;
177       case BoTsoBatchUsedUp:
178         backOffFunction =
179             BackOffFunction.create(
180                 TiConfiguration.getInt(TIKV_BO_REGION_MISS_BASE_IN_MS),
181                 500,
182                 BackOffStrategy.NoJitter);
183         break;
184     }
185     return backOffFunction;
186   }
187 
188   @Override
189   public void doBackOff(BackOffFunction.BackOffFuncType funcType, Exception err) {
190     doBackOffWithMaxSleep(funcType, -1, err);
191   }
192 
193   @Override
194   public void checkTimeout() {
195     if (!canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoCheckTimeout)) {
196       logThrowError(new TiKVException("Request Timeout"));
197     }
198   }
199 
200   @Override
201   public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) {
202     return canRetryAfterSleep(funcType, -1);
203   }
204 
205   public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) {
206     String[] labels = new String[] {funcType.name(), clusterId.toString()};
207     Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(labels).startTimer();
208     SlowLogSpan slowLogSpan = getSlowLog().start("backoff");
209     slowLogSpan.addProperty("type", funcType.name());
210     BackOffFunction backOffFunction =
211         backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc);
212 
213     // Back off will not be done here
214     long sleep = backOffFunction.getSleepMs(maxSleepMs);
215     totalSleep += sleep;
216     // Check deadline
217     if (deadline > 0) {
218       long currentMs = System.currentTimeMillis();
219       if (currentMs + sleep >= deadline) {
220         logger.warn(String.format("Deadline %d is exceeded, errors:", deadline));
221         slowLogSpan.end();
222         backOffTimer.observeDuration();
223         return false;
224       }
225     }
226 
227     try {
228       Thread.sleep(sleep);
229     } catch (InterruptedException e) {
230       throw new GrpcException(e);
231     } finally {
232       slowLogSpan.end();
233       backOffTimer.observeDuration();
234     }
235     if (maxSleep > 0 && totalSleep >= maxSleep) {
236       logger.warn(String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep));
237       return false;
238     }
239     return true;
240   }
241 
242   @Override
243   public void doBackOffWithMaxSleep(
244       BackOffFunction.BackOffFuncType funcType, long maxSleepMs, Exception err) {
245     logger.debug(
246         String.format(
247             "%s, retry later(totalSleep %dms, maxSleep %dms)",
248             err.getMessage(), totalSleep, maxSleep));
249     errors.add(err);
250     if (!canRetryAfterSleep(funcType, maxSleepMs)) {
251       logThrowError(err);
252     }
253   }
254 
255   private void logThrowError(Exception err) {
256     StringBuilder errMsg = new StringBuilder();
257     for (int i = 0; i < errors.size(); i++) {
258       Exception curErr = errors.get(i);
259       // Print only last 3 errors for non-DEBUG log levels.
260       if (logger.isDebugEnabled() || i >= errors.size() - 3) {
261         errMsg.append("\n").append(i).append(".").append(curErr.toString());
262       }
263     }
264     logger.warn(errMsg.toString());
265     // Use the last backoff type to generate an exception
266     throw new GrpcException("retry is exhausted.", err);
267   }
268 
269   @Override
270   public SlowLog getSlowLog() {
271     return slowLog;
272   }
273 
274   public Long getClusterId() {
275     return clusterId;
276   }
277 }