View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common;
19  
20  import static org.tikv.common.util.ClientUtils.groupKeysByRegion;
21  
22  import com.google.common.annotations.VisibleForTesting;
23  import com.google.common.util.concurrent.ThreadFactoryBuilder;
24  import com.google.protobuf.ByteString;
25  import java.util.ArrayList;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Optional;
30  import java.util.Properties;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Executors;
33  import java.util.stream.Collectors;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.tikv.common.apiversion.RequestKeyCodec;
37  import org.tikv.common.apiversion.RequestKeyV1RawCodec;
38  import org.tikv.common.apiversion.RequestKeyV1TxnCodec;
39  import org.tikv.common.apiversion.RequestKeyV2RawCodec;
40  import org.tikv.common.apiversion.RequestKeyV2TxnCodec;
41  import org.tikv.common.catalog.Catalog;
42  import org.tikv.common.exception.TiKVException;
43  import org.tikv.common.importer.ImporterStoreClient;
44  import org.tikv.common.importer.SwitchTiKVModeClient;
45  import org.tikv.common.key.Key;
46  import org.tikv.common.meta.TiTimestamp;
47  import org.tikv.common.region.RegionManager;
48  import org.tikv.common.region.RegionStoreClient;
49  import org.tikv.common.region.TiRegion;
50  import org.tikv.common.region.TiStore;
51  import org.tikv.common.util.BackOffFunction;
52  import org.tikv.common.util.BackOffer;
53  import org.tikv.common.util.ChannelFactory;
54  import org.tikv.common.util.ConcreteBackOffer;
55  import org.tikv.common.util.Pair;
56  import org.tikv.kvproto.Errorpb;
57  import org.tikv.kvproto.ImportSstpb;
58  import org.tikv.kvproto.Metapb;
59  import org.tikv.kvproto.Pdpb;
60  import org.tikv.raw.RawKVClient;
61  import org.tikv.raw.SmartRawKVClient;
62  import org.tikv.service.failsafe.CircuitBreaker;
63  import org.tikv.service.failsafe.CircuitBreakerImpl;
64  import org.tikv.txn.KVClient;
65  import org.tikv.txn.TxnKVClient;
66  
/**
 * TiSession is the holder for PD Client, Store pdClient and PD Cache. All sessions share a common
 * region store connection pool but keep separate PD connections and caches for better concurrency.
 *
 * <p>TiSession is thread-safe, but it is also recommended to use multiple sessions to avoid lock
 * contention.
 */
