View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  package org.tikv.common.region;
18  
import io.grpc.ManagedChannel;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.ClientCalls;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Mpp;
import org.tikv.kvproto.Mpp.IsAliveRequest;
import org.tikv.kvproto.TikvGrpc;
39  
40  public class StoreHealthyChecker implements Runnable {
41    private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class);
42    private static final long MAX_CHECK_STORE_TOMBSTONE_TICK = 60;
43    private final BlockingQueue<TiStore> taskQueue;
44    private final ChannelFactory channelFactory;
45    private final ReadOnlyPDClient pdClient;
46    private final RegionCache cache;
47    private long checkTombstoneTick;
48    private final long timeout;
49  
50    public StoreHealthyChecker(
51        ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache, long timeout) {
52      this.taskQueue = new LinkedBlockingQueue<>();
53      this.channelFactory = channelFactory;
54      this.pdClient = pdClient;
55      this.cache = cache;
56      this.checkTombstoneTick = 0;
57      this.timeout = timeout;
58    }
59  
60    public boolean scheduleStoreHealthCheck(TiStore store) {
61      // add queue false, mark it reachable so that it can be put again.
62      return this.taskQueue.add(store);
63    }
64  
65    private List<TiStore> getValidStores() {
66      List<TiStore> unhealthStore = new LinkedList<>();
67      while (!this.taskQueue.isEmpty()) {
68        try {
69          TiStore store = this.taskQueue.take();
70          if (!store.isValid()) {
71            continue;
72          }
73          unhealthStore.add(store);
74        } catch (Exception e) {
75          return unhealthStore;
76        }
77      }
78      return unhealthStore;
79    }
80  
81    private boolean checkStoreHealth(TiStore store) {
82      String addressStr = store.getStore().getAddress();
83      if (store.isTiFlash()) {
84        return checkTiFlashHealth(addressStr);
85      }
86      return checkTiKVHealth(addressStr);
87    }
88  
89    private boolean checkTiFlashHealth(String addressStr) {
90      try {
91        ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
92        TikvGrpc.TikvBlockingStub stub =
93            TikvGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
94        Supplier<IsAliveRequest> factory = () -> Mpp.IsAliveRequest.newBuilder().build();
95        Mpp.IsAliveResponse resp =
96            ClientCalls.blockingUnaryCall(
97                stub.getChannel(), TikvGrpc.getIsAliveMethod(), stub.getCallOptions(), factory.get());
98        return resp != null && resp.getAvailable();
99      } catch (Exception e) {
100       logger.info(
101           "fail to check TiFlash health, regard as unhealthy. TiFlash address: " + addressStr, e);
102       return false;
103     }
104   }
105 
106   private boolean checkTiKVHealth(String addressStr) {
107     try {
108       ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
109       HealthGrpc.HealthBlockingStub stub =
110           HealthGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
111       HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
112       HealthCheckResponse resp = stub.check(req);
113       return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
114     } catch (Exception e) {
115       logger.info("fail to check TiKV health, regard as unhealthy. TiKV address: " + addressStr, e);
116       return false;
117     }
118   }
119 
120   private boolean checkStoreTombstone(TiStore store) {
121     try {
122       Metapb.Store newStore =
123           pdClient.getStore(
124               ConcreteBackOffer.newRawKVBackOff(pdClient.getClusterId()), store.getId());
125       if (newStore != null && newStore.getState() == Metapb.StoreState.Tombstone) {
126         return true;
127       }
128     } catch (Exception e) {
129       logger.info("fail to check tombstone stores", e);
130       return false;
131     }
132     return false;
133   }
134 
135   @Override
136   public void run() {
137     checkTombstoneTick += 1;
138     boolean needCheckTombstoneStore = false;
139     if (checkTombstoneTick >= MAX_CHECK_STORE_TOMBSTONE_TICK) {
140       needCheckTombstoneStore = true;
141       checkTombstoneTick = 0;
142     }
143     List<TiStore> allStores = getValidStores();
144     List<TiStore> unreachableStore = new LinkedList<>();
145     for (TiStore store : allStores) {
146       if (needCheckTombstoneStore) {
147         if (checkStoreTombstone(store)) {
148           continue;
149         }
150       }
151 
152       if (checkStoreHealth(store)) {
153         if (store.getProxyStore() != null) {
154           TiStore newStore = store.withProxy(null);
155           logger.warn(String.format("store [%s] recovers to be reachable", store.getAddress()));
156           if (cache.putStore(newStore.getId(), newStore)) {
157             this.taskQueue.add(newStore);
158             continue;
159           }
160         } else {
161           if (!store.isReachable()) {
162             logger.warn(String.format("store [%s] recovers to be reachable", store.getAddress()));
163             store.markReachable();
164           }
165         }
166       } else if (store.isReachable()) {
167         unreachableStore.add(store);
168         continue;
169       }
170       this.taskQueue.add(store);
171     }
172     if (!unreachableStore.isEmpty()) {
173       try {
174         Thread.sleep(timeout);
175       } catch (Exception e) {
176         this.taskQueue.addAll(unreachableStore);
177         return;
178       }
179       for (TiStore store : unreachableStore) {
180         if (!checkStoreHealth(store)) {
181           logger.warn(String.format("store [%s] is not reachable", store.getAddress()));
182           store.markUnreachable();
183         }
184         this.taskQueue.add(store);
185       }
186     }
187   }
188 }