1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.importer;
19
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.tikv.common.PDClient;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Metapb;
31
32 public class SwitchTiKVModeClient {
33 private static final int IMPORT_MODE_TIMEOUT = 600;
34 private static final int KEEP_TIKV_TO_IMPORT_MODE_PERIOD = IMPORT_MODE_TIMEOUT / 5;
35
36 private final PDClient pdClient;
37 private final ImporterStoreClient.ImporterStoreClientBuilder builder;
38
39 private ScheduledExecutorService ingestScheduledExecutorService;
40
41 public SwitchTiKVModeClient(
42 PDClient pdClient, ImporterStoreClient.ImporterStoreClientBuilder builder) {
43 this.pdClient = pdClient;
44 this.builder = builder;
45 }
46
47 public void switchTiKVToNormalMode() {
48 doSwitchTiKVMode(ImportSstpb.SwitchMode.Normal);
49 }
50
51 public synchronized void keepTiKVToImportMode() {
52 if (ingestScheduledExecutorService == null) {
53 ingestScheduledExecutorService =
54 Executors.newSingleThreadScheduledExecutor(
55 new ThreadFactoryBuilder()
56 .setNameFormat("switch-tikv-mode-pool-%d")
57 .setDaemon(true)
58 .build());
59 ingestScheduledExecutorService.scheduleAtFixedRate(
60 this::switchTiKVToImportMode, 0, KEEP_TIKV_TO_IMPORT_MODE_PERIOD, TimeUnit.SECONDS);
61 }
62 }
63
64 public synchronized void stopKeepTiKVToImportMode() {
65 if (ingestScheduledExecutorService != null) {
66 ingestScheduledExecutorService.shutdown();
67 ingestScheduledExecutorService = null;
68 }
69 }
70
71 private void switchTiKVToImportMode() {
72 doSwitchTiKVMode(ImportSstpb.SwitchMode.Import);
73 }
74
75 private void doSwitchTiKVMode(ImportSstpb.SwitchMode mode) {
76 BackOffer bo =
77 ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId());
78 List<Metapb.Store> allStores = pdClient.getAllStores(bo);
79 for (Metapb.Store store : allStores) {
80 ImporterStoreClient client = builder.build(new TiStore(store));
81 client.switchMode(mode);
82 }
83 }
84 }