1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.tikv.common.region;
18
19 import io.grpc.ManagedChannel;
20 import io.grpc.health.v1.HealthCheckRequest;
21 import io.grpc.health.v1.HealthCheckResponse;
22 import io.grpc.health.v1.HealthGrpc;
23 import io.grpc.stub.ClientCalls;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Supplier;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.tikv.common.ReadOnlyPDClient;
33 import org.tikv.common.util.ChannelFactory;
34 import org.tikv.common.util.ConcreteBackOffer;
35 import org.tikv.kvproto.Metapb;
36 import org.tikv.kvproto.Mpp;
37 import org.tikv.kvproto.Mpp.IsAliveRequest;
38 import org.tikv.kvproto.TikvGrpc;
39
40 public class StoreHealthyChecker implements Runnable {
41 private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class);
42 private static final long MAX_CHECK_STORE_TOMBSTONE_TICK = 60;
43 private final BlockingQueue<TiStore> taskQueue;
44 private final ChannelFactory channelFactory;
45 private final ReadOnlyPDClient pdClient;
46 private final RegionCache cache;
47 private long checkTombstoneTick;
48 private final long timeout;
49
50 public StoreHealthyChecker(
51 ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache, long timeout) {
52 this.taskQueue = new LinkedBlockingQueue<>();
53 this.channelFactory = channelFactory;
54 this.pdClient = pdClient;
55 this.cache = cache;
56 this.checkTombstoneTick = 0;
57 this.timeout = timeout;
58 }
59
60 public boolean scheduleStoreHealthCheck(TiStore store) {
61
62 return this.taskQueue.add(store);
63 }
64
65 private List<TiStore> getValidStores() {
66 List<TiStore> unhealthStore = new LinkedList<>();
67 while (!this.taskQueue.isEmpty()) {
68 try {
69 TiStore store = this.taskQueue.take();
70 if (!store.isValid()) {
71 continue;
72 }
73 unhealthStore.add(store);
74 } catch (Exception e) {
75 return unhealthStore;
76 }
77 }
78 return unhealthStore;
79 }
80
81 private boolean checkStoreHealth(TiStore store) {
82 String addressStr = store.getStore().getAddress();
83 if (store.isTiFlash()) {
84 return checkTiFlashHealth(addressStr);
85 }
86 return checkTiKVHealth(addressStr);
87 }
88
89 private boolean checkTiFlashHealth(String addressStr) {
90 try {
91 ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
92 TikvGrpc.TikvBlockingStub stub =
93 TikvGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
94 Supplier<IsAliveRequest> factory = () -> Mpp.IsAliveRequest.newBuilder().build();
95 Mpp.IsAliveResponse resp =
96 ClientCalls.blockingUnaryCall(
97 stub.getChannel(), TikvGrpc.getIsAliveMethod(), stub.getCallOptions(), factory.get());
98 return resp != null && resp.getAvailable();
99 } catch (Exception e) {
100 logger.info(
101 "fail to check TiFlash health, regard as unhealthy. TiFlash address: " + addressStr, e);
102 return false;
103 }
104 }
105
106 private boolean checkTiKVHealth(String addressStr) {
107 try {
108 ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
109 HealthGrpc.HealthBlockingStub stub =
110 HealthGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
111 HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
112 HealthCheckResponse resp = stub.check(req);
113 return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
114 } catch (Exception e) {
115 logger.info("fail to check TiKV health, regard as unhealthy. TiKV address: " + addressStr, e);
116 return false;
117 }
118 }
119
120 private boolean checkStoreTombstone(TiStore store) {
121 try {
122 Metapb.Store newStore =
123 pdClient.getStore(
124 ConcreteBackOffer.newRawKVBackOff(pdClient.getClusterId()), store.getId());
125 if (newStore != null && newStore.getState() == Metapb.StoreState.Tombstone) {
126 return true;
127 }
128 } catch (Exception e) {
129 logger.info("fail to check tombstone stores", e);
130 return false;
131 }
132 return false;
133 }
134
135 @Override
136 public void run() {
137 checkTombstoneTick += 1;
138 boolean needCheckTombstoneStore = false;
139 if (checkTombstoneTick >= MAX_CHECK_STORE_TOMBSTONE_TICK) {
140 needCheckTombstoneStore = true;
141 checkTombstoneTick = 0;
142 }
143 List<TiStore> allStores = getValidStores();
144 List<TiStore> unreachableStore = new LinkedList<>();
145 for (TiStore store : allStores) {
146 if (needCheckTombstoneStore) {
147 if (checkStoreTombstone(store)) {
148 continue;
149 }
150 }
151
152 if (checkStoreHealth(store)) {
153 if (store.getProxyStore() != null) {
154 TiStore newStore = store.withProxy(null);
155 logger.warn(String.format("store [%s] recovers to be reachable", store.getAddress()));
156 if (cache.putStore(newStore.getId(), newStore)) {
157 this.taskQueue.add(newStore);
158 continue;
159 }
160 } else {
161 if (!store.isReachable()) {
162 logger.warn(String.format("store [%s] recovers to be reachable", store.getAddress()));
163 store.markReachable();
164 }
165 }
166 } else if (store.isReachable()) {
167 unreachableStore.add(store);
168 continue;
169 }
170 this.taskQueue.add(store);
171 }
172 if (!unreachableStore.isEmpty()) {
173 try {
174 Thread.sleep(timeout);
175 } catch (Exception e) {
176 this.taskQueue.addAll(unreachableStore);
177 return;
178 }
179 for (TiStore store : unreachableStore) {
180 if (!checkStoreHealth(store)) {
181 logger.warn(String.format("store [%s] is not reachable", store.getAddress()));
182 store.markUnreachable();
183 }
184 this.taskQueue.add(store);
185 }
186 }
187 }
188 }