View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.region;
19  
20  import static org.tikv.common.codec.KeyUtils.formatBytesUTF8;
21  
22  import com.google.protobuf.ByteString;
23  import io.prometheus.client.Histogram;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.concurrent.CopyOnWriteArrayList;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.ScheduledExecutorService;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.function.Function;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  import org.tikv.common.ReadOnlyPDClient;
36  import org.tikv.common.TiConfiguration;
37  import org.tikv.common.event.CacheInvalidateEvent;
38  import org.tikv.common.exception.GrpcException;
39  import org.tikv.common.exception.InvalidStoreException;
40  import org.tikv.common.exception.TiClientInternalException;
41  import org.tikv.common.log.SlowLogSpan;
42  import org.tikv.common.util.BackOffer;
43  import org.tikv.common.util.ChannelFactory;
44  import org.tikv.common.util.ConcreteBackOffer;
45  import org.tikv.common.util.HistogramUtils;
46  import org.tikv.common.util.Pair;
47  import org.tikv.kvproto.Metapb;
48  import org.tikv.kvproto.Metapb.Peer;
49  import org.tikv.kvproto.Metapb.StoreState;
50  import org.tikv.kvproto.Pdpb;
51  
52  @SuppressWarnings("UnstableApiUsage")
53  public class RegionManager {
54  
55    private static final Logger logger = LoggerFactory.getLogger(RegionManager.class);
56    public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY =
57        HistogramUtils.buildDuration()
58            .name("client_java_get_region_by_requests_latency")
59            .help("getRegionByKey request latency.")
60            .labelNames("cluster")
61            .register();
62    public static final Histogram SCAN_REGIONS_REQUEST_LATENCY =
63        HistogramUtils.buildDuration()
64            .name("client_java_scan_regions_request_latency")
65            .help("scanRegions request latency.")
66            .labelNames("cluster")
67            .register();
68  
69    // TODO: the region cache logic need rewrite.
70    // https://github.com/pingcap/tispark/issues/1170
71    private final RegionCache cache;
72    private final ReadOnlyPDClient pdClient;
73    private final TiConfiguration conf;
74    private final ScheduledExecutorService executor;
75    private final StoreHealthyChecker storeChecker;
76    private final CopyOnWriteArrayList<Function<CacheInvalidateEvent, Void>>
77        cacheInvalidateCallbackList;
78    private final ExecutorService callBackThreadPool;
79    private AtomicInteger tiflashStoreIndex = new AtomicInteger(0);
80  
81    public RegionManager(
82        TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) {
83      this(conf, pdClient, channelFactory, 1);
84    }
85  
86    public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
87      this(conf, pdClient, 1);
88    }
89  
90    public RegionManager(
91        TiConfiguration conf, ReadOnlyPDClient pdClient, int callBackExecutorThreadNum) {
92      this.cache = new RegionCache();
93      this.pdClient = pdClient;
94      this.conf = conf;
95      this.storeChecker = null;
96      this.executor = null;
97      this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
98      this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum);
99    }
100 
101   public RegionManager(
102       TiConfiguration conf,
103       ReadOnlyPDClient pdClient,
104       ChannelFactory channelFactory,
105       int callBackExecutorThreadNum) {
106     this.cache = new RegionCache();
107     this.pdClient = pdClient;
108     this.conf = conf;
109     long period = conf.getHealthCheckPeriodDuration();
110     StoreHealthyChecker storeChecker =
111         new StoreHealthyChecker(
112             channelFactory, pdClient, this.cache, conf.getGrpcHealthCheckTimeout());
113     this.storeChecker = storeChecker;
114     this.executor = Executors.newScheduledThreadPool(1);
115     this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS);
116     this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
117     this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum);
118   }
119 
120   public synchronized void close() {
121     if (this.executor != null) {
122       this.executor.shutdownNow();
123     }
124     this.callBackThreadPool.shutdownNow();
125   }
126 
127   public ReadOnlyPDClient getPDClient() {
128     return this.pdClient;
129   }
130 
131   public ExecutorService getCallBackThreadPool() {
132     return callBackThreadPool;
133   }
134 
135   public List<Function<CacheInvalidateEvent, Void>> getCacheInvalidateCallbackList() {
136     return cacheInvalidateCallbackList;
137   }
138 
139   public void addCacheInvalidateCallback(
140       Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
141     this.cacheInvalidateCallbackList.add(cacheInvalidateCallback);
142   }
143 
144   public void invalidateAll() {
145     cache.invalidateAll();
146   }
147 
148   public List<Pdpb.Region> scanRegions(
149       BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) {
150     Long clusterId = pdClient.getClusterId();
151     Histogram.Timer requestTimer =
152         SCAN_REGIONS_REQUEST_LATENCY.labels(clusterId.toString()).startTimer();
153     SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("scanRegions");
154     try {
155       return pdClient.scanRegions(backOffer, startKey, endKey, limit);
156     } catch (Exception e) {
157       return new ArrayList<>();
158     } finally {
159       requestTimer.observeDuration();
160       slowLogSpan.end();
161     }
162   }
163 
164   public TiRegion getRegionByKey(ByteString key) {
165     return getRegionByKey(key, defaultBackOff());
166   }
167 
168   public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
169     Long clusterId = pdClient.getClusterId();
170     Histogram.Timer requestTimer =
171         GET_REGION_BY_KEY_REQUEST_LATENCY.labels(clusterId.toString()).startTimer();
172     SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("getRegionByKey");
173     TiRegion region = cache.getRegionByKey(key, backOffer);
174     try {
175       if (region == null) {
176         logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key));
177         Pair<Metapb.Region, Metapb.Peer> regionAndLeader = pdClient.getRegionByKey(backOffer, key);
178         region =
179             cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer));
180         logger.debug(
181             String.format(
182                 "get region id: %d with leader: %d",
183                 region.getId(), region.getLeader().getStoreId()));
184       }
185     } catch (Exception e) {
186       logger.warn("Get region failed: ", e);
187       return null;
188     } finally {
189       requestTimer.observeDuration();
190       slowLogSpan.end();
191     }
192 
193     return region;
194   }
195 
196   @Deprecated
197   // Do not use GetRegionByID when retrying request.
198   //
199   //   A,B |_______|_____|
200   //   A   |_____________|
201   // Consider region A, B. After merge of (A, B) -> A, region ID B does not exist.
202   // This request is unrecoverable.
203   public TiRegion getRegionById(long regionId) {
204     BackOffer backOffer = defaultBackOff();
205     TiRegion region = cache.getRegionById(regionId);
206     if (region == null) {
207       Pair<Metapb.Region, Metapb.Peer> regionAndLeader =
208           pdClient.getRegionByID(backOffer, regionId);
209       region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer);
210       return cache.putRegion(region);
211     }
212     return region;
213   }
214 
215   public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key, BackOffer backOffer) {
216     return getRegionStorePairByKey(key, TiStoreType.TiKV, backOffer);
217   }
218 
219   public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key) {
220     return getRegionStorePairByKey(key, TiStoreType.TiKV);
221   }
222 
223   public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key, TiStoreType storeType) {
224     return getRegionStorePairByKey(key, storeType, defaultBackOff());
225   }
226 
227   public Pair<TiRegion, TiStore> getRegionStorePairByKey(
228       ByteString key, TiStoreType storeType, BackOffer backOffer) {
229     TiRegion region = getRegionByKey(key, backOffer);
230     if (region == null || !region.isValid()) {
231       throw new TiClientInternalException("Region invalid: " + region);
232     }
233 
234     TiStore store = null;
235     if (storeType == TiStoreType.TiKV) {
236       // check from the first replica in case it recovers
237       List<Peer> replicaList = region.getReplicaList();
238       for (int i = 0; i < replicaList.size(); i++) {
239         Peer peer = replicaList.get(i);
240         store = getStoreById(peer.getStoreId(), backOffer);
241         if (store.isReachable()) {
242           // update replica's index
243           region.setReplicaIdx(i);
244           break;
245         }
246         logger.info("Store {} is unreachable, try to get the next replica", peer.getStoreId());
247       }
248       // Does not set unreachable store to null in case it is incompatible with GrpcForward
249       if (store == null || !store.isReachable()) {
250         logger.warn("No TiKV store available for region: " + region);
251       }
252     } else {
253       List<TiStore> tiflashStores = new ArrayList<>();
254       for (Peer peer : region.getLearnerList()) {
255         TiStore s = getStoreById(peer.getStoreId(), backOffer);
256         if (!s.isReachable()) {
257           continue;
258         }
259         if (s.isTiFlash()) {
260           tiflashStores.add(s);
261         }
262       }
263       // select a tiflash with Round-Robin strategy
264       if (tiflashStores.size() > 0) {
265         store =
266             tiflashStores.get(
267                 Math.floorMod(tiflashStoreIndex.getAndIncrement(), tiflashStores.size()));
268       }
269 
270       if (store == null) {
271         // clear the region cache, so we may get the learner peer next time
272         cache.invalidateRegion(region);
273       }
274     }
275     return Pair.create(region, store);
276   }
277 
278   public TiRegion createRegion(Metapb.Region region, BackOffer backOffer) {
279     return createRegion(region, null, backOffer);
280   }
281 
282   private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) {
283     List<Metapb.Peer> peers = new ArrayList<>();
284     List<TiStore> stores = new ArrayList<>();
285     for (Metapb.Peer peer : region.getPeersList()) {
286       try {
287         stores.add(getStoreById(peer.getStoreId(), backOffer));
288         peers.add(peer);
289       } catch (Exception e) {
290         logger.warn("Store {} not found: {}", peer.getStoreId(), e.toString());
291       }
292     }
293     Metapb.Region newRegion =
294         Metapb.Region.newBuilder().mergeFrom(region).clearPeers().addAllPeers(peers).build();
295     return new TiRegion(conf, newRegion, leader, peers, stores);
296   }
297 
298   private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) {
299     try {
300       TiStore store = cache.getStoreById(id);
301       if (store == null) {
302         store = new TiStore(pdClient.getStore(backOffer, id));
303       } else {
304         return store;
305       }
306       // if we did not get store info from pd, remove store from cache
307       if (store.getStore() == null) {
308         logger.warn(String.format("failed to get store %d from pd", id));
309         return null;
310       }
311       // if the store is already tombstone, remove store from cache
312       if (store.getStore().getState().equals(StoreState.Tombstone)) {
313         logger.warn(String.format("store %d is tombstone", id));
314         return null;
315       }
316       if (cache.putStore(id, store) && storeChecker != null) {
317         storeChecker.scheduleStoreHealthCheck(store);
318       }
319       return store;
320     } catch (Exception e) {
321       throw new GrpcException(e);
322     }
323   }
324 
325   public TiStore getStoreById(long id) {
326     return getStoreById(id, defaultBackOff());
327   }
328 
329   public TiStore getStoreById(long id, BackOffer backOffer) {
330     TiStore store = getStoreByIdWithBackOff(id, backOffer);
331     if (store == null) {
332       logger.warn(String.format("failed to fetch store %d, the store may be missing", id));
333       cache.clearAll();
334       throw new InvalidStoreException(id);
335     }
336     return store;
337   }
338 
339   public void onRegionStale(TiRegion region) {
340     cache.invalidateRegion(region);
341   }
342 
343   public TiRegion updateLeader(TiRegion region, long storeId) {
344     if (region.getLeader().getStoreId() == storeId) {
345       return region;
346     }
347     TiRegion newRegion = region.switchPeer(storeId);
348     if (cache.updateRegion(region, newRegion)) {
349       return newRegion;
350     }
351     // failed to switch leader, possibly region is outdated, we need to drop region cache from
352     // regionCache
353     logger.warn("Cannot find peer when updating leader (" + region.getId() + "," + storeId + ")");
354     return null;
355   }
356 
357   public synchronized void updateStore(TiStore oldStore, TiStore newStore) {
358     if (cache.updateStore(oldStore, newStore) && storeChecker != null) {
359       storeChecker.scheduleStoreHealthCheck(newStore);
360     }
361   }
362 
363   /** Clears all cache when some unexpected error occurs. */
364   public void clearRegionCache() {
365     cache.clearAll();
366   }
367 
368   /**
369    * Clears all cache when a TiKV server does not respond
370    *
371    * @param region region
372    */
373   public synchronized void onRequestFail(TiRegion region) {
374     cache.invalidateRegion(region);
375   }
376 
377   public void invalidateStore(long storeId) {
378     cache.invalidateStore(storeId);
379   }
380 
381   public void invalidateRegion(TiRegion region) {
382     cache.invalidateRegion(region);
383   }
384 
385   public void insertRegionToCache(TiRegion region) {
386     cache.insertRegionToCache(region);
387   }
388 
389   private BackOffer defaultBackOff() {
390     return ConcreteBackOffer.newCustomBackOff(
391         conf.getRawKVDefaultBackoffInMS(), pdClient.getClusterId());
392   }
393 }