View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.service.failsafe;
19  
20  import io.prometheus.client.Counter;
21  import java.io.IOException;
22  import java.util.concurrent.atomic.AtomicLong;
23  import java.util.concurrent.atomic.AtomicReference;
24  import org.slf4j.Logger;
25  import org.slf4j.LoggerFactory;
26  import org.tikv.common.TiConfiguration;
27  
28  public class CircuitBreakerImpl implements CircuitBreaker {
29    private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerImpl.class);
30  
31    private static final Counter CIRCUIT_BREAKER_ATTEMPT_COUNTER =
32        Counter.build()
33            .name("client_java_circuit_breaker_attempt_counter")
34            .help("client circuit breaker attempt counter.")
35            .labelNames("type", "cluster")
36            .register();
37  
38    private final Long clusterId;
39    private final boolean enable;
40    private final int windowInSeconds;
41    private final int errorThresholdPercentage;
42    private final int requestVolumeThreshold;
43    private final int sleepWindowInSeconds;
44    private final int attemptRequestCount;
45  
46    private final AtomicLong circuitOpened = new AtomicLong(-1);
47    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
48    private final AtomicLong attemptCount = new AtomicLong(0);
49    private final AtomicLong attemptSuccessCount = new AtomicLong(0);
50  
51    private final CircuitBreakerMetrics metrics;
52  
53    public CircuitBreakerImpl(TiConfiguration conf, long clusterId) {
54      this(
55          conf.isCircuitBreakEnable(),
56          conf.getCircuitBreakAvailabilityWindowInSeconds(),
57          conf.getCircuitBreakAvailabilityErrorThresholdPercentage(),
58          conf.getCircuitBreakAvailabilityRequestVolumnThreshold(),
59          conf.getCircuitBreakSleepWindowInSeconds(),
60          conf.getCircuitBreakAttemptRequestCount(),
61          clusterId);
62    }
63  
64    public CircuitBreakerImpl(
65        boolean enable,
66        int windowInSeconds,
67        int errorThresholdPercentage,
68        int requestVolumeThreshold,
69        int sleepWindowInSeconds,
70        int attemptRequestCount,
71        long clusterId) {
72      this.enable = enable;
73      this.clusterId = clusterId;
74      this.windowInSeconds = windowInSeconds;
75      this.errorThresholdPercentage = errorThresholdPercentage;
76      this.requestVolumeThreshold = requestVolumeThreshold;
77      this.sleepWindowInSeconds = sleepWindowInSeconds;
78      this.attemptRequestCount = attemptRequestCount;
79      this.metrics =
80          enable ? new CircuitBreakerMetricsImpl(windowInSeconds) : new NoOpCircuitBreakerMetrics();
81      this.metrics.addListener(getMetricsListener());
82    }
83  
84    private MetricsListener getMetricsListener() {
85      return hc -> {
86        logger.debug("onNext " + hc.toString());
87        // check if we are past the requestVolumeThreshold
88        if (hc.getTotalRequests() < requestVolumeThreshold) {
89          // we are not past the minimum volume threshold for the stat window,
90          // so no change to circuit status.
91          // if it was CLOSED, it stays CLOSED
92          // if it was half-open, we need to wait for some successful command executions
93          // if it was open, we need to wait for sleep window to elapse
94        } else {
95          if (hc.getErrorPercentage() < errorThresholdPercentage) {
96            // we are not past the minimum error threshold for the stat window,
97            // so no change to circuit status.
98            // if it was CLOSED, it stays CLOSED
99            // if it was half-open, we need to wait for some successful command executions
100           // if it was open, we need to wait for sleep window to elapse
101         } else {
102           // our failure rate is too high, we need to set the state to OPEN
103           close2Open();
104         }
105       }
106     };
107   }
108 
109   @Override
110   public CircuitBreakerMetrics getMetrics() {
111     return metrics;
112   }
113 
114   @Override
115   public boolean allowRequest() {
116     if (!enable) {
117       return true;
118     }
119     return !isOpen();
120   }
121 
122   boolean isOpen() {
123     return circuitOpened.get() >= 0;
124   }
125 
126   Status getStatus() {
127     return status.get();
128   }
129 
130   @Override
131   public void recordAttemptSuccess() {
132     CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("success", clusterId.toString()).inc();
133     if (attemptSuccessCount.incrementAndGet() >= this.attemptRequestCount) {
134       halfOpen2Close();
135     }
136   }
137 
138   @Override
139   public void recordAttemptFailure() {
140     CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("failure", clusterId.toString()).inc();
141     halfOpen2Open();
142   }
143 
144   @Override
145   public boolean attemptExecution() {
146     if (allowRequest()) {
147       return true;
148     } else {
149       if (isAfterSleepWindow()) {
150         // only the `attemptRequestCount` requests after sleep window should execute
151         // if all the executing commands succeed, the status will transition to CLOSED
152         // if some of the executing commands fail, the status will transition to OPEN
153         open2HalfOpen();
154         return attemptCount.incrementAndGet() <= attemptRequestCount;
155       } else {
156         return false;
157       }
158     }
159   }
160 
161   private boolean isAfterSleepWindow() {
162     final long circuitOpenTime = circuitOpened.get();
163     final long currentTime = System.currentTimeMillis();
164     final long sleepWindowTime = (long) sleepWindowInSeconds * 1000;
165     return currentTime >= circuitOpenTime + sleepWindowTime;
166   }
167 
168   private void close2Open() {
169     if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
170       // This thread wins the race to open the circuit
171       // it sets the start time for the sleep window
172       circuitOpened.set(System.currentTimeMillis());
173       logger.info("CLOSED => OPEN");
174     }
175   }
176 
177   private void halfOpen2Close() {
178     if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
179       // This thread wins the race to close the circuit
180       circuitOpened.set(-1L);
181       logger.info("HALF_OPEN => CLOSED");
182     }
183   }
184 
185   private void open2HalfOpen() {
186     if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
187       // This thread wins the race to half close the circuit
188       // it resets the attempt count
189       attemptCount.set(0);
190       attemptSuccessCount.set(0);
191       logger.info("OPEN => HALF_OPEN");
192     }
193   }
194 
195   private void halfOpen2Open() {
196     if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
197       // This thread wins the race to re-open the circuit
198       // it resets the start time for the sleep window
199       circuitOpened.set(System.currentTimeMillis());
200       logger.info("HALF_OPEN => OPEN");
201     }
202   }
203 
204   @Override
205   public void close() throws IOException {
206     metrics.close();
207   }
208 }