View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.service.failsafe;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.concurrent.ScheduledExecutorService;
24  import java.util.concurrent.ScheduledThreadPoolExecutor;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicLong;
27  import java.util.concurrent.atomic.AtomicReference;
28  import org.apache.commons.lang3.concurrent.BasicThreadFactory;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  public class CircuitBreakerMetricsImpl implements CircuitBreakerMetrics {
33    private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerMetricsImpl.class);
34  
35    private final int windowInMS;
36    private final List<MetricsListener> listeners;
37    private final AtomicReference<SingleWindowMetrics> currentMetrics;
38  
39    private final ScheduledExecutorService scheduler;
40    private static final int SCHEDULER_INITIAL_DELAY = 1000;
41    private static final int SCHEDULER_PERIOD = 1000;
42  
43    public CircuitBreakerMetricsImpl(int windowInSeconds) {
44      this.windowInMS = windowInSeconds * 1000;
45      this.listeners = new ArrayList<>();
46      this.currentMetrics = new AtomicReference<>(new SingleWindowMetrics());
47  
48      scheduler =
49          new ScheduledThreadPoolExecutor(
50              1,
51              new BasicThreadFactory.Builder()
52                  .namingPattern("circuit-breaker-metrics-%d")
53                  .daemon(true)
54                  .build());
55  
56      scheduler.scheduleAtFixedRate(
57          this::onReachCircuitWindow,
58          SCHEDULER_INITIAL_DELAY,
59          SCHEDULER_PERIOD,
60          TimeUnit.MILLISECONDS);
61    }
62  
63    @Override
64    public void recordSuccess() {
65      currentMetrics.get().recordSuccess();
66    }
67  
68    @Override
69    public void recordFailure() {
70      currentMetrics.get().recordFailure();
71    }
72  
73    private void onReachCircuitWindow() {
74      SingleWindowMetrics singleWindowMetrics = currentMetrics.get();
75      if (System.currentTimeMillis() < singleWindowMetrics.getStartMS() + windowInMS) {
76        return;
77      }
78      if (!currentMetrics.compareAndSet(singleWindowMetrics, new SingleWindowMetrics())) {
79        return;
80      }
81      logger.debug("window timeout, reset SingleWindowMetrics");
82      HealthCounts healthCounts = singleWindowMetrics.getHealthCounts();
83      for (MetricsListener metricsListener : listeners) {
84        metricsListener.onNext(healthCounts);
85      }
86    }
87  
88    @Override
89    public void addListener(MetricsListener metricsListener) {
90      listeners.add(metricsListener);
91    }
92  
93    @Override
94    public void close() throws IOException {
95      scheduler.shutdown();
96    }
97  
98    /** Instead of using SingleWindowMetrics, it is better to use RollingWindowMetrics. */
99    static class SingleWindowMetrics {
100     private final long startMS = System.currentTimeMillis();
101     private final AtomicLong totalCount = new AtomicLong(0);
102     private final AtomicLong errorCount = new AtomicLong(0);
103 
104     public void recordSuccess() {
105       totalCount.incrementAndGet();
106     }
107 
108     public void recordFailure() {
109       totalCount.incrementAndGet();
110 
111       errorCount.incrementAndGet();
112     }
113 
114     public HealthCounts getHealthCounts() {
115       return new HealthCounts(totalCount.get(), errorCount.get());
116     }
117 
118     public long getStartMS() {
119       return startMS;
120     }
121   }
122 }