1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common;
19
import static org.tikv.common.util.ClientUtils.groupKeysByRegion;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.apiversion.RequestKeyCodec;
import org.tikv.common.apiversion.RequestKeyV1RawCodec;
import org.tikv.common.apiversion.RequestKeyV1TxnCodec;
import org.tikv.common.apiversion.RequestKeyV2RawCodec;
import org.tikv.common.apiversion.RequestKeyV2TxnCodec;
import org.tikv.common.catalog.Catalog;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.importer.ImporterStoreClient;
import org.tikv.common.importer.SwitchTiKVModeClient;
import org.tikv.common.key.Key;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Pdpb;
import org.tikv.raw.RawKVClient;
import org.tikv.raw.SmartRawKVClient;
import org.tikv.service.failsafe.CircuitBreaker;
import org.tikv.service.failsafe.CircuitBreakerImpl;
import org.tikv.txn.KVClient;
import org.tikv.txn.TxnKVClient;
66
67
68
69
70
71
72
73
74 public class TiSession implements AutoCloseable {
/** Logger shared by every session instance. */
private static final Logger logger = LoggerFactory.getLogger(TiSession.class);
/**
 * Sessions handed out by {@code getInstance}, keyed by {@code conf.getPdAddrsString()}. Every
 * access in this class happens inside {@code synchronized (sessionCachedMap)}.
 */
private static final Map<String, TiSession> sessionCachedMap = new HashMap<>();
/** Configuration this session was built from. */
private final TiConfiguration conf;
/** Key codec chosen in the constructor from the API version and KV mode. */
private final RequestKeyCodec keyCodec;
/** Channel factory built in the constructor and passed to the PD client and builders. */
private final ChannelFactory channelFactory;

// The volatile fields below are (re)assigned under the instance lock by their getters and are
// read without the lock on the getters' fast path.
private volatile PDClient client;
private volatile Catalog catalog;
private volatile ExecutorService indexScanThreadPool;
private volatile ExecutorService tableScanThreadPool;
private volatile ExecutorService batchGetThreadPool;
private volatile ExecutorService batchPutThreadPool;
private volatile ExecutorService batchDeleteThreadPool;
private volatile ExecutorService batchScanThreadPool;
private volatile ExecutorService deleteRangeThreadPool;
private volatile RegionManager regionManager;
/** Copied from {@code conf.getEnableGrpcForward()}; in this class it only triggers a log line. */
private final boolean enableGrpcForward;
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private volatile ImporterStoreClient.ImporterStoreClientBuilder importerClientBuilder;
/** Set to {@code true} by the first {@code shutdown}; consulted by {@code checkIsClosed}. */
private volatile boolean isClosed = false;
private volatile SwitchTiKVModeClient switchTiKVModeClient;
/** Obtained from {@code MetricsServer.getInstance(conf)}; closed on first shutdown. */
private final MetricsServer metricsServer;
/** Passed to smart raw clients; closed on first shutdown. */
private final CircuitBreaker circuitBreaker;
/** Recursion depth at which {@code splitRegion} stops retrying after a failure. */
private static final int MAX_SPLIT_REGION_STACK_DEPTH = 6;

// Logs the client version once, when the class is initialized.
static {
  logger.info("Welcome to TiKV Java Client {}", getVersionInfo());
}
103
104 private static class VersionInfo {
105
106 private final String buildVersion;
107 private final String commitHash;
108
109 public VersionInfo(String buildVersion, String commitHash) {
110 this.buildVersion = buildVersion;
111 this.commitHash = commitHash;
112 }
113
114 @Override
115 public String toString() {
116 return buildVersion + "@" + commitHash;
117 }
118 }
119
120 public TiSession(TiConfiguration conf) {
121
122
123 this.metricsServer = MetricsServer.getInstance(conf);
124
125 this.conf = conf;
126
127 if (conf.getApiVersion().isV1()) {
128 if (conf.isRawKVMode()) {
129 keyCodec = new RequestKeyV1RawCodec();
130 } else {
131 keyCodec = new RequestKeyV1TxnCodec();
132 }
133 } else {
134 if (conf.isRawKVMode()) {
135 keyCodec = new RequestKeyV2RawCodec();
136 } else {
137 keyCodec = new RequestKeyV2TxnCodec();
138 }
139 }
140
141 if (conf.isTlsEnable()) {
142 if (conf.isJksEnable()) {
143 this.channelFactory =
144 new ChannelFactory(
145 conf.getMaxFrameSize(),
146 conf.getKeepaliveTime(),
147 conf.getKeepaliveTimeout(),
148 conf.getIdleTimeout(),
149 conf.getConnRecycleTimeInSeconds(),
150 conf.getCertReloadIntervalInSeconds(),
151 conf.getJksKeyPath(),
152 conf.getJksKeyPassword(),
153 conf.getJksTrustPath(),
154 conf.getJksTrustPassword());
155 } else {
156 this.channelFactory =
157 new ChannelFactory(
158 conf.getMaxFrameSize(),
159 conf.getKeepaliveTime(),
160 conf.getKeepaliveTimeout(),
161 conf.getIdleTimeout(),
162 conf.getConnRecycleTimeInSeconds(),
163 conf.getCertReloadIntervalInSeconds(),
164 conf.getTrustCertCollectionFile(),
165 conf.getKeyCertChainFile(),
166 conf.getKeyFile());
167 }
168 } else {
169 this.channelFactory =
170 new ChannelFactory(
171 conf.getMaxFrameSize(),
172 conf.getKeepaliveTime(),
173 conf.getKeepaliveTimeout(),
174 conf.getIdleTimeout());
175 }
176
177 this.client = PDClient.createRaw(conf, keyCodec, channelFactory);
178 if (conf.getApiVersion().isV2() && !StoreVersion.minTiKVVersion(Version.API_V2, client)) {
179 throw new IllegalStateException(
180 "With API v2, store versions should not older than " + Version.API_V2);
181 }
182
183 this.enableGrpcForward = conf.getEnableGrpcForward();
184 if (this.enableGrpcForward) {
185 logger.info("enable grpc forward for high available");
186 }
187 if (conf.isWarmUpEnable() && conf.isRawKVMode()) {
188 warmUp();
189 }
190 this.circuitBreaker = new CircuitBreakerImpl(conf, client.getClusterId());
191 logger.info(
192 "TiSession initialized in "
193 + conf.getKvMode()
194 + " mode in API version: "
195 + conf.getApiVersion());
196 }
197
198 private static VersionInfo getVersionInfo() {
199 VersionInfo info;
200 try {
201 final Properties properties = new Properties();
202 properties.load(TiSession.class.getClassLoader().getResourceAsStream("git.properties"));
203 String version = properties.getProperty("git.build.version");
204 String commitHash = properties.getProperty("git.commit.id.full");
205 info = new VersionInfo(version, commitHash);
206 } catch (Exception e) {
207 logger.info("Fail to read package info: " + e.getMessage());
208 info = new VersionInfo("unknown", "unknown");
209 }
210 return info;
211 }
212
213 @VisibleForTesting
214 public synchronized void warmUp() {
215 long warmUpStartTime = System.nanoTime();
216 BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(getPDClient().getClusterId());
217 try {
218
219
220 Errorpb.Error.newBuilder().setNotLeader(Errorpb.NotLeader.newBuilder().build()).build();
221
222 this.client = getPDClient();
223 this.regionManager = getRegionManager();
224 List<Metapb.Store> stores = this.client.getAllStores(backOffer);
225
226 for (Metapb.Store store : stores) {
227 this.regionManager.updateStore(
228 null, new TiStore(this.client.getStore(backOffer, store.getId())));
229 }
230
231
232 ByteString startKey = ByteString.EMPTY;
233 do {
234 List<Pdpb.Region> regions =
235 regionManager.scanRegions(
236 backOffer, startKey, ByteString.EMPTY, conf.getScanRegionsLimit());
237 if (regions == null || regions.isEmpty()) {
238
239 break;
240 }
241 for (Pdpb.Region region : regions) {
242 regionManager.insertRegionToCache(
243 regionManager.createRegion(region.getRegion(), backOffer));
244 }
245 startKey = regions.get(regions.size() - 1).getRegion().getEndKey();
246 } while (!startKey.isEmpty());
247
248 try (RawKVClient rawKVClient = createRawClient()) {
249 ByteString exampleKey = ByteString.EMPTY;
250 Optional<ByteString> prev = rawKVClient.get(exampleKey);
251 if (prev.isPresent()) {
252 rawKVClient.delete(exampleKey);
253 rawKVClient.putIfAbsent(exampleKey, prev.get());
254 rawKVClient.put(exampleKey, prev.get());
255 } else {
256 rawKVClient.putIfAbsent(exampleKey, ByteString.EMPTY);
257 rawKVClient.put(exampleKey, ByteString.EMPTY);
258 rawKVClient.delete(exampleKey);
259 }
260 }
261 } catch (Exception e) {
262
263 logger.info("warm up fails, ignored ", e);
264 } finally {
265 logger.info(
266 String.format(
267 "warm up duration %d ms", (System.nanoTime() - warmUpStartTime) / 1_000_000));
268 }
269 }
270
271 @VisibleForTesting
272 public static TiSession create(TiConfiguration conf) {
273 return new TiSession(conf);
274 }
275
276 @Deprecated
277 public static TiSession getInstance(TiConfiguration conf) {
278 synchronized (sessionCachedMap) {
279 String key = conf.getPdAddrsString();
280 if (sessionCachedMap.containsKey(key)) {
281 return sessionCachedMap.get(key);
282 }
283
284 TiSession newSession = new TiSession(conf);
285 sessionCachedMap.put(key, newSession);
286 return newSession;
287 }
288 }
289
290 public RawKVClient createRawClient() {
291 checkIsClosed();
292
293 return new RawKVClient(this, this.getRegionStoreClientBuilder());
294 }
295
296 public SmartRawKVClient createSmartRawClient() {
297 RawKVClient rawKVClient = createRawClient();
298 return new SmartRawKVClient(rawKVClient, circuitBreaker);
299 }
300
301 public KVClient createKVClient() {
302 checkIsClosed();
303
304 return new KVClient(this.conf, this.getRegionStoreClientBuilder(), this);
305 }
306
307 public TxnKVClient createTxnClient() {
308 checkIsClosed();
309
310 return new TxnKVClient(conf, this.getRegionStoreClientBuilder(), this.getPDClient());
311 }
312
313 public RegionStoreClient.RegionStoreClientBuilder getRegionStoreClientBuilder() {
314 checkIsClosed();
315
316 if (this.clientBuilder != null) {
317 return this.clientBuilder;
318 }
319
320
321 synchronized (this) {
322 if (this.clientBuilder == null) {
323 this.clientBuilder =
324 new RegionStoreClient.RegionStoreClientBuilder(
325 this.conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
326 }
327 }
328 return this.clientBuilder;
329 }
330
331 public ImporterStoreClient.ImporterStoreClientBuilder getImporterRegionStoreClientBuilder() {
332 checkIsClosed();
333
334 ImporterStoreClient.ImporterStoreClientBuilder res = importerClientBuilder;
335 if (res == null) {
336 synchronized (this) {
337 if (importerClientBuilder == null) {
338 if (conf.isTxnKVMode()) {
339 importerClientBuilder =
340 new ImporterStoreClient.ImporterStoreClientBuilder<
341 ImportSstpb.WriteRequest, ImportSstpb.WriteRequest>(
342 conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
343 } else {
344 importerClientBuilder =
345 new ImporterStoreClient.ImporterStoreClientBuilder<
346 ImportSstpb.RawWriteRequest, ImportSstpb.RawWriteResponse>(
347 conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
348 }
349 }
350 res = importerClientBuilder;
351 }
352 }
353 return res;
354 }
355
356 public TiConfiguration getConf() {
357 return conf;
358 }
359
360 public TiTimestamp getTimestamp() {
361 checkIsClosed();
362
363 return getPDClient()
364 .getTimestamp(ConcreteBackOffer.newTsoBackOff(getPDClient().getClusterId()));
365 }
366
367 public Snapshot createSnapshot() {
368 checkIsClosed();
369
370 return new Snapshot(getTimestamp(), this);
371 }
372
373 public Snapshot createSnapshot(TiTimestamp ts) {
374 checkIsClosed();
375
376 return new Snapshot(ts, this);
377 }
378
379 public PDClient getPDClient() {
380 checkIsClosed();
381
382 PDClient res = client;
383 if (res == null) {
384 synchronized (this) {
385 if (client == null) {
386 client = PDClient.createRaw(this.getConf(), keyCodec, channelFactory);
387 }
388 res = client;
389 }
390 }
391 return res;
392 }
393
394 public Catalog getCatalog() {
395 checkIsClosed();
396
397 Catalog res = catalog;
398 if (res == null) {
399 synchronized (this) {
400 if (catalog == null) {
401 catalog = new Catalog(this::createSnapshot, conf.ifShowRowId(), conf.getDBPrefix());
402 }
403 res = catalog;
404 }
405 }
406 return res;
407 }
408
409 public RegionManager getRegionManager() {
410 checkIsClosed();
411
412 RegionManager res = regionManager;
413 if (res == null) {
414 synchronized (this) {
415 if (regionManager == null) {
416 regionManager = new RegionManager(getConf(), getPDClient(), this.channelFactory);
417 }
418 res = regionManager;
419 }
420 }
421 return res;
422 }
423
424 public ExecutorService getThreadPoolForIndexScan() {
425 checkIsClosed();
426
427 ExecutorService res = indexScanThreadPool;
428 if (res == null) {
429 synchronized (this) {
430 if (indexScanThreadPool == null) {
431 indexScanThreadPool =
432 Executors.newFixedThreadPool(
433 conf.getIndexScanConcurrency(),
434 new ThreadFactoryBuilder()
435 .setNameFormat("index-scan-pool-%d")
436 .setDaemon(true)
437 .build());
438 }
439 res = indexScanThreadPool;
440 }
441 }
442 return res;
443 }
444
445 public ExecutorService getThreadPoolForTableScan() {
446 checkIsClosed();
447
448 ExecutorService res = tableScanThreadPool;
449 if (res == null) {
450 synchronized (this) {
451 if (tableScanThreadPool == null) {
452 tableScanThreadPool =
453 Executors.newFixedThreadPool(
454 conf.getTableScanConcurrency(),
455 new ThreadFactoryBuilder().setDaemon(true).build());
456 }
457 res = tableScanThreadPool;
458 }
459 }
460 return res;
461 }
462
463 public ExecutorService getThreadPoolForBatchPut() {
464 checkIsClosed();
465
466 ExecutorService res = batchPutThreadPool;
467 if (res == null) {
468 synchronized (this) {
469 if (batchPutThreadPool == null) {
470 batchPutThreadPool =
471 Executors.newFixedThreadPool(
472 conf.getBatchPutConcurrency(),
473 new ThreadFactoryBuilder()
474 .setNameFormat("batchPut-thread-%d")
475 .setDaemon(true)
476 .build());
477 }
478 res = batchPutThreadPool;
479 }
480 }
481 return res;
482 }
483
484 public ExecutorService getThreadPoolForBatchGet() {
485 checkIsClosed();
486
487 ExecutorService res = batchGetThreadPool;
488 if (res == null) {
489 synchronized (this) {
490 if (batchGetThreadPool == null) {
491 batchGetThreadPool =
492 Executors.newFixedThreadPool(
493 conf.getBatchGetConcurrency(),
494 new ThreadFactoryBuilder()
495 .setNameFormat("batchGet-thread-%d")
496 .setDaemon(true)
497 .build());
498 }
499 res = batchGetThreadPool;
500 }
501 }
502 return res;
503 }
504
505 public ExecutorService getThreadPoolForBatchDelete() {
506 checkIsClosed();
507
508 ExecutorService res = batchDeleteThreadPool;
509 if (res == null) {
510 synchronized (this) {
511 if (batchDeleteThreadPool == null) {
512 batchDeleteThreadPool =
513 Executors.newFixedThreadPool(
514 conf.getBatchDeleteConcurrency(),
515 new ThreadFactoryBuilder()
516 .setNameFormat("batchDelete-thread-%d")
517 .setDaemon(true)
518 .build());
519 }
520 res = batchDeleteThreadPool;
521 }
522 }
523 return res;
524 }
525
526 public ExecutorService getThreadPoolForBatchScan() {
527 checkIsClosed();
528
529 ExecutorService res = batchScanThreadPool;
530 if (res == null) {
531 synchronized (this) {
532 if (batchScanThreadPool == null) {
533 batchScanThreadPool =
534 Executors.newFixedThreadPool(
535 conf.getBatchScanConcurrency(),
536 new ThreadFactoryBuilder()
537 .setNameFormat("batchScan-thread-%d")
538 .setDaemon(true)
539 .build());
540 }
541 res = batchScanThreadPool;
542 }
543 }
544 return res;
545 }
546
547 public ExecutorService getThreadPoolForDeleteRange() {
548 checkIsClosed();
549
550 ExecutorService res = deleteRangeThreadPool;
551 if (res == null) {
552 synchronized (this) {
553 if (deleteRangeThreadPool == null) {
554 deleteRangeThreadPool =
555 Executors.newFixedThreadPool(
556 conf.getDeleteRangeConcurrency(),
557 new ThreadFactoryBuilder()
558 .setNameFormat("deleteRange-thread-%d")
559 .setDaemon(true)
560 .build());
561 }
562 res = deleteRangeThreadPool;
563 }
564 }
565 return res;
566 }
567
568 @VisibleForTesting
569 public ChannelFactory getChannelFactory() {
570 checkIsClosed();
571
572 return channelFactory;
573 }
574
575
576
577
578
579
580 public SwitchTiKVModeClient getSwitchTiKVModeClient() {
581 checkIsClosed();
582
583 SwitchTiKVModeClient res = switchTiKVModeClient;
584 if (res == null) {
585 synchronized (this) {
586 if (switchTiKVModeClient == null) {
587 switchTiKVModeClient =
588 new SwitchTiKVModeClient(getPDClient(), getImporterRegionStoreClientBuilder());
589 }
590 res = switchTiKVModeClient;
591 }
592 }
593 return res;
594 }
595
596
597
598
599
600
601
602
603
604 public void splitRegionAndScatter(
605 List<byte[]> splitKeys,
606 int splitRegionBackoffMS,
607 int scatterRegionBackoffMS,
608 int scatterWaitMS) {
609 checkIsClosed();
610
611 logger.info(String.format("split key's size is %d", splitKeys.size()));
612 long startMS = System.currentTimeMillis();
613
614
615 List<Metapb.Region> newRegions =
616 splitRegion(
617 splitKeys
618 .stream()
619 .map(k -> Key.toRawKey(k).toByteString())
620 .collect(Collectors.toList()),
621 ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS, getPDClient().getClusterId()));
622
623
624 for (Metapb.Region newRegion : newRegions) {
625 try {
626 getPDClient()
627 .scatterRegion(
628 newRegion,
629 ConcreteBackOffer.newCustomBackOff(
630 scatterRegionBackoffMS, getPDClient().getClusterId()));
631 } catch (Exception e) {
632 logger.warn(String.format("failed to scatter region: %d", newRegion.getId()), e);
633 }
634 }
635
636
637 if (scatterWaitMS > 0) {
638 logger.info("start to wait scatter region finish");
639 long scatterRegionStartMS = System.currentTimeMillis();
640 for (Metapb.Region newRegion : newRegions) {
641 long remainMS = (scatterRegionStartMS + scatterWaitMS) - System.currentTimeMillis();
642 if (remainMS <= 0) {
643 logger.warn("wait scatter region timeout");
644 return;
645 }
646 getPDClient()
647 .waitScatterRegionFinish(
648 newRegion,
649 ConcreteBackOffer.newCustomBackOff((int) remainMS, getPDClient().getClusterId()));
650 }
651 } else {
652 logger.info("skip to wait scatter region finish");
653 }
654
655 long endMS = System.currentTimeMillis();
656 logger.info("splitRegionAndScatter cost {} seconds", (endMS - startMS) / 1000);
657 }
658
659
660
661
662
663
664 public void splitRegionAndScatter(List<byte[]> splitKeys) {
665 checkIsClosed();
666
667 int splitRegionBackoffMS = BackOffer.SPLIT_REGION_BACKOFF;
668 int scatterRegionBackoffMS = BackOffer.SCATTER_REGION_BACKOFF;
669 int scatterWaitMS = conf.getScatterWaitSeconds() * 1000;
670 splitRegionAndScatter(splitKeys, splitRegionBackoffMS, scatterRegionBackoffMS, scatterWaitMS);
671 }
672
673 private List<Metapb.Region> splitRegion(List<ByteString> splitKeys, BackOffer backOffer) {
674 return splitRegion(splitKeys, backOffer, 1);
675 }
676
677 private List<Metapb.Region> splitRegion(
678 List<ByteString> splitKeys, BackOffer backOffer, int depth) {
679 List<Metapb.Region> regions = new ArrayList<>();
680
681 Map<TiRegion, List<ByteString>> groupKeys =
682 groupKeysByRegion(getRegionManager(), splitKeys, backOffer);
683 for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
684
685 Pair<TiRegion, TiStore> pair =
686 getRegionManager().getRegionStorePairByKey(entry.getKey().getStartKey());
687 TiRegion region = pair.first;
688 TiStore store = pair.second;
689 List<ByteString> splits =
690 entry
691 .getValue()
692 .stream()
693 .filter(k -> !k.equals(region.getStartKey()) && !k.equals(region.getEndKey()))
694 .collect(Collectors.toList());
695
696 if (splits.isEmpty()) {
697 logger.warn(
698 "split key equal to region start key or end key. Region splitting is not needed.");
699 } else {
700 logger.info("start to split region id={}, split size={}", region.getId(), splits.size());
701 List<Metapb.Region> newRegions;
702 try {
703 newRegions = getRegionStoreClientBuilder().build(region, store).splitRegion(splits);
704
705 getRegionManager().invalidateRegion(region);
706 } catch (final TiKVException e) {
707
708 logger.warn("ReSplitting ranges for splitRegion", e);
709 getRegionManager().invalidateRegion(region);
710 backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
711 if (depth >= MAX_SPLIT_REGION_STACK_DEPTH) {
712 logger.warn(
713 String.format(
714 "Skip split region because MAX_SPLIT_REGION_STACK_DEPTH(%d) reached!",
715 MAX_SPLIT_REGION_STACK_DEPTH));
716 newRegions = new ArrayList<>();
717 } else {
718 newRegions = splitRegion(splits, backOffer, depth + 1);
719 }
720 }
721 logger.info("region id={}, new region size={}", region.getId(), newRegions.size());
722 regions.addAll(newRegions);
723 }
724 }
725
726 logger.info("splitRegion: return region size={}", regions.size());
727 return regions;
728 }
729
730 private void checkIsClosed() {
731 if (isClosed) {
732 throw new RuntimeException("this TiSession is closed!");
733 }
734 }
735
736 public synchronized void closeAwaitTermination(long timeoutMS) throws Exception {
737 shutdown(false);
738
739 long startMS = System.currentTimeMillis();
740 while (true) {
741 if (isTerminatedExecutorServices()) {
742 cleanAfterTerminated();
743 return;
744 }
745
746 if (System.currentTimeMillis() - startMS > timeoutMS) {
747 shutdown(true);
748 return;
749 }
750 Thread.sleep(500);
751 }
752 }
753
754 @Override
755 public synchronized void close() throws Exception {
756 shutdown(true);
757 }
758
759 private synchronized void shutdown(boolean now) throws Exception {
760 if (!isClosed) {
761 isClosed = true;
762 synchronized (sessionCachedMap) {
763 sessionCachedMap.remove(conf.getPdAddrsString());
764 }
765
766 if (metricsServer != null) {
767 metricsServer.close();
768 }
769
770 if (circuitBreaker != null) {
771 circuitBreaker.close();
772 }
773 }
774
775 if (now) {
776 shutdownNowExecutorServices();
777 cleanAfterTerminated();
778 } else {
779 shutdownExecutorServices();
780 }
781 }
782
783 private synchronized void cleanAfterTerminated() throws InterruptedException {
784 if (regionManager != null) {
785 regionManager.close();
786 }
787 if (client != null) {
788 client.close();
789 }
790 if (catalog != null) {
791 catalog.close();
792 }
793
794 if (switchTiKVModeClient != null) {
795 switchTiKVModeClient.stopKeepTiKVToImportMode();
796 }
797 }
798
799 private List<ExecutorService> getExecutorServices() {
800 List<ExecutorService> executorServiceList = new ArrayList<>();
801 if (tableScanThreadPool != null) {
802 executorServiceList.add(tableScanThreadPool);
803 }
804 if (indexScanThreadPool != null) {
805 executorServiceList.add(indexScanThreadPool);
806 }
807 if (batchGetThreadPool != null) {
808 executorServiceList.add(batchGetThreadPool);
809 }
810 if (batchPutThreadPool != null) {
811 executorServiceList.add(batchPutThreadPool);
812 }
813 if (batchDeleteThreadPool != null) {
814 executorServiceList.add(batchDeleteThreadPool);
815 }
816 if (batchScanThreadPool != null) {
817 executorServiceList.add(batchScanThreadPool);
818 }
819 if (deleteRangeThreadPool != null) {
820 executorServiceList.add(deleteRangeThreadPool);
821 }
822 return executorServiceList;
823 }
824
825 private void shutdownExecutorServices() {
826 for (ExecutorService executorService : getExecutorServices()) {
827 executorService.shutdown();
828 }
829 }
830
831 private void shutdownNowExecutorServices() {
832 for (ExecutorService executorService : getExecutorServices()) {
833 executorService.shutdownNow();
834 }
835 }
836
837 private boolean isTerminatedExecutorServices() {
838 for (ExecutorService executorService : getExecutorServices()) {
839 if (!executorService.isTerminated()) {
840 return false;
841 }
842 }
843 return true;
844 }
845 }