1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.service.failsafe;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicLong;
27 import java.util.concurrent.atomic.AtomicReference;
28 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 public class CircuitBreakerMetricsImpl implements CircuitBreakerMetrics {
33 private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerMetricsImpl.class);
34
35 private final int windowInMS;
36 private final List<MetricsListener> listeners;
37 private final AtomicReference<SingleWindowMetrics> currentMetrics;
38
39 private final ScheduledExecutorService scheduler;
40 private static final int SCHEDULER_INITIAL_DELAY = 1000;
41 private static final int SCHEDULER_PERIOD = 1000;
42
43 public CircuitBreakerMetricsImpl(int windowInSeconds) {
44 this.windowInMS = windowInSeconds * 1000;
45 this.listeners = new ArrayList<>();
46 this.currentMetrics = new AtomicReference<>(new SingleWindowMetrics());
47
48 scheduler =
49 new ScheduledThreadPoolExecutor(
50 1,
51 new BasicThreadFactory.Builder()
52 .namingPattern("circuit-breaker-metrics-%d")
53 .daemon(true)
54 .build());
55
56 scheduler.scheduleAtFixedRate(
57 this::onReachCircuitWindow,
58 SCHEDULER_INITIAL_DELAY,
59 SCHEDULER_PERIOD,
60 TimeUnit.MILLISECONDS);
61 }
62
63 @Override
64 public void recordSuccess() {
65 currentMetrics.get().recordSuccess();
66 }
67
68 @Override
69 public void recordFailure() {
70 currentMetrics.get().recordFailure();
71 }
72
73 private void onReachCircuitWindow() {
74 SingleWindowMetrics singleWindowMetrics = currentMetrics.get();
75 if (System.currentTimeMillis() < singleWindowMetrics.getStartMS() + windowInMS) {
76 return;
77 }
78 if (!currentMetrics.compareAndSet(singleWindowMetrics, new SingleWindowMetrics())) {
79 return;
80 }
81 logger.debug("window timeout, reset SingleWindowMetrics");
82 HealthCounts healthCounts = singleWindowMetrics.getHealthCounts();
83 for (MetricsListener metricsListener : listeners) {
84 metricsListener.onNext(healthCounts);
85 }
86 }
87
88 @Override
89 public void addListener(MetricsListener metricsListener) {
90 listeners.add(metricsListener);
91 }
92
93 @Override
94 public void close() throws IOException {
95 scheduler.shutdown();
96 }
97
98
99 static class SingleWindowMetrics {
100 private final long startMS = System.currentTimeMillis();
101 private final AtomicLong totalCount = new AtomicLong(0);
102 private final AtomicLong errorCount = new AtomicLong(0);
103
104 public void recordSuccess() {
105 totalCount.incrementAndGet();
106 }
107
108 public void recordFailure() {
109 totalCount.incrementAndGet();
110
111 errorCount.incrementAndGet();
112 }
113
114 public HealthCounts getHealthCounts() {
115 return new HealthCounts(totalCount.get(), errorCount.get());
116 }
117
118 public long getStartMS() {
119 return startMS;
120 }
121 }
122 }