1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.raw;
19
20 import com.google.protobuf.ByteString;
21 import io.prometheus.client.Counter;
22 import io.prometheus.client.Histogram;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.tikv.common.TiSession;
29 import org.tikv.common.exception.CircuitBreakerOpenException;
30 import org.tikv.common.util.HistogramUtils;
31 import org.tikv.common.util.Pair;
32 import org.tikv.common.util.ScanOption;
33 import org.tikv.kvproto.Kvrpcpb;
34 import org.tikv.service.failsafe.CircuitBreaker;
35
36 public class SmartRawKVClient implements RawKVClientBase {
37 private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class);
38
39 private static final Histogram REQUEST_LATENCY =
40 HistogramUtils.buildDuration()
41 .name("client_java_smart_raw_requests_latency")
42 .help("client smart raw request latency.")
43 .labelNames("type", "cluster")
44 .register();
45
46 private static final Counter REQUEST_SUCCESS =
47 Counter.build()
48 .name("client_java_smart_raw_requests_success")
49 .help("client smart raw request success.")
50 .labelNames("type", "cluster")
51 .register();
52
53 private static final Counter REQUEST_FAILURE =
54 Counter.build()
55 .name("client_java_smart_raw_requests_failure")
56 .help("client smart raw request failure.")
57 .labelNames("type", "cluster")
58 .register();
59
60 private static final Counter CIRCUIT_BREAKER_OPENED =
61 Counter.build()
62 .name("client_java_smart_raw_circuit_breaker_opened")
63 .help("client smart raw circuit breaker opened.")
64 .labelNames("type", "cluster")
65 .register();
66
67 private final RawKVClientBase client;
68 private final CircuitBreaker circuitBreaker;
69
70 public SmartRawKVClient(RawKVClientBase client, CircuitBreaker breaker) {
71 this.client = client;
72 this.circuitBreaker = breaker;
73 }
74
75 @Override
76 public void put(ByteString key, ByteString value) {
77 callWithCircuitBreaker("put", () -> client.put(key, value));
78 }
79
80 @Override
81 public void put(ByteString key, ByteString value, long ttl) {
82 callWithCircuitBreaker("put", () -> client.put(key, value, ttl));
83 }
84
85 @Override
86 public Optional<ByteString> putIfAbsent(ByteString key, ByteString value) {
87 return callWithCircuitBreaker("putIfAbsent", () -> client.putIfAbsent(key, value));
88 }
89
90 @Override
91 public Optional<ByteString> putIfAbsent(ByteString key, ByteString value, long ttl) {
92 return callWithCircuitBreaker("putIfAbsent", () -> client.putIfAbsent(key, value, ttl));
93 }
94
95 @Override
96 public void compareAndSet(ByteString key, Optional<ByteString> prevValue, ByteString value) {
97 callWithCircuitBreaker("compareAndSet", () -> client.compareAndSet(key, prevValue, value));
98 }
99
100 @Override
101 public void compareAndSet(
102 ByteString key, Optional<ByteString> prevValue, ByteString value, long ttl) {
103 callWithCircuitBreaker("compareAndSet", () -> client.compareAndSet(key, prevValue, value, ttl));
104 }
105
106 @Override
107 public void batchPut(Map<ByteString, ByteString> kvPairs) {
108 callWithCircuitBreaker("batchPut", () -> client.batchPut(kvPairs));
109 }
110
111 @Override
112 public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
113 callWithCircuitBreaker("batchPut", () -> client.batchPut(kvPairs, ttl));
114 }
115
116 @Override
117 public Optional<ByteString> get(ByteString key) {
118 return callWithCircuitBreaker("get", () -> client.get(key));
119 }
120
121 @Override
122 public List<Kvrpcpb.KvPair> batchGet(List<ByteString> keys) {
123 return callWithCircuitBreaker("batchGet", () -> client.batchGet(keys));
124 }
125
126 @Override
127 public void batchDelete(List<ByteString> keys) {
128 callWithCircuitBreaker("batchDelete", () -> client.batchDelete(keys));
129 }
130
131 @Override
132 public Optional<Long> getKeyTTL(ByteString key) {
133 return callWithCircuitBreaker("getKeyTTL", () -> client.getKeyTTL(key));
134 }
135
136 @Override
137 public List<List<ByteString>> batchScanKeys(
138 List<Pair<ByteString, ByteString>> ranges, int eachLimit) {
139 return callWithCircuitBreaker("batchScanKeys", () -> client.batchScanKeys(ranges, eachLimit));
140 }
141
142 @Override
143 public List<List<Kvrpcpb.KvPair>> batchScan(List<ScanOption> ranges) {
144 return callWithCircuitBreaker("batchScan", () -> client.batchScan(ranges));
145 }
146
147 @Override
148 public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
149 return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, limit));
150 }
151
152 @Override
153 public List<Kvrpcpb.KvPair> scan(
154 ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
155 return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, limit, keyOnly));
156 }
157
158 @Override
159 public List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit) {
160 return callWithCircuitBreaker("scan", () -> client.scan(startKey, limit));
161 }
162
163 @Override
164 public List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit, boolean keyOnly) {
165 return callWithCircuitBreaker("scan", () -> client.scan(startKey, limit, keyOnly));
166 }
167
168 @Override
169 public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey) {
170 return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey));
171 }
172
173 @Override
174 public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
175 return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, keyOnly));
176 }
177
178 @Override
179 public List<Kvrpcpb.KvPair> scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) {
180 return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey, limit, keyOnly));
181 }
182
183 @Override
184 public List<Kvrpcpb.KvPair> scanPrefix(ByteString prefixKey) {
185 return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey));
186 }
187
188 @Override
189 public List<Kvrpcpb.KvPair> scanPrefix(ByteString prefixKey, boolean keyOnly) {
190 return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey, keyOnly));
191 }
192
193 @Override
194 public void delete(ByteString key) {
195 callWithCircuitBreaker("delete", () -> client.delete(key));
196 }
197
198 @Override
199 public void deleteRange(ByteString startKey, ByteString endKey) {
200 callWithCircuitBreaker("deleteRange", () -> client.deleteRange(startKey, endKey));
201 }
202
203 @Override
204 public void deletePrefix(ByteString key) {
205 callWithCircuitBreaker("deletePrefix", () -> client.deletePrefix(key));
206 }
207
208 @Override
209 public TiSession getSession() {
210 return client.getSession();
211 }
212
213 <T> T callWithCircuitBreaker(String funcName, Function1<T> func) {
214 String[] labels =
215 new String[] {funcName, client.getSession().getPDClient().getClusterId().toString()};
216
217 Histogram.Timer requestTimer = REQUEST_LATENCY.labels(labels).startTimer();
218 try {
219 T result = callWithCircuitBreaker0(funcName, func);
220 REQUEST_SUCCESS.labels(labels).inc();
221 return result;
222 } catch (Exception e) {
223 REQUEST_FAILURE.labels(labels).inc();
224 throw e;
225 } finally {
226 requestTimer.observeDuration();
227 }
228 }
229
230 private <T> T callWithCircuitBreaker0(String funcName, Function1<T> func) {
231 if (circuitBreaker.allowRequest()) {
232 try {
233 T result = func.apply();
234 circuitBreaker.getMetrics().recordSuccess();
235 return result;
236 } catch (Exception e) {
237 circuitBreaker.getMetrics().recordFailure();
238 throw e;
239 }
240 } else if (circuitBreaker.attemptExecution()) {
241 logger.debug("attemptExecution");
242 try {
243 T result = func.apply();
244 circuitBreaker.getMetrics().recordSuccess();
245 circuitBreaker.recordAttemptSuccess();
246 logger.debug("markSuccess");
247 return result;
248 } catch (Exception e) {
249 circuitBreaker.getMetrics().recordFailure();
250 circuitBreaker.recordAttemptFailure();
251 logger.debug("markNonSuccess");
252 throw e;
253 }
254 } else {
255 logger.debug("Circuit Breaker Opened");
256 CIRCUIT_BREAKER_OPENED
257 .labels(funcName, client.getSession().getPDClient().getClusterId().toString())
258 .inc();
259 throw new CircuitBreakerOpenException();
260 }
261 }
262
263 private void callWithCircuitBreaker(String funcName, Function0 func) {
264 callWithCircuitBreaker(
265 funcName,
266 (Function1<Void>)
267 () -> {
268 func.apply();
269 return null;
270 });
271 }
272
273 @Override
274 public void close() throws Exception {
275 client.close();
276 }
277
278 public interface Function1<T> {
279 T apply();
280 }
281
282 public interface Function0 {
283 void apply();
284 }
285 }