1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.region;
19
20 import static org.tikv.common.codec.KeyUtils.formatBytesUTF8;
21
22 import com.google.protobuf.ByteString;
23 import io.prometheus.client.Histogram;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.function.Function;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import org.tikv.common.ReadOnlyPDClient;
36 import org.tikv.common.TiConfiguration;
37 import org.tikv.common.event.CacheInvalidateEvent;
38 import org.tikv.common.exception.GrpcException;
39 import org.tikv.common.exception.InvalidStoreException;
40 import org.tikv.common.exception.TiClientInternalException;
41 import org.tikv.common.log.SlowLogSpan;
42 import org.tikv.common.util.BackOffer;
43 import org.tikv.common.util.ChannelFactory;
44 import org.tikv.common.util.ConcreteBackOffer;
45 import org.tikv.common.util.HistogramUtils;
46 import org.tikv.common.util.Pair;
47 import org.tikv.kvproto.Metapb;
48 import org.tikv.kvproto.Metapb.Peer;
49 import org.tikv.kvproto.Metapb.StoreState;
50 import org.tikv.kvproto.Pdpb;
51
52 @SuppressWarnings("UnstableApiUsage")
53 public class RegionManager {
54
55 private static final Logger logger = LoggerFactory.getLogger(RegionManager.class);
56 public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY =
57 HistogramUtils.buildDuration()
58 .name("client_java_get_region_by_requests_latency")
59 .help("getRegionByKey request latency.")
60 .labelNames("cluster")
61 .register();
62 public static final Histogram SCAN_REGIONS_REQUEST_LATENCY =
63 HistogramUtils.buildDuration()
64 .name("client_java_scan_regions_request_latency")
65 .help("scanRegions request latency.")
66 .labelNames("cluster")
67 .register();
68
69
70
71 private final RegionCache cache;
72 private final ReadOnlyPDClient pdClient;
73 private final TiConfiguration conf;
74 private final ScheduledExecutorService executor;
75 private final StoreHealthyChecker storeChecker;
76 private final CopyOnWriteArrayList<Function<CacheInvalidateEvent, Void>>
77 cacheInvalidateCallbackList;
78 private final ExecutorService callBackThreadPool;
79 private AtomicInteger tiflashStoreIndex = new AtomicInteger(0);
80
/**
 * Creates a region manager with a store health checker and a single callback thread.
 *
 * <p>Equivalent to calling the four-argument constructor with a callback thread count of
 * {@code 1}.
 *
 * @param conf client configuration
 * @param pdClient PD client used for region and store lookups
 * @param channelFactory channel factory passed on to the store health checker
 */
public RegionManager(
    TiConfiguration conf,
    ReadOnlyPDClient pdClient,
    ChannelFactory channelFactory) {
  this(conf, pdClient, channelFactory, 1);
}
85
/**
 * Creates a region manager without a store health checker, using a single callback thread.
 *
 * <p>Equivalent to calling the {@code (conf, pdClient, int)} constructor with a callback thread
 * count of {@code 1}.
 *
 * @param conf client configuration
 * @param pdClient PD client used for region and store lookups
 */
public RegionManager(
    TiConfiguration conf,
    ReadOnlyPDClient pdClient) {
  this(conf, pdClient, 1);
}
89
/**
 * Creates a region manager that does not run periodic store health checks.
 *
 * <p>Both the health checker and its scheduler are left {@code null}; the rest of the class
 * checks for that before using them.
 *
 * @param conf client configuration
 * @param pdClient PD client used for region and store lookups
 * @param callBackExecutorThreadNum size of the fixed thread pool exposed through
 *     {@link #getCallBackThreadPool()}
 */
public RegionManager(
    TiConfiguration conf, ReadOnlyPDClient pdClient, int callBackExecutorThreadNum) {
  this.conf = conf;
  this.pdClient = pdClient;
  this.cache = new RegionCache();

  // No channel factory is supplied to this constructor, so no health checking is set up.
  this.executor = null;
  this.storeChecker = null;

  this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum);
  this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
}
100
101 public RegionManager(
102 TiConfiguration conf,
103 ReadOnlyPDClient pdClient,
104 ChannelFactory channelFactory,
105 int callBackExecutorThreadNum) {
106 this.cache = new RegionCache();
107 this.pdClient = pdClient;
108 this.conf = conf;
109 long period = conf.getHealthCheckPeriodDuration();
110 StoreHealthyChecker storeChecker =
111 new StoreHealthyChecker(
112 channelFactory, pdClient, this.cache, conf.getGrpcHealthCheckTimeout());
113 this.storeChecker = storeChecker;
114 this.executor = Executors.newScheduledThreadPool(1);
115 this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS);
116 this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
117 this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum);
118 }
119
120 public synchronized void close() {
121 if (this.executor != null) {
122 this.executor.shutdownNow();
123 }
124 this.callBackThreadPool.shutdownNow();
125 }
126
127 public ReadOnlyPDClient getPDClient() {
128 return this.pdClient;
129 }
130
131 public ExecutorService getCallBackThreadPool() {
132 return callBackThreadPool;
133 }
134
135 public List<Function<CacheInvalidateEvent, Void>> getCacheInvalidateCallbackList() {
136 return cacheInvalidateCallbackList;
137 }
138
139 public void addCacheInvalidateCallback(
140 Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
141 this.cacheInvalidateCallbackList.add(cacheInvalidateCallback);
142 }
143
144 public void invalidateAll() {
145 cache.invalidateAll();
146 }
147
148 public List<Pdpb.Region> scanRegions(
149 BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) {
150 Long clusterId = pdClient.getClusterId();
151 Histogram.Timer requestTimer =
152 SCAN_REGIONS_REQUEST_LATENCY.labels(clusterId.toString()).startTimer();
153 SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("scanRegions");
154 try {
155 return pdClient.scanRegions(backOffer, startKey, endKey, limit);
156 } catch (Exception e) {
157 return new ArrayList<>();
158 } finally {
159 requestTimer.observeDuration();
160 slowLogSpan.end();
161 }
162 }
163
164 public TiRegion getRegionByKey(ByteString key) {
165 return getRegionByKey(key, defaultBackOff());
166 }
167
168 public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
169 Long clusterId = pdClient.getClusterId();
170 Histogram.Timer requestTimer =
171 GET_REGION_BY_KEY_REQUEST_LATENCY.labels(clusterId.toString()).startTimer();
172 SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("getRegionByKey");
173 TiRegion region = cache.getRegionByKey(key, backOffer);
174 try {
175 if (region == null) {
176 logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key));
177 Pair<Metapb.Region, Metapb.Peer> regionAndLeader = pdClient.getRegionByKey(backOffer, key);
178 region =
179 cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer));
180 logger.debug(
181 String.format(
182 "get region id: %d with leader: %d",
183 region.getId(), region.getLeader().getStoreId()));
184 }
185 } catch (Exception e) {
186 logger.warn("Get region failed: ", e);
187 return null;
188 } finally {
189 requestTimer.observeDuration();
190 slowLogSpan.end();
191 }
192
193 return region;
194 }
195
196 @Deprecated
197
198
199
200
201
202
203 public TiRegion getRegionById(long regionId) {
204 BackOffer backOffer = defaultBackOff();
205 TiRegion region = cache.getRegionById(regionId);
206 if (region == null) {
207 Pair<Metapb.Region, Metapb.Peer> regionAndLeader =
208 pdClient.getRegionByID(backOffer, regionId);
209 region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer);
210 return cache.putRegion(region);
211 }
212 return region;
213 }
214
215 public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key, BackOffer backOffer) {
216 return getRegionStorePairByKey(key, TiStoreType.TiKV, backOffer);
217 }
218
219 public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key) {
220 return getRegionStorePairByKey(key, TiStoreType.TiKV);
221 }
222
223 public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key, TiStoreType storeType) {
224 return getRegionStorePairByKey(key, storeType, defaultBackOff());
225 }
226
227 public Pair<TiRegion, TiStore> getRegionStorePairByKey(
228 ByteString key, TiStoreType storeType, BackOffer backOffer) {
229 TiRegion region = getRegionByKey(key, backOffer);
230 if (region == null || !region.isValid()) {
231 throw new TiClientInternalException("Region invalid: " + region);
232 }
233
234 TiStore store = null;
235 if (storeType == TiStoreType.TiKV) {
236
237 List<Peer> replicaList = region.getReplicaList();
238 for (int i = 0; i < replicaList.size(); i++) {
239 Peer peer = replicaList.get(i);
240 store = getStoreById(peer.getStoreId(), backOffer);
241 if (store.isReachable()) {
242
243 region.setReplicaIdx(i);
244 break;
245 }
246 logger.info("Store {} is unreachable, try to get the next replica", peer.getStoreId());
247 }
248
249 if (store == null || !store.isReachable()) {
250 logger.warn("No TiKV store available for region: " + region);
251 }
252 } else {
253 List<TiStore> tiflashStores = new ArrayList<>();
254 for (Peer peer : region.getLearnerList()) {
255 TiStore s = getStoreById(peer.getStoreId(), backOffer);
256 if (!s.isReachable()) {
257 continue;
258 }
259 if (s.isTiFlash()) {
260 tiflashStores.add(s);
261 }
262 }
263
264 if (tiflashStores.size() > 0) {
265 store =
266 tiflashStores.get(
267 Math.floorMod(tiflashStoreIndex.getAndIncrement(), tiflashStores.size()));
268 }
269
270 if (store == null) {
271
272 cache.invalidateRegion(region);
273 }
274 }
275 return Pair.create(region, store);
276 }
277
278 public TiRegion createRegion(Metapb.Region region, BackOffer backOffer) {
279 return createRegion(region, null, backOffer);
280 }
281
282 private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) {
283 List<Metapb.Peer> peers = new ArrayList<>();
284 List<TiStore> stores = new ArrayList<>();
285 for (Metapb.Peer peer : region.getPeersList()) {
286 try {
287 stores.add(getStoreById(peer.getStoreId(), backOffer));
288 peers.add(peer);
289 } catch (Exception e) {
290 logger.warn("Store {} not found: {}", peer.getStoreId(), e.toString());
291 }
292 }
293 Metapb.Region newRegion =
294 Metapb.Region.newBuilder().mergeFrom(region).clearPeers().addAllPeers(peers).build();
295 return new TiRegion(conf, newRegion, leader, peers, stores);
296 }
297
298 private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) {
299 try {
300 TiStore store = cache.getStoreById(id);
301 if (store == null) {
302 store = new TiStore(pdClient.getStore(backOffer, id));
303 } else {
304 return store;
305 }
306
307 if (store.getStore() == null) {
308 logger.warn(String.format("failed to get store %d from pd", id));
309 return null;
310 }
311
312 if (store.getStore().getState().equals(StoreState.Tombstone)) {
313 logger.warn(String.format("store %d is tombstone", id));
314 return null;
315 }
316 if (cache.putStore(id, store) && storeChecker != null) {
317 storeChecker.scheduleStoreHealthCheck(store);
318 }
319 return store;
320 } catch (Exception e) {
321 throw new GrpcException(e);
322 }
323 }
324
325 public TiStore getStoreById(long id) {
326 return getStoreById(id, defaultBackOff());
327 }
328
329 public TiStore getStoreById(long id, BackOffer backOffer) {
330 TiStore store = getStoreByIdWithBackOff(id, backOffer);
331 if (store == null) {
332 logger.warn(String.format("failed to fetch store %d, the store may be missing", id));
333 cache.clearAll();
334 throw new InvalidStoreException(id);
335 }
336 return store;
337 }
338
339 public void onRegionStale(TiRegion region) {
340 cache.invalidateRegion(region);
341 }
342
343 public TiRegion updateLeader(TiRegion region, long storeId) {
344 if (region.getLeader().getStoreId() == storeId) {
345 return region;
346 }
347 TiRegion newRegion = region.switchPeer(storeId);
348 if (cache.updateRegion(region, newRegion)) {
349 return newRegion;
350 }
351
352
353 logger.warn("Cannot find peer when updating leader (" + region.getId() + "," + storeId + ")");
354 return null;
355 }
356
357 public synchronized void updateStore(TiStore oldStore, TiStore newStore) {
358 if (cache.updateStore(oldStore, newStore) && storeChecker != null) {
359 storeChecker.scheduleStoreHealthCheck(newStore);
360 }
361 }
362
363
364 public void clearRegionCache() {
365 cache.clearAll();
366 }
367
368
369
370
371
372
373 public synchronized void onRequestFail(TiRegion region) {
374 cache.invalidateRegion(region);
375 }
376
377 public void invalidateStore(long storeId) {
378 cache.invalidateStore(storeId);
379 }
380
381 public void invalidateRegion(TiRegion region) {
382 cache.invalidateRegion(region);
383 }
384
385 public void insertRegionToCache(TiRegion region) {
386 cache.insertRegionToCache(region);
387 }
388
389 private BackOffer defaultBackOff() {
390 return ConcreteBackOffer.newCustomBackOff(
391 conf.getRawKVDefaultBackoffInMS(), pdClient.getClusterId());
392 }
393 }