View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.importer;
19  
20  import com.google.common.util.concurrent.ThreadFactoryBuilder;
21  import java.util.List;
22  import java.util.concurrent.Executors;
23  import java.util.concurrent.ScheduledExecutorService;
24  import java.util.concurrent.TimeUnit;
25  import org.tikv.common.PDClient;
26  import org.tikv.common.region.TiStore;
27  import org.tikv.common.util.BackOffer;
28  import org.tikv.common.util.ConcreteBackOffer;
29  import org.tikv.kvproto.ImportSstpb;
30  import org.tikv.kvproto.Metapb;
31  
32  public class SwitchTiKVModeClient {
33    private static final int IMPORT_MODE_TIMEOUT = 600;
34    private static final int KEEP_TIKV_TO_IMPORT_MODE_PERIOD = IMPORT_MODE_TIMEOUT / 5;
35  
36    private final PDClient pdClient;
37    private final ImporterStoreClient.ImporterStoreClientBuilder builder;
38  
39    private ScheduledExecutorService ingestScheduledExecutorService;
40  
41    public SwitchTiKVModeClient(
42        PDClient pdClient, ImporterStoreClient.ImporterStoreClientBuilder builder) {
43      this.pdClient = pdClient;
44      this.builder = builder;
45    }
46  
47    public void switchTiKVToNormalMode() {
48      doSwitchTiKVMode(ImportSstpb.SwitchMode.Normal);
49    }
50  
51    public synchronized void keepTiKVToImportMode() {
52      if (ingestScheduledExecutorService == null) {
53        ingestScheduledExecutorService =
54            Executors.newSingleThreadScheduledExecutor(
55                new ThreadFactoryBuilder()
56                    .setNameFormat("switch-tikv-mode-pool-%d")
57                    .setDaemon(true)
58                    .build());
59        ingestScheduledExecutorService.scheduleAtFixedRate(
60            this::switchTiKVToImportMode, 0, KEEP_TIKV_TO_IMPORT_MODE_PERIOD, TimeUnit.SECONDS);
61      }
62    }
63  
64    public synchronized void stopKeepTiKVToImportMode() {
65      if (ingestScheduledExecutorService != null) {
66        ingestScheduledExecutorService.shutdown();
67        ingestScheduledExecutorService = null;
68      }
69    }
70  
71    private void switchTiKVToImportMode() {
72      doSwitchTiKVMode(ImportSstpb.SwitchMode.Import);
73    }
74  
75    private void doSwitchTiKVMode(ImportSstpb.SwitchMode mode) {
76      BackOffer bo =
77          ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId());
78      List<Metapb.Store> allStores = pdClient.getAllStores(bo);
79      for (Metapb.Store store : allStores) {
80        ImporterStoreClient client = builder.build(new TiStore(store));
81        client.switchMode(mode);
82      }
83    }
84  }