1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.service.failsafe;
19
20 import io.prometheus.client.Counter;
21 import java.io.IOException;
22 import java.util.concurrent.atomic.AtomicLong;
23 import java.util.concurrent.atomic.AtomicReference;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import org.tikv.common.TiConfiguration;
27
28 public class CircuitBreakerImpl implements CircuitBreaker {
29 private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerImpl.class);
30
31 private static final Counter CIRCUIT_BREAKER_ATTEMPT_COUNTER =
32 Counter.build()
33 .name("client_java_circuit_breaker_attempt_counter")
34 .help("client circuit breaker attempt counter.")
35 .labelNames("type", "cluster")
36 .register();
37
38 private final Long clusterId;
39 private final boolean enable;
40 private final int windowInSeconds;
41 private final int errorThresholdPercentage;
42 private final int requestVolumeThreshold;
43 private final int sleepWindowInSeconds;
44 private final int attemptRequestCount;
45
46 private final AtomicLong circuitOpened = new AtomicLong(-1);
47 private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
48 private final AtomicLong attemptCount = new AtomicLong(0);
49 private final AtomicLong attemptSuccessCount = new AtomicLong(0);
50
51 private final CircuitBreakerMetrics metrics;
52
53 public CircuitBreakerImpl(TiConfiguration conf, long clusterId) {
54 this(
55 conf.isCircuitBreakEnable(),
56 conf.getCircuitBreakAvailabilityWindowInSeconds(),
57 conf.getCircuitBreakAvailabilityErrorThresholdPercentage(),
58 conf.getCircuitBreakAvailabilityRequestVolumnThreshold(),
59 conf.getCircuitBreakSleepWindowInSeconds(),
60 conf.getCircuitBreakAttemptRequestCount(),
61 clusterId);
62 }
63
64 public CircuitBreakerImpl(
65 boolean enable,
66 int windowInSeconds,
67 int errorThresholdPercentage,
68 int requestVolumeThreshold,
69 int sleepWindowInSeconds,
70 int attemptRequestCount,
71 long clusterId) {
72 this.enable = enable;
73 this.clusterId = clusterId;
74 this.windowInSeconds = windowInSeconds;
75 this.errorThresholdPercentage = errorThresholdPercentage;
76 this.requestVolumeThreshold = requestVolumeThreshold;
77 this.sleepWindowInSeconds = sleepWindowInSeconds;
78 this.attemptRequestCount = attemptRequestCount;
79 this.metrics =
80 enable ? new CircuitBreakerMetricsImpl(windowInSeconds) : new NoOpCircuitBreakerMetrics();
81 this.metrics.addListener(getMetricsListener());
82 }
83
84 private MetricsListener getMetricsListener() {
85 return hc -> {
86 logger.debug("onNext " + hc.toString());
87
88 if (hc.getTotalRequests() < requestVolumeThreshold) {
89
90
91
92
93
94 } else {
95 if (hc.getErrorPercentage() < errorThresholdPercentage) {
96
97
98
99
100
101 } else {
102
103 close2Open();
104 }
105 }
106 };
107 }
108
109 @Override
110 public CircuitBreakerMetrics getMetrics() {
111 return metrics;
112 }
113
114 @Override
115 public boolean allowRequest() {
116 if (!enable) {
117 return true;
118 }
119 return !isOpen();
120 }
121
122 boolean isOpen() {
123 return circuitOpened.get() >= 0;
124 }
125
126 Status getStatus() {
127 return status.get();
128 }
129
130 @Override
131 public void recordAttemptSuccess() {
132 CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("success", clusterId.toString()).inc();
133 if (attemptSuccessCount.incrementAndGet() >= this.attemptRequestCount) {
134 halfOpen2Close();
135 }
136 }
137
138 @Override
139 public void recordAttemptFailure() {
140 CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("failure", clusterId.toString()).inc();
141 halfOpen2Open();
142 }
143
144 @Override
145 public boolean attemptExecution() {
146 if (allowRequest()) {
147 return true;
148 } else {
149 if (isAfterSleepWindow()) {
150
151
152
153 open2HalfOpen();
154 return attemptCount.incrementAndGet() <= attemptRequestCount;
155 } else {
156 return false;
157 }
158 }
159 }
160
161 private boolean isAfterSleepWindow() {
162 final long circuitOpenTime = circuitOpened.get();
163 final long currentTime = System.currentTimeMillis();
164 final long sleepWindowTime = (long) sleepWindowInSeconds * 1000;
165 return currentTime >= circuitOpenTime + sleepWindowTime;
166 }
167
168 private void close2Open() {
169 if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
170
171
172 circuitOpened.set(System.currentTimeMillis());
173 logger.info("CLOSED => OPEN");
174 }
175 }
176
177 private void halfOpen2Close() {
178 if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
179
180 circuitOpened.set(-1L);
181 logger.info("HALF_OPEN => CLOSED");
182 }
183 }
184
185 private void open2HalfOpen() {
186 if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
187
188
189 attemptCount.set(0);
190 attemptSuccessCount.set(0);
191 logger.info("OPEN => HALF_OPEN");
192 }
193 }
194
195 private void halfOpen2Open() {
196 if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
197
198
199 circuitOpened.set(System.currentTimeMillis());
200 logger.info("HALF_OPEN => OPEN");
201 }
202 }
203
204 @Override
205 public void close() throws IOException {
206 metrics.close();
207 }
208 }