View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.raw;
19  
20  import com.google.protobuf.ByteString;
21  import io.prometheus.client.Counter;
22  import io.prometheus.client.Histogram;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Optional;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  import org.tikv.common.TiSession;
29  import org.tikv.common.exception.CircuitBreakerOpenException;
30  import org.tikv.common.util.HistogramUtils;
31  import org.tikv.common.util.Pair;
32  import org.tikv.common.util.ScanOption;
33  import org.tikv.kvproto.Kvrpcpb;
34  import org.tikv.service.failsafe.CircuitBreaker;
35  
36  public class SmartRawKVClient implements RawKVClientBase {
37    private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class);
38  
39    private static final Histogram REQUEST_LATENCY =
40        HistogramUtils.buildDuration()
41            .name("client_java_smart_raw_requests_latency")
42            .help("client smart raw request latency.")
43            .labelNames("type", "cluster")
44            .register();
45  
46    private static final Counter REQUEST_SUCCESS =
47        Counter.build()
48            .name("client_java_smart_raw_requests_success")
49            .help("client smart raw request success.")
50            .labelNames("type", "cluster")
51            .register();
52  
53    private static final Counter REQUEST_FAILURE =
54        Counter.build()
55            .name("client_java_smart_raw_requests_failure")
56            .help("client smart raw request failure.")
57            .labelNames("type", "cluster")
58            .register();
59  
60    private static final Counter CIRCUIT_BREAKER_OPENED =
61        Counter.build()
62            .name("client_java_smart_raw_circuit_breaker_opened")
63            .help("client smart raw circuit breaker opened.")
64            .labelNames("type", "cluster")
65            .register();
66  
67    private final RawKVClientBase client;
68    private final CircuitBreaker circuitBreaker;
69  
70    public SmartRawKVClient(RawKVClientBase client, CircuitBreaker breaker) {
71      this.client = client;
72      this.circuitBreaker = breaker;
73    }
74  
75    @Override
76    public void put(ByteString key, ByteString value) {
77      callWithCircuitBreaker("put", () -> client.put(key, value));
78    }
79  
80    @Override
81    public void put(ByteString key, ByteString value, long ttl) {
82      callWithCircuitBreaker("put", () -> client.put(key, value, ttl));
83    }
84  
85    @Override
86    public Optional<ByteString> putIfAbsent(ByteString key, ByteString value) {
87      return callWithCircuitBreaker("putIfAbsent", () -> client.putIfAbsent(key, value));
88    }
89  
90    @Override
91    public Optional<ByteString> putIfAbsent(ByteString key, ByteString value, long ttl) {
92      return callWithCircuitBreaker("putIfAbsent", () -> client.putIfAbsent(key, value, ttl));
93    }
94  
95    @Override
96    public void compareAndSet(ByteString key, Optional<ByteString> prevValue, ByteString value) {
97      callWithCircuitBreaker("compareAndSet", () -> client.compareAndSet(key, prevValue, value));
98    }
99  
100   @Override
101   public void compareAndSet(
102       ByteString key, Optional<ByteString> prevValue, ByteString value, long ttl) {
103     callWithCircuitBreaker("compareAndSet", () -> client.compareAndSet(key, prevValue, value, ttl));
104   }
105 
106   @Override
107   public void batchPut(Map<ByteString, ByteString> kvPairs) {
108     callWithCircuitBreaker("batchPut", () -> client.batchPut(kvPairs));
109   }
110 
111   @Override
112   public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
113     callWithCircuitBreaker("batchPut", () -> client.batchPut(kvPairs, ttl));
114   }
115 
116   @Override
117   public Optional<ByteString> get(ByteString key) {
118     return callWithCircuitBreaker("get", () -> client.get(key));
119   }
120 
121   @Override
122   public List<Kvrpcpb.KvPair> batchGet(List<ByteString> keys) {
123     return callWithCircuitBreaker("batchGet", () -> client.batchGet(keys));
124   }
125 
126   @Override
127   public void batchDelete(List<ByteString> keys) {
128     callWithCircuitBreaker("batchDelete", () -> client.batchDelete(keys));
129   }
130 
131   @Override
132   public Optional<Long> getKeyTTL(ByteString key) {
133     return callWithCircuitBreaker("getKeyTTL", () -> client.getKeyTTL(key));
134   }
135 
136   @Override
137   public List<List<ByteString>> batchScanKeys(
138       List<Pair<ByteString, ByteString>> ranges, int eachLimit) {
139     return callWithCircuitBreaker("batchScanKeys", () -> client.batchScanKeys(ranges, eachLimit));
140   }
141 
142   @Override
143   public List<List<Kvrpcpb.KvPair>> batchScan(List<ScanOption> ranges) {
144     return callWithCircuitBreaker("batchScan", () -> client.batchScan(ranges));
145   }
146 
147   @Override
148   public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
149     return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, limit));
150   }
151 
152   @Override
153   public List<Kvrpcpb.KvPair> scan(
154       ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
155     return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, limit, keyOnly));
156   }
157 
158   @Override
159   public List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit) {
160     return callWithCircuitBreaker("scan", () -> client.scan(startKey, limit));
161   }
162 
163   @Override
164   public List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit, boolean keyOnly) {
165     return callWithCircuitBreaker("scan", () -> client.scan(startKey, limit, keyOnly));
166   }
167 
168   @Override
169   public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey) {
170     return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey));
171   }
172 
173   @Override
174   public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
175     return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, keyOnly));
176   }
177 
178   @Override
179   public List<Kvrpcpb.KvPair> scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) {
180     return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey, limit, keyOnly));
181   }
182 
183   @Override
184   public List<Kvrpcpb.KvPair> scanPrefix(ByteString prefixKey) {
185     return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey));
186   }
187 
188   @Override
189   public List<Kvrpcpb.KvPair> scanPrefix(ByteString prefixKey, boolean keyOnly) {
190     return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey, keyOnly));
191   }
192 
193   @Override
194   public void delete(ByteString key) {
195     callWithCircuitBreaker("delete", () -> client.delete(key));
196   }
197 
198   @Override
199   public void deleteRange(ByteString startKey, ByteString endKey) {
200     callWithCircuitBreaker("deleteRange", () -> client.deleteRange(startKey, endKey));
201   }
202 
203   @Override
204   public void deletePrefix(ByteString key) {
205     callWithCircuitBreaker("deletePrefix", () -> client.deletePrefix(key));
206   }
207 
208   @Override
209   public TiSession getSession() {
210     return client.getSession();
211   }
212 
213   <T> T callWithCircuitBreaker(String funcName, Function1<T> func) {
214     String[] labels =
215         new String[] {funcName, client.getSession().getPDClient().getClusterId().toString()};
216 
217     Histogram.Timer requestTimer = REQUEST_LATENCY.labels(labels).startTimer();
218     try {
219       T result = callWithCircuitBreaker0(funcName, func);
220       REQUEST_SUCCESS.labels(labels).inc();
221       return result;
222     } catch (Exception e) {
223       REQUEST_FAILURE.labels(labels).inc();
224       throw e;
225     } finally {
226       requestTimer.observeDuration();
227     }
228   }
229 
230   private <T> T callWithCircuitBreaker0(String funcName, Function1<T> func) {
231     if (circuitBreaker.allowRequest()) {
232       try {
233         T result = func.apply();
234         circuitBreaker.getMetrics().recordSuccess();
235         return result;
236       } catch (Exception e) {
237         circuitBreaker.getMetrics().recordFailure();
238         throw e;
239       }
240     } else if (circuitBreaker.attemptExecution()) {
241       logger.debug("attemptExecution");
242       try {
243         T result = func.apply();
244         circuitBreaker.getMetrics().recordSuccess();
245         circuitBreaker.recordAttemptSuccess();
246         logger.debug("markSuccess");
247         return result;
248       } catch (Exception e) {
249         circuitBreaker.getMetrics().recordFailure();
250         circuitBreaker.recordAttemptFailure();
251         logger.debug("markNonSuccess");
252         throw e;
253       }
254     } else {
255       logger.debug("Circuit Breaker Opened");
256       CIRCUIT_BREAKER_OPENED
257           .labels(funcName, client.getSession().getPDClient().getClusterId().toString())
258           .inc();
259       throw new CircuitBreakerOpenException();
260     }
261   }
262 
263   private void callWithCircuitBreaker(String funcName, Function0 func) {
264     callWithCircuitBreaker(
265         funcName,
266         (Function1<Void>)
267             () -> {
268               func.apply();
269               return null;
270             });
271   }
272 
273   @Override
274   public void close() throws Exception {
275     client.close();
276   }
277 
278   public interface Function1<T> {
279     T apply();
280   }
281 
282   public interface Function0 {
283     void apply();
284   }
285 }