74  public class TiSession implements AutoCloseable {
  private static final Logger logger = LoggerFactory.getLogger(TiSession.class);
  // Sessions handed out by getInstance(), keyed by the PD address string of their configuration.
  // Every access to this map synchronizes on the map itself.
  private static final Map<String, TiSession> sessionCachedMap = new HashMap<>();
  // Configuration this session was created with.
  private final TiConfiguration conf;
  // Key codec chosen in the constructor from the configured API version and KV mode.
  private final RequestKeyCodec keyCodec;
  // Channel factory built in the constructor (TLS/JKS, TLS/certificate files, or plain).
  private final ChannelFactory channelFactory;
  // below object creation is either heavy or making connection (pd), pending for lazy loading
  private volatile PDClient client;
  private volatile Catalog catalog;
  // Thread pools below are created on first use by the matching getThreadPoolFor*() accessor.
  private volatile ExecutorService indexScanThreadPool;
  private volatile ExecutorService tableScanThreadPool;
  private volatile ExecutorService batchGetThreadPool;
  private volatile ExecutorService batchPutThreadPool;
  private volatile ExecutorService batchDeleteThreadPool;
  private volatile ExecutorService batchScanThreadPool;
  private volatile ExecutorService deleteRangeThreadPool;
  private volatile RegionManager regionManager;
  // Copied from conf.getEnableGrpcForward() in the constructor.
  private final boolean enableGrpcForward;
  private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
  private volatile ImporterStoreClient.ImporterStoreClientBuilder importerClientBuilder;
  // Set to true by shutdown(); checkIsClosed() throws once it is set.
  private volatile boolean isClosed = false;
  private volatile SwitchTiKVModeClient switchTiKVModeClient;
  // Obtained first thing in the constructor and closed by shutdown().
  private final MetricsServer metricsServer;
  // Handed to SmartRawKVClient instances; closed by shutdown().
  private final CircuitBreaker circuitBreaker;
  // Upper bound on the recursion depth of splitRegion() when a split attempt is retried.
  private static final int MAX_SPLIT_REGION_STACK_DEPTH = 6;

  // Logs the client version once, when the class is initialized.
  static {
    logger.info("Welcome to TiKV Java Client {}", getVersionInfo());
  }
103 
104   private static class VersionInfo {
105 
106     private final String buildVersion;
107     private final String commitHash;
108 
109     public VersionInfo(String buildVersion, String commitHash) {
110       this.buildVersion = buildVersion;
111       this.commitHash = commitHash;
112     }
113 
114     @Override
115     public String toString() {
116       return buildVersion + "@" + commitHash;
117     }
118   }
119 
120   public TiSession(TiConfiguration conf) {
121     // may throw org.tikv.common.MetricsServer  - http server not up
122     // put it at the beginning of this function to avoid unclosed Thread
123     this.metricsServer = MetricsServer.getInstance(conf);
124 
125     this.conf = conf;
126 
127     if (conf.getApiVersion().isV1()) {
128       if (conf.isRawKVMode()) {
129         keyCodec = new RequestKeyV1RawCodec();
130       } else {
131         keyCodec = new RequestKeyV1TxnCodec();
132       }
133     } else {
134       if (conf.isRawKVMode()) {
135         keyCodec = new RequestKeyV2RawCodec();
136       } else {
137         keyCodec = new RequestKeyV2TxnCodec();
138       }
139     }
140 
141     if (conf.isTlsEnable()) {
142       if (conf.isJksEnable()) {
143         this.channelFactory =
144             new ChannelFactory(
145                 conf.getMaxFrameSize(),
146                 conf.getKeepaliveTime(),
147                 conf.getKeepaliveTimeout(),
148                 conf.getIdleTimeout(),
149                 conf.getConnRecycleTimeInSeconds(),
150                 conf.getCertReloadIntervalInSeconds(),
151                 conf.getJksKeyPath(),
152                 conf.getJksKeyPassword(),
153                 conf.getJksTrustPath(),
154                 conf.getJksTrustPassword());
155       } else {
156         this.channelFactory =
157             new ChannelFactory(
158                 conf.getMaxFrameSize(),
159                 conf.getKeepaliveTime(),
160                 conf.getKeepaliveTimeout(),
161                 conf.getIdleTimeout(),
162                 conf.getConnRecycleTimeInSeconds(),
163                 conf.getCertReloadIntervalInSeconds(),
164                 conf.getTrustCertCollectionFile(),
165                 conf.getKeyCertChainFile(),
166                 conf.getKeyFile());
167       }
168     } else {
169       this.channelFactory =
170           new ChannelFactory(
171               conf.getMaxFrameSize(),
172               conf.getKeepaliveTime(),
173               conf.getKeepaliveTimeout(),
174               conf.getIdleTimeout());
175     }
176 
177     this.client = PDClient.createRaw(conf, keyCodec, channelFactory);
178     if (conf.getApiVersion().isV2() && !StoreVersion.minTiKVVersion(Version.API_V2, client)) {
179       throw new IllegalStateException(
180           "With API v2, store versions should not older than " + Version.API_V2);
181     }
182 
183     this.enableGrpcForward = conf.getEnableGrpcForward();
184     if (this.enableGrpcForward) {
185       logger.info("enable grpc forward for high available");
186     }
187     if (conf.isWarmUpEnable() && conf.isRawKVMode()) {
188       warmUp();
189     }
190     this.circuitBreaker = new CircuitBreakerImpl(conf, client.getClusterId());
191     logger.info(
192         "TiSession initialized in "
193             + conf.getKvMode()
194             + " mode in API version: "
195             + conf.getApiVersion());
196   }
197 
198   private static VersionInfo getVersionInfo() {
199     VersionInfo info;
200     try {
201       final Properties properties = new Properties();
202       properties.load(TiSession.class.getClassLoader().getResourceAsStream("git.properties"));
203       String version = properties.getProperty("git.build.version");
204       String commitHash = properties.getProperty("git.commit.id.full");
205       info = new VersionInfo(version, commitHash);
206     } catch (Exception e) {
207       logger.info("Fail to read package info: " + e.getMessage());
208       info = new VersionInfo("unknown", "unknown");
209     }
210     return info;
211   }
212 
213   @VisibleForTesting
214   public synchronized void warmUp() {
215     long warmUpStartTime = System.nanoTime();
216     BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(getPDClient().getClusterId());
217     try {
218       // let JVM ClassLoader load gRPC error related classes
219       // this operation may cost 100ms
220       Errorpb.Error.newBuilder().setNotLeader(Errorpb.NotLeader.newBuilder().build()).build();
221 
222       this.client = getPDClient();
223       this.regionManager = getRegionManager();
224       List<Metapb.Store> stores = this.client.getAllStores(backOffer);
225       // warm up store cache
226       for (Metapb.Store store : stores) {
227         this.regionManager.updateStore(
228             null, new TiStore(this.client.getStore(backOffer, store.getId())));
229       }
230 
231       // use scan region to load region cache with limit
232       ByteString startKey = ByteString.EMPTY;
233       do {
234         List<Pdpb.Region> regions =
235             regionManager.scanRegions(
236                 backOffer, startKey, ByteString.EMPTY, conf.getScanRegionsLimit());
237         if (regions == null || regions.isEmpty()) {
238           // something went wrong, but the warm-up process could continue
239           break;
240         }
241         for (Pdpb.Region region : regions) {
242           regionManager.insertRegionToCache(
243               regionManager.createRegion(region.getRegion(), backOffer));
244         }
245         startKey = regions.get(regions.size() - 1).getRegion().getEndKey();
246       } while (!startKey.isEmpty());
247 
248       try (RawKVClient rawKVClient = createRawClient()) {
249         ByteString exampleKey = ByteString.EMPTY;
250         Optional<ByteString> prev = rawKVClient.get(exampleKey);
251         if (prev.isPresent()) {
252           rawKVClient.delete(exampleKey);
253           rawKVClient.putIfAbsent(exampleKey, prev.get());
254           rawKVClient.put(exampleKey, prev.get());
255         } else {
256           rawKVClient.putIfAbsent(exampleKey, ByteString.EMPTY);
257           rawKVClient.put(exampleKey, ByteString.EMPTY);
258           rawKVClient.delete(exampleKey);
259         }
260       }
261     } catch (Exception e) {
262       // ignore error
263       logger.info("warm up fails, ignored ", e);
264     } finally {
265       logger.info(
266           String.format(
267               "warm up duration %d ms", (System.nanoTime() - warmUpStartTime) / 1_000_000));
268     }
269   }
270 
271   @VisibleForTesting
272   public static TiSession create(TiConfiguration conf) {
273     return new TiSession(conf);
274   }
275 
276   @Deprecated
277   public static TiSession getInstance(TiConfiguration conf) {
278     synchronized (sessionCachedMap) {
279       String key = conf.getPdAddrsString();
280       if (sessionCachedMap.containsKey(key)) {
281         return sessionCachedMap.get(key);
282       }
283 
284       TiSession newSession = new TiSession(conf);
285       sessionCachedMap.put(key, newSession);
286       return newSession;
287     }
288   }
289 
290   public RawKVClient createRawClient() {
291     checkIsClosed();
292 
293     return new RawKVClient(this, this.getRegionStoreClientBuilder());
294   }
295 
296   public SmartRawKVClient createSmartRawClient() {
297     RawKVClient rawKVClient = createRawClient();
298     return new SmartRawKVClient(rawKVClient, circuitBreaker);
299   }
300 
301   public KVClient createKVClient() {
302     checkIsClosed();
303 
304     return new KVClient(this.conf, this.getRegionStoreClientBuilder(), this);
305   }
306 
307   public TxnKVClient createTxnClient() {
308     checkIsClosed();
309 
310     return new TxnKVClient(conf, this.getRegionStoreClientBuilder(), this.getPDClient());
311   }
312 
313   public RegionStoreClient.RegionStoreClientBuilder getRegionStoreClientBuilder() {
314     checkIsClosed();
315 
316     if (this.clientBuilder != null) {
317       return this.clientBuilder;
318     }
319 
320     // lazily create the clientBuilder for the current TiSession
321     synchronized (this) {
322       if (this.clientBuilder == null) {
323         this.clientBuilder =
324             new RegionStoreClient.RegionStoreClientBuilder(
325                 this.conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
326       }
327     }
328     return this.clientBuilder;
329   }
330 
331   public ImporterStoreClient.ImporterStoreClientBuilder getImporterRegionStoreClientBuilder() {
332     checkIsClosed();
333 
334     ImporterStoreClient.ImporterStoreClientBuilder res = importerClientBuilder;
335     if (res == null) {
336       synchronized (this) {
337         if (importerClientBuilder == null) {
338           if (conf.isTxnKVMode()) {
339             importerClientBuilder =
340                 new ImporterStoreClient.ImporterStoreClientBuilder<
341                     ImportSstpb.WriteRequest, ImportSstpb.WriteRequest>(
342                     conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
343           } else {
344             importerClientBuilder =
345                 new ImporterStoreClient.ImporterStoreClientBuilder<
346                     ImportSstpb.RawWriteRequest, ImportSstpb.RawWriteResponse>(
347                     conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
348           }
349         }
350         res = importerClientBuilder;
351       }
352     }
353     return res;
354   }
355 
356   public TiConfiguration getConf() {
357     return conf;
358   }
359 
360   public TiTimestamp getTimestamp() {
361     checkIsClosed();
362 
363     return getPDClient()
364         .getTimestamp(ConcreteBackOffer.newTsoBackOff(getPDClient().getClusterId()));
365   }
366 
367   public Snapshot createSnapshot() {
368     checkIsClosed();
369 
370     return new Snapshot(getTimestamp(), this);
371   }
372 
373   public Snapshot createSnapshot(TiTimestamp ts) {
374     checkIsClosed();
375 
376     return new Snapshot(ts, this);
377   }
378 
379   public PDClient getPDClient() {
380     checkIsClosed();
381 
382     PDClient res = client;
383     if (res == null) {
384       synchronized (this) {
385         if (client == null) {
386           client = PDClient.createRaw(this.getConf(), keyCodec, channelFactory);
387         }
388         res = client;
389       }
390     }
391     return res;
392   }
393 
394   public Catalog getCatalog() {
395     checkIsClosed();
396 
397     Catalog res = catalog;
398     if (res == null) {
399       synchronized (this) {
400         if (catalog == null) {
401           catalog = new Catalog(this::createSnapshot, conf.ifShowRowId(), conf.getDBPrefix());
402         }
403         res = catalog;
404       }
405     }
406     return res;
407   }
408 
409   public RegionManager getRegionManager() {
410     checkIsClosed();
411 
412     RegionManager res = regionManager;
413     if (res == null) {
414       synchronized (this) {
415         if (regionManager == null) {
416           regionManager = new RegionManager(getConf(), getPDClient(), this.channelFactory);
417         }
418         res = regionManager;
419       }
420     }
421     return res;
422   }
423 
424   public ExecutorService getThreadPoolForIndexScan() {
425     checkIsClosed();
426 
427     ExecutorService res = indexScanThreadPool;
428     if (res == null) {
429       synchronized (this) {
430         if (indexScanThreadPool == null) {
431           indexScanThreadPool =
432               Executors.newFixedThreadPool(
433                   conf.getIndexScanConcurrency(),
434                   new ThreadFactoryBuilder()
435                       .setNameFormat("index-scan-pool-%d")
436                       .setDaemon(true)
437                       .build());
438         }
439         res = indexScanThreadPool;
440       }
441     }
442     return res;
443   }
444 
445   public ExecutorService getThreadPoolForTableScan() {
446     checkIsClosed();
447 
448     ExecutorService res = tableScanThreadPool;
449     if (res == null) {
450       synchronized (this) {
451         if (tableScanThreadPool == null) {
452           tableScanThreadPool =
453               Executors.newFixedThreadPool(
454                   conf.getTableScanConcurrency(),
455                   new ThreadFactoryBuilder().setDaemon(true).build());
456         }
457         res = tableScanThreadPool;
458       }
459     }
460     return res;
461   }
462 
463   public ExecutorService getThreadPoolForBatchPut() {
464     checkIsClosed();
465 
466     ExecutorService res = batchPutThreadPool;
467     if (res == null) {
468       synchronized (this) {
469         if (batchPutThreadPool == null) {
470           batchPutThreadPool =
471               Executors.newFixedThreadPool(
472                   conf.getBatchPutConcurrency(),
473                   new ThreadFactoryBuilder()
474                       .setNameFormat("batchPut-thread-%d")
475                       .setDaemon(true)
476                       .build());
477         }
478         res = batchPutThreadPool;
479       }
480     }
481     return res;
482   }
483 
484   public ExecutorService getThreadPoolForBatchGet() {
485     checkIsClosed();
486 
487     ExecutorService res = batchGetThreadPool;
488     if (res == null) {
489       synchronized (this) {
490         if (batchGetThreadPool == null) {
491           batchGetThreadPool =
492               Executors.newFixedThreadPool(
493                   conf.getBatchGetConcurrency(),
494                   new ThreadFactoryBuilder()
495                       .setNameFormat("batchGet-thread-%d")
496                       .setDaemon(true)
497                       .build());
498         }
499         res = batchGetThreadPool;
500       }
501     }
502     return res;
503   }
504 
505   public ExecutorService getThreadPoolForBatchDelete() {
506     checkIsClosed();
507 
508     ExecutorService res = batchDeleteThreadPool;
509     if (res == null) {
510       synchronized (this) {
511         if (batchDeleteThreadPool == null) {
512           batchDeleteThreadPool =
513               Executors.newFixedThreadPool(
514                   conf.getBatchDeleteConcurrency(),
515                   new ThreadFactoryBuilder()
516                       .setNameFormat("batchDelete-thread-%d")
517                       .setDaemon(true)
518                       .build());
519         }
520         res = batchDeleteThreadPool;
521       }
522     }
523     return res;
524   }
525 
526   public ExecutorService getThreadPoolForBatchScan() {
527     checkIsClosed();
528 
529     ExecutorService res = batchScanThreadPool;
530     if (res == null) {
531       synchronized (this) {
532         if (batchScanThreadPool == null) {
533           batchScanThreadPool =
534               Executors.newFixedThreadPool(
535                   conf.getBatchScanConcurrency(),
536                   new ThreadFactoryBuilder()
537                       .setNameFormat("batchScan-thread-%d")
538                       .setDaemon(true)
539                       .build());
540         }
541         res = batchScanThreadPool;
542       }
543     }
544     return res;
545   }
546 
547   public ExecutorService getThreadPoolForDeleteRange() {
548     checkIsClosed();
549 
550     ExecutorService res = deleteRangeThreadPool;
551     if (res == null) {
552       synchronized (this) {
553         if (deleteRangeThreadPool == null) {
554           deleteRangeThreadPool =
555               Executors.newFixedThreadPool(
556                   conf.getDeleteRangeConcurrency(),
557                   new ThreadFactoryBuilder()
558                       .setNameFormat("deleteRange-thread-%d")
559                       .setDaemon(true)
560                       .build());
561         }
562         res = deleteRangeThreadPool;
563       }
564     }
565     return res;
566   }
567 
568   @VisibleForTesting
569   public ChannelFactory getChannelFactory() {
570     checkIsClosed();
571 
572     return channelFactory;
573   }
574 
575   /**
576    * SwitchTiKVModeClient is used for SST Ingest.
577    *
578    * @return a SwitchTiKVModeClient
579    */
580   public SwitchTiKVModeClient getSwitchTiKVModeClient() {
581     checkIsClosed();
582 
583     SwitchTiKVModeClient res = switchTiKVModeClient;
584     if (res == null) {
585       synchronized (this) {
586         if (switchTiKVModeClient == null) {
587           switchTiKVModeClient =
588               new SwitchTiKVModeClient(getPDClient(), getImporterRegionStoreClientBuilder());
589         }
590         res = switchTiKVModeClient;
591       }
592     }
593     return res;
594   }
595 
596   /**
597    * split region and scatter
598    *
599    * @param splitKeys
600    * @param splitRegionBackoffMS
601    * @param scatterRegionBackoffMS
602    * @param scatterWaitMS
603    */
604   public void splitRegionAndScatter(
605       List<byte[]> splitKeys,
606       int splitRegionBackoffMS,
607       int scatterRegionBackoffMS,
608       int scatterWaitMS) {
609     checkIsClosed();
610 
611     logger.info(String.format("split key's size is %d", splitKeys.size()));
612     long startMS = System.currentTimeMillis();
613 
614     // split region
615     List<Metapb.Region> newRegions =
616         splitRegion(
617             splitKeys
618                 .stream()
619                 .map(k -> Key.toRawKey(k).toByteString())
620                 .collect(Collectors.toList()),
621             ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS, getPDClient().getClusterId()));
622 
623     // scatter region
624     for (Metapb.Region newRegion : newRegions) {
625       try {
626         getPDClient()
627             .scatterRegion(
628                 newRegion,
629                 ConcreteBackOffer.newCustomBackOff(
630                     scatterRegionBackoffMS, getPDClient().getClusterId()));
631       } catch (Exception e) {
632         logger.warn(String.format("failed to scatter region: %d", newRegion.getId()), e);
633       }
634     }
635 
636     // wait scatter region finish
637     if (scatterWaitMS > 0) {
638       logger.info("start to wait scatter region finish");
639       long scatterRegionStartMS = System.currentTimeMillis();
640       for (Metapb.Region newRegion : newRegions) {
641         long remainMS = (scatterRegionStartMS + scatterWaitMS) - System.currentTimeMillis();
642         if (remainMS <= 0) {
643           logger.warn("wait scatter region timeout");
644           return;
645         }
646         getPDClient()
647             .waitScatterRegionFinish(
648                 newRegion,
649                 ConcreteBackOffer.newCustomBackOff((int) remainMS, getPDClient().getClusterId()));
650       }
651     } else {
652       logger.info("skip to wait scatter region finish");
653     }
654 
655     long endMS = System.currentTimeMillis();
656     logger.info("splitRegionAndScatter cost {} seconds", (endMS - startMS) / 1000);
657   }
658 
659   /**
660    * split region and scatter
661    *
662    * @param splitKeys
663    */
664   public void splitRegionAndScatter(List<byte[]> splitKeys) {
665     checkIsClosed();
666 
667     int splitRegionBackoffMS = BackOffer.SPLIT_REGION_BACKOFF;
668     int scatterRegionBackoffMS = BackOffer.SCATTER_REGION_BACKOFF;
669     int scatterWaitMS = conf.getScatterWaitSeconds() * 1000;
670     splitRegionAndScatter(splitKeys, splitRegionBackoffMS, scatterRegionBackoffMS, scatterWaitMS);
671   }
672 
673   private List<Metapb.Region> splitRegion(List<ByteString> splitKeys, BackOffer backOffer) {
674     return splitRegion(splitKeys, backOffer, 1);
675   }
676 
677   private List<Metapb.Region> splitRegion(
678       List<ByteString> splitKeys, BackOffer backOffer, int depth) {
679     List<Metapb.Region> regions = new ArrayList<>();
680 
681     Map<TiRegion, List<ByteString>> groupKeys =
682         groupKeysByRegion(getRegionManager(), splitKeys, backOffer);
683     for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
684 
685       Pair<TiRegion, TiStore> pair =
686           getRegionManager().getRegionStorePairByKey(entry.getKey().getStartKey());
687       TiRegion region = pair.first;
688       TiStore store = pair.second;
689       List<ByteString> splits =
690           entry
691               .getValue()
692               .stream()
693               .filter(k -> !k.equals(region.getStartKey()) && !k.equals(region.getEndKey()))
694               .collect(Collectors.toList());
695 
696       if (splits.isEmpty()) {
697         logger.warn(
698             "split key equal to region start key or end key. Region splitting is not needed.");
699       } else {
700         logger.info("start to split region id={}, split size={}", region.getId(), splits.size());
701         List<Metapb.Region> newRegions;
702         try {
703           newRegions = getRegionStoreClientBuilder().build(region, store).splitRegion(splits);
704           // invalidate old region
705           getRegionManager().invalidateRegion(region);
706         } catch (final TiKVException e) {
707           // retry
708           logger.warn("ReSplitting ranges for splitRegion", e);
709           getRegionManager().invalidateRegion(region);
710           backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
711           if (depth >= MAX_SPLIT_REGION_STACK_DEPTH) {
712             logger.warn(
713                 String.format(
714                     "Skip split region because MAX_SPLIT_REGION_STACK_DEPTH(%d) reached!",
715                     MAX_SPLIT_REGION_STACK_DEPTH));
716             newRegions = new ArrayList<>();
717           } else {
718             newRegions = splitRegion(splits, backOffer, depth + 1);
719           }
720         }
721         logger.info("region id={}, new region size={}", region.getId(), newRegions.size());
722         regions.addAll(newRegions);
723       }
724     }
725 
726     logger.info("splitRegion: return region size={}", regions.size());
727     return regions;
728   }
729 
730   private void checkIsClosed() {
731     if (isClosed) {
732       throw new RuntimeException("this TiSession is closed!");
733     }
734   }
735 
736   public synchronized void closeAwaitTermination(long timeoutMS) throws Exception {
737     shutdown(false);
738 
739     long startMS = System.currentTimeMillis();
740     while (true) {
741       if (isTerminatedExecutorServices()) {
742         cleanAfterTerminated();
743         return;
744       }
745 
746       if (System.currentTimeMillis() - startMS > timeoutMS) {
747         shutdown(true);
748         return;
749       }
750       Thread.sleep(500);
751     }
752   }
753 
754   @Override
755   public synchronized void close() throws Exception {
756     shutdown(true);
757   }
758 
759   private synchronized void shutdown(boolean now) throws Exception {
760     if (!isClosed) {
761       isClosed = true;
762       synchronized (sessionCachedMap) {
763         sessionCachedMap.remove(conf.getPdAddrsString());
764       }
765 
766       if (metricsServer != null) {
767         metricsServer.close();
768       }
769 
770       if (circuitBreaker != null) {
771         circuitBreaker.close();
772       }
773     }
774 
775     if (now) {
776       shutdownNowExecutorServices();
777       cleanAfterTerminated();
778     } else {
779       shutdownExecutorServices();
780     }
781   }
782 
783   private synchronized void cleanAfterTerminated() throws InterruptedException {
784     if (regionManager != null) {
785       regionManager.close();
786     }
787     if (client != null) {
788       client.close();
789     }
790     if (catalog != null) {
791       catalog.close();
792     }
793 
794     if (switchTiKVModeClient != null) {
795       switchTiKVModeClient.stopKeepTiKVToImportMode();
796     }
797   }
798 
799   private List<ExecutorService> getExecutorServices() {
800     List<ExecutorService> executorServiceList = new ArrayList<>();
801     if (tableScanThreadPool != null) {
802       executorServiceList.add(tableScanThreadPool);
803     }
804     if (indexScanThreadPool != null) {
805       executorServiceList.add(indexScanThreadPool);
806     }
807     if (batchGetThreadPool != null) {
808       executorServiceList.add(batchGetThreadPool);
809     }
810     if (batchPutThreadPool != null) {
811       executorServiceList.add(batchPutThreadPool);
812     }
813     if (batchDeleteThreadPool != null) {
814       executorServiceList.add(batchDeleteThreadPool);
815     }
816     if (batchScanThreadPool != null) {
817       executorServiceList.add(batchScanThreadPool);
818     }
819     if (deleteRangeThreadPool != null) {
820       executorServiceList.add(deleteRangeThreadPool);
821     }
822     return executorServiceList;
823   }
824 
825   private void shutdownExecutorServices() {
826     for (ExecutorService executorService : getExecutorServices()) {
827       executorService.shutdown();
828     }
829   }
830 
831   private void shutdownNowExecutorServices() {
832     for (ExecutorService executorService : getExecutorServices()) {
833       executorService.shutdownNow();
834     }
835   }
836 
837   private boolean isTerminatedExecutorServices() {
838     for (ExecutorService executorService : getExecutorServices()) {
839       if (!executorService.isTerminated()) {
840         return false;
841       }
842     }
843     return true;
844   }
845 }