1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.txn;
19
20 import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoRegionMiss;
21
22 import com.google.protobuf.ByteString;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.locks.ReadWriteLock;
32 import java.util.concurrent.locks.ReentrantReadWriteLock;
33 import java.util.function.Supplier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.tikv.common.PDClient;
37 import org.tikv.common.TiConfiguration;
38 import org.tikv.common.exception.KeyException;
39 import org.tikv.common.exception.RegionException;
40 import org.tikv.common.exception.TiClientInternalException;
41 import org.tikv.common.operation.KVErrorHandler;
42 import org.tikv.common.region.*;
43 import org.tikv.common.region.TiRegion.RegionVerID;
44 import org.tikv.common.util.BackOffer;
45 import org.tikv.common.util.ChannelFactory;
46 import org.tikv.common.util.TsoUtils;
47 import org.tikv.kvproto.Kvrpcpb;
48 import org.tikv.kvproto.Kvrpcpb.CleanupRequest;
49 import org.tikv.kvproto.Kvrpcpb.CleanupResponse;
50 import org.tikv.kvproto.TikvGrpc;
51 import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
52 import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
53
54
55 public class LockResolverClientV3 extends AbstractRegionStoreClient
56 implements AbstractLockResolverClient {
57 private static final Logger logger = LoggerFactory.getLogger(LockResolverClientV3.class);
58
59 private final ReadWriteLock readWriteLock;
60
61
62
63
64
65
66
67 private final Map<Long, TxnStatus> resolved;
68
69
70 private final Queue<Long> recentResolved;
71
72 private final PDClient pdClient;
73
74 private final RegionStoreClient.RegionStoreClientBuilder clientBuilder;
75
76 public LockResolverClientV3(
77 TiConfiguration conf,
78 TiRegion region,
79 TiStore store,
80 TikvBlockingStub blockingStub,
81 TikvFutureStub asyncStub,
82 ChannelFactory channelFactory,
83 RegionManager regionManager,
84 PDClient pdClient,
85 RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
86 super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
87 resolved = new HashMap<>();
88 recentResolved = new LinkedList<>();
89 readWriteLock = new ReentrantReadWriteLock();
90 this.pdClient = pdClient;
91 this.clientBuilder = clientBuilder;
92 }
93
94 @Override
95 public String getVersion() {
96 return "V3";
97 }
98
99 @Override
100 public ResolveLockResult resolveLocks(
101 BackOffer bo, long callerStartTS, List<Lock> locks, boolean forWrite) {
102 TxnExpireTime msBeforeTxnExpired = new TxnExpireTime();
103
104 if (locks.isEmpty()) {
105 return new ResolveLockResult(msBeforeTxnExpired.value());
106 }
107
108 List<Lock> expiredLocks = new ArrayList<>();
109 for (Lock lock : locks) {
110 if (TsoUtils.isExpired(lock.getTxnID(), lock.getTtl())) {
111 expiredLocks.add(lock);
112 } else {
113 msBeforeTxnExpired.update(lock.getTtl());
114 }
115 }
116
117 if (expiredLocks.isEmpty()) {
118 return new ResolveLockResult(msBeforeTxnExpired.value());
119 }
120
121 Map<Long, Set<RegionVerID>> cleanTxns = new HashMap<>();
122 for (Lock l : expiredLocks) {
123 TxnStatus status = getTxnStatusFromLock(bo, l);
124
125 if (status.getTtl() == 0) {
126 Set<RegionVerID> cleanRegion =
127 cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());
128
129 resolveLock(bo, l, status, cleanRegion);
130 } else {
131 long msBeforeLockExpired = TsoUtils.untilExpired(l.getTxnID(), status.getTtl());
132 msBeforeTxnExpired.update(msBeforeLockExpired);
133 }
134 }
135
136 return new ResolveLockResult(msBeforeTxnExpired.value());
137 }
138
139 private void resolveLock(
140 BackOffer bo, Lock lock, TxnStatus txnStatus, Set<RegionVerID> cleanRegion) {
141 boolean cleanWholeRegion = lock.getTxnSize() >= BIG_TXN_THRESHOLD;
142
143 while (true) {
144 region = regionManager.getRegionByKey(lock.getKey());
145
146 if (cleanRegion.contains(region.getVerID())) {
147 return;
148 }
149
150 Kvrpcpb.ResolveLockRequest.Builder builder =
151 Kvrpcpb.ResolveLockRequest.newBuilder()
152 .setContext(region.getLeaderContext())
153 .setStartVersion(lock.getTxnID());
154
155 if (txnStatus.isCommitted()) {
156
157 builder.setCommitVersion(txnStatus.getCommitTS());
158 }
159
160 if (lock.getTxnSize() < BIG_TXN_THRESHOLD) {
161
162
163 builder.addKeys(lock.getKey());
164 }
165
166 Supplier<Kvrpcpb.ResolveLockRequest> factory = builder::build;
167 KVErrorHandler<Kvrpcpb.ResolveLockResponse> handler =
168 new KVErrorHandler<>(
169 regionManager,
170 this,
171 this,
172 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
173 resp -> resp.hasError() ? resp.getError() : null,
174 resolveLockResult -> null,
175 0L,
176 false);
177 Kvrpcpb.ResolveLockResponse resp =
178 callWithRetry(bo, TikvGrpc.getKvResolveLockMethod(), factory, handler);
179
180 if (resp == null) {
181 logger.error("getKvResolveLockMethod failed without a cause");
182 regionManager.onRequestFail(region);
183 bo.doBackOff(
184 BoRegionMiss,
185 new TiClientInternalException("getKvResolveLockMethod failed without a cause"));
186 continue;
187 }
188
189 if (resp.hasRegionError()) {
190 bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
191 continue;
192 }
193
194 if (resp.hasError()) {
195 logger.error(
196 String.format("unexpected resolveLock err: %s, lock: %s", resp.getError(), lock));
197 throw new KeyException(resp.getError());
198 }
199
200 if (cleanWholeRegion) {
201 cleanRegion.add(region.getVerID());
202 }
203 return;
204 }
205 }
206
207 private TxnStatus getTxnStatusFromLock(BackOffer bo, Lock lock) {
208
209
210
211
212
213 if (lock.getTtl() == 0) {
214 return getTxnStatus(bo, lock.getTxnID(), lock.getPrimary(), 0L);
215 }
216
217 long currentTS = pdClient.getTimestamp(bo).getVersion();
218 return getTxnStatus(bo, lock.getTxnID(), lock.getPrimary(), currentTS);
219 }
220
221 private TxnStatus getTxnStatus(BackOffer bo, Long txnID, ByteString primary, Long currentTS) {
222 TxnStatus status = getResolved(txnID);
223 if (status != null) {
224 return status;
225 }
226
227 Supplier<CleanupRequest> factory =
228 () -> {
229 TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
230 return CleanupRequest.newBuilder()
231 .setContext(primaryKeyRegion.getLeaderContext())
232 .setKey(primary)
233 .setStartVersion(txnID)
234 .setCurrentTs(currentTS)
235 .build();
236 };
237
238 status = new TxnStatus();
239 while (true) {
240 TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
241
242 RegionStoreClient primaryKeyRegionStoreClient = clientBuilder.build(primary);
243 KVErrorHandler<CleanupResponse> handler =
244 new KVErrorHandler<>(
245 regionManager,
246 primaryKeyRegionStoreClient,
247 primaryKeyRegionStoreClient.lockResolverClient,
248 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
249 resp -> resp.hasError() ? resp.getError() : null,
250 resolveLockResult -> null,
251 0L,
252 false);
253
254 CleanupResponse resp =
255 primaryKeyRegionStoreClient.callWithRetry(
256 bo, TikvGrpc.getKvCleanupMethod(), factory, handler);
257
258 if (resp == null) {
259 logger.error("getKvCleanupMethod failed without a cause");
260 regionManager.onRequestFail(primaryKeyRegion);
261 bo.doBackOff(
262 BoRegionMiss,
263 new TiClientInternalException("getKvCleanupMethod failed without a cause"));
264 continue;
265 }
266
267 if (resp.hasRegionError()) {
268 bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
269 continue;
270 }
271
272 if (resp.hasError()) {
273 Kvrpcpb.KeyError keyError = resp.getError();
274
275
276
277 if (keyError.hasLocked()) {
278 Kvrpcpb.LockInfo lockInfo = keyError.getLocked();
279 return new TxnStatus(lockInfo.getLockTtl(), 0L);
280 }
281
282 logger.error(String.format("unexpected cleanup err: %s, tid: %d", keyError, txnID));
283 throw new KeyException(keyError);
284 }
285
286 if (resp.getCommitVersion() != 0) {
287 status = new TxnStatus(0L, resp.getCommitVersion());
288 }
289
290 saveResolved(txnID, status);
291 return status;
292 }
293 }
294
295 private void saveResolved(long txnID, TxnStatus status) {
296 try {
297 readWriteLock.writeLock().lock();
298 if (resolved.containsKey(txnID)) {
299 return;
300 }
301
302 resolved.put(txnID, status);
303 recentResolved.add(txnID);
304 if (recentResolved.size() > RESOLVED_TXN_CACHE_SIZE) {
305 Long front = recentResolved.remove();
306 resolved.remove(front);
307 }
308 } finally {
309 readWriteLock.writeLock().unlock();
310 }
311 }
312
313 private TxnStatus getResolved(Long txnID) {
314 try {
315 readWriteLock.readLock().lock();
316 return resolved.get(txnID);
317 } finally {
318 readWriteLock.readLock().unlock();
319 }
320 }
321 }