1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.util;
19
20 import static org.tikv.common.ConfigUtils.TIKV_BO_REGION_MISS_BASE_IN_MS;
21
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Preconditions;
24 import io.prometheus.client.Histogram;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.ConcurrentHashMap;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.tikv.common.TiConfiguration;
33 import org.tikv.common.exception.GrpcException;
34 import org.tikv.common.exception.TiKVException;
35 import org.tikv.common.log.SlowLog;
36 import org.tikv.common.log.SlowLogEmptyImpl;
37 import org.tikv.common.log.SlowLogSpan;
38
39 public class ConcreteBackOffer implements BackOffer {
40 private static final Logger logger = LoggerFactory.getLogger(ConcreteBackOffer.class);
41 private final int maxSleep;
42 private final Long clusterId;
43
44 @VisibleForTesting
45 public final Map<BackOffFunction.BackOffFuncType, BackOffFunction> backOffFunctionMap;
46
47 @VisibleForTesting public final List<Exception> errors;
48 private int totalSleep;
49 private final long deadline;
50 private final SlowLog slowLog;
51
52 public static final Histogram BACKOFF_DURATION =
53 HistogramUtils.buildDuration()
54 .name("client_java_backoff_duration")
55 .help("backoff duration.")
56 .labelNames("type", "cluster")
57 .register();
58
59 private ConcreteBackOffer(int maxSleep, long deadline, SlowLog slowLog, long clusterId) {
60 Preconditions.checkArgument(
61 maxSleep == 0 || deadline == 0, "Max sleep time should be 0 or Deadline should be 0.");
62 Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0.");
63 Preconditions.checkArgument(deadline >= 0, "Deadline cannot be less than 0.");
64 this.clusterId = clusterId;
65 this.maxSleep = maxSleep;
66 this.errors = Collections.synchronizedList(new ArrayList<>());
67 this.backOffFunctionMap = new ConcurrentHashMap<>();
68 this.deadline = deadline;
69 this.slowLog = slowLog;
70 }
71
72 private ConcreteBackOffer(ConcreteBackOffer source) {
73 this.clusterId = source.clusterId;
74 this.maxSleep = source.maxSleep;
75 this.totalSleep = source.totalSleep;
76 this.errors = source.errors;
77 this.backOffFunctionMap = source.backOffFunctionMap;
78 this.deadline = source.deadline;
79 this.slowLog = source.slowLog;
80 }
81
82 public static ConcreteBackOffer newDeadlineBackOff(
83 int timeoutInMs, SlowLog slowLog, long clusterId) {
84 long deadline = System.currentTimeMillis() + timeoutInMs;
85 return new ConcreteBackOffer(0, deadline, slowLog, clusterId);
86 }
87
88 public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs, SlowLog slowLog) {
89 return newDeadlineBackOff(timeoutInMs, slowLog, 0);
90 }
91
92 public static ConcreteBackOffer newCustomBackOff(int maxSleep, long clusterId) {
93 return new ConcreteBackOffer(maxSleep, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
94 }
95
96 public static ConcreteBackOffer newCustomBackOff(int maxSleep) {
97 return newCustomBackOff(maxSleep, 0);
98 }
99
100 public static ConcreteBackOffer newScannerNextMaxBackOff() {
101 return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, 0);
102 }
103
104 public static ConcreteBackOffer newBatchGetMaxBackOff() {
105 return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, 0);
106 }
107
108 public static ConcreteBackOffer newCopNextMaxBackOff() {
109 return newCopNextMaxBackOff(0);
110 }
111
112 public static ConcreteBackOffer newCopNextMaxBackOff(long clusterId) {
113 return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
114 }
115
116 public static ConcreteBackOffer newGetBackOff(long clusterId) {
117 return new ConcreteBackOffer(GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
118 }
119
120 public static ConcreteBackOffer newRawKVBackOff(long clusterId) {
121 return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
122 }
123
124 public static ConcreteBackOffer newRawKVBackOff() {
125 return newRawKVBackOff(0);
126 }
127
128 public static ConcreteBackOffer newTsoBackOff(long clusterId) {
129 return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
130 }
131
132 public static ConcreteBackOffer create(BackOffer source) {
133 return new ConcreteBackOffer(((ConcreteBackOffer) source));
134 }
135
136
137
138
139
140 private BackOffFunction createBackOffFunc(BackOffFunction.BackOffFuncType funcType) {
141 BackOffFunction backOffFunction = null;
142 switch (funcType) {
143 case BoUpdateLeader:
144 backOffFunction = BackOffFunction.create(1, 10, BackOffStrategy.NoJitter);
145 break;
146 case BoTxnLockFast:
147 backOffFunction = BackOffFunction.create(100, 3000, BackOffStrategy.EqualJitter);
148 break;
149 case BoServerBusy:
150 backOffFunction = BackOffFunction.create(2000, 10000, BackOffStrategy.EqualJitter);
151 break;
152 case BoRegionMiss:
153 backOffFunction =
154 BackOffFunction.create(
155 TiConfiguration.getInt(TIKV_BO_REGION_MISS_BASE_IN_MS),
156 500,
157 BackOffStrategy.NoJitter);
158 break;
159 case BoTxnLock:
160 backOffFunction = BackOffFunction.create(200, 3000, BackOffStrategy.EqualJitter);
161 break;
162 case BoPDRPC:
163 backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter);
164 break;
165 case BoTiKVRPC:
166 backOffFunction = BackOffFunction.create(10, 400, BackOffStrategy.EqualJitter);
167 break;
168 case BoTxnNotFound:
169 backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter);
170 break;
171 case BoCheckTimeout:
172 backOffFunction = BackOffFunction.create(0, 0, BackOffStrategy.NoJitter);
173 break;
174 case BoCheckHealth:
175 backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter);
176 break;
177 case BoTsoBatchUsedUp:
178 backOffFunction =
179 BackOffFunction.create(
180 TiConfiguration.getInt(TIKV_BO_REGION_MISS_BASE_IN_MS),
181 500,
182 BackOffStrategy.NoJitter);
183 break;
184 }
185 return backOffFunction;
186 }
187
188 @Override
189 public void doBackOff(BackOffFunction.BackOffFuncType funcType, Exception err) {
190 doBackOffWithMaxSleep(funcType, -1, err);
191 }
192
193 @Override
194 public void checkTimeout() {
195 if (!canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoCheckTimeout)) {
196 logThrowError(new TiKVException("Request Timeout"));
197 }
198 }
199
200 @Override
201 public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) {
202 return canRetryAfterSleep(funcType, -1);
203 }
204
205 public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) {
206 String[] labels = new String[] {funcType.name(), clusterId.toString()};
207 Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(labels).startTimer();
208 SlowLogSpan slowLogSpan = getSlowLog().start("backoff");
209 slowLogSpan.addProperty("type", funcType.name());
210 BackOffFunction backOffFunction =
211 backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc);
212
213
214 long sleep = backOffFunction.getSleepMs(maxSleepMs);
215 totalSleep += sleep;
216
217 if (deadline > 0) {
218 long currentMs = System.currentTimeMillis();
219 if (currentMs + sleep >= deadline) {
220 logger.warn(String.format("Deadline %d is exceeded, errors:", deadline));
221 slowLogSpan.end();
222 backOffTimer.observeDuration();
223 return false;
224 }
225 }
226
227 try {
228 Thread.sleep(sleep);
229 } catch (InterruptedException e) {
230 throw new GrpcException(e);
231 } finally {
232 slowLogSpan.end();
233 backOffTimer.observeDuration();
234 }
235 if (maxSleep > 0 && totalSleep >= maxSleep) {
236 logger.warn(String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep));
237 return false;
238 }
239 return true;
240 }
241
242 @Override
243 public void doBackOffWithMaxSleep(
244 BackOffFunction.BackOffFuncType funcType, long maxSleepMs, Exception err) {
245 logger.debug(
246 String.format(
247 "%s, retry later(totalSleep %dms, maxSleep %dms)",
248 err.getMessage(), totalSleep, maxSleep));
249 errors.add(err);
250 if (!canRetryAfterSleep(funcType, maxSleepMs)) {
251 logThrowError(err);
252 }
253 }
254
255 private void logThrowError(Exception err) {
256 StringBuilder errMsg = new StringBuilder();
257 for (int i = 0; i < errors.size(); i++) {
258 Exception curErr = errors.get(i);
259
260 if (logger.isDebugEnabled() || i >= errors.size() - 3) {
261 errMsg.append("\n").append(i).append(".").append(curErr.toString());
262 }
263 }
264 logger.warn(errMsg.toString());
265
266 throw new GrpcException("retry is exhausted.", err);
267 }
268
269 @Override
270 public SlowLog getSlowLog() {
271 return slowLog;
272 }
273
274 public Long getClusterId() {
275 return clusterId;
276 }
277 }