1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.txn;
19
20 import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoRegionMiss;
21 import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnNotFound;
22
23 import com.google.protobuf.ByteString;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.locks.ReadWriteLock;
32 import java.util.concurrent.locks.ReentrantReadWriteLock;
33 import java.util.function.Supplier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.tikv.common.PDClient;
37 import org.tikv.common.TiConfiguration;
38 import org.tikv.common.exception.KeyException;
39 import org.tikv.common.exception.RegionException;
40 import org.tikv.common.exception.TiClientInternalException;
41 import org.tikv.common.operation.KVErrorHandler;
42 import org.tikv.common.region.AbstractRegionStoreClient;
43 import org.tikv.common.region.RegionManager;
44 import org.tikv.common.region.RegionStoreClient;
45 import org.tikv.common.region.TiRegion;
46 import org.tikv.common.region.TiRegion.RegionVerID;
47 import org.tikv.common.region.TiStore;
48 import org.tikv.common.util.BackOffer;
49 import org.tikv.common.util.ChannelFactory;
50 import org.tikv.common.util.TsoUtils;
51 import org.tikv.kvproto.Kvrpcpb;
52 import org.tikv.kvproto.TikvGrpc;
53 import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
54 import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
55 import org.tikv.txn.exception.TxnNotFoundException;
56 import org.tikv.txn.exception.WriteConflictException;
57
58
59 public class LockResolverClientV4 extends AbstractRegionStoreClient
60 implements AbstractLockResolverClient {
/** SLF4J logger shared by all instances of this resolver. */
private static final Logger logger = LoggerFactory.getLogger(LockResolverClientV4.class);

/**
 * Guards {@code resolved} and {@code recentResolved}: lookups take the read lock, insertions and
 * evictions take the write lock.
 */
private final ReadWriteLock readWriteLock;

/**
 * Cache of transaction statuses keyed by transaction id. Entries are only added for statuses
 * obtained from a check-txn-status response whose lock TTL is 0; a cached entry short-circuits
 * the remote status check for that transaction.
 */
private final Map<Long, TxnStatus> resolved;

/**
 * Transaction ids of the cached statuses in insertion order. When it grows past
 * {@code RESOLVED_TXN_CACHE_SIZE} the oldest id is removed here and from {@code resolved}.
 */
private final Queue<Long> recentResolved;

/** PD client used to fetch the current timestamp when checking a lock's transaction status. */
private final PDClient pdClient;

/** Builds the store client that talks to the region holding a transaction's primary key. */
private final RegionStoreClient.RegionStoreClientBuilder clientBuilder;
79
/**
 * Creates a resolver bound to the given region and store.
 *
 * <p>The region/store/stub/channel arguments are handed to the superclass unchanged; this class
 * itself only keeps the PD client, the store-client builder, and a fresh, empty status cache
 * with its lock.
 *
 * @param conf client configuration, forwarded to the superclass
 * @param region region this client initially targets
 * @param store store hosting {@code region}
 * @param blockingStub blocking gRPC stub, forwarded to the superclass
 * @param asyncStub future-based gRPC stub, forwarded to the superclass
 * @param channelFactory channel factory, forwarded to the superclass
 * @param regionManager region cache used for key-to-region lookups
 * @param pdClient PD client used to obtain timestamps
 * @param clientBuilder builder for store clients targeting a primary key's region
 */
public LockResolverClientV4(
    TiConfiguration conf,
    TiRegion region,
    TiStore store,
    TikvBlockingStub blockingStub,
    TikvFutureStub asyncStub,
    ChannelFactory channelFactory,
    RegionManager regionManager,
    PDClient pdClient,
    RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
  super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
  // Collaborators supplied by the caller.
  this.pdClient = pdClient;
  this.clientBuilder = clientBuilder;
  // Resolved-transaction cache and the lock that protects it.
  this.readWriteLock = new ReentrantReadWriteLock();
  this.resolved = new HashMap<>();
  this.recentResolved = new LinkedList<>();
}
97
98 @Override
99 public String getVersion() {
100 return "V4";
101 }
102
103 @Override
104 public ResolveLockResult resolveLocks(
105 BackOffer bo, long callerStartTS, List<Lock> locks, boolean forWrite) {
106 TxnExpireTime msBeforeTxnExpired = new TxnExpireTime();
107
108 if (locks.isEmpty()) {
109 return new ResolveLockResult(msBeforeTxnExpired.value());
110 }
111
112 Map<Long, Set<RegionVerID>> cleanTxns = new HashMap<>();
113 boolean pushFail = false;
114 Set<Long> pushed = new HashSet<>(locks.size());
115
116 for (Lock l : locks) {
117 TxnStatus status = getTxnStatusFromLock(bo, l, callerStartTS);
118
119 if (status.getTtl() == 0) {
120 Set<RegionVerID> cleanRegion =
121 cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());
122
123 if (l.getLockType() == org.tikv.kvproto.Kvrpcpb.Op.PessimisticLock) {
124 resolvePessimisticLock(bo, l, cleanRegion);
125 } else {
126 resolveLock(bo, l, status, cleanRegion);
127 }
128
129 } else {
130 long msBeforeLockExpired = TsoUtils.untilExpired(l.getTxnID(), status.getTtl());
131 msBeforeTxnExpired.update(msBeforeLockExpired);
132
133 if (forWrite) {
134
135
136
137
138 if (l.getLockType() != org.tikv.kvproto.Kvrpcpb.Op.PessimisticLock
139 && l.getTxnID() > callerStartTS) {
140 throw new WriteConflictException(
141 callerStartTS, l.getTxnID(), status.getCommitTS(), l.getKey().toByteArray());
142 }
143 } else {
144 if (status.getAction() != org.tikv.kvproto.Kvrpcpb.Action.MinCommitTSPushed) {
145 pushFail = true;
146 } else {
147 pushed.add(l.getTxnID());
148 }
149 }
150 }
151 }
152
153 if (pushFail) {
154 pushed = new HashSet<>();
155 }
156
157 return new ResolveLockResult(msBeforeTxnExpired.value(), pushed);
158 }
159
160 private void resolvePessimisticLock(BackOffer bo, Lock lock, Set<RegionVerID> cleanRegion) {
161 while (true) {
162 region = regionManager.getRegionByKey(lock.getKey());
163
164 if (cleanRegion.contains(region.getVerID())) {
165 return;
166 }
167
168 final long forUpdateTS =
169 lock.getLockForUpdateTs() == 0L ? Long.MAX_VALUE : lock.getLockForUpdateTs();
170
171 Supplier<Kvrpcpb.PessimisticRollbackRequest> factory =
172 () ->
173 Kvrpcpb.PessimisticRollbackRequest.newBuilder()
174 .setContext(makeContext())
175 .addKeys(codec.encodeKey(lock.getKey()))
176 .setStartVersion(lock.getTxnID())
177 .setForUpdateTs(forUpdateTS)
178 .build();
179
180 KVErrorHandler<Kvrpcpb.PessimisticRollbackResponse> handler =
181 new KVErrorHandler<>(
182 regionManager,
183 this,
184 this,
185 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
186 resp -> resp.getErrorsCount() > 0 ? resp.getErrorsList().get(0) : null,
187 resolveLockResult -> null,
188 0L,
189 false);
190 Kvrpcpb.PessimisticRollbackResponse resp =
191 callWithRetry(bo, TikvGrpc.getKVPessimisticRollbackMethod(), factory, handler);
192
193 if (resp == null) {
194 logger.error("getKVPessimisticRollbackMethod failed without a cause");
195 regionManager.onRequestFail(region);
196 bo.doBackOff(
197 BoRegionMiss,
198 new TiClientInternalException("getKVPessimisticRollbackMethod failed without a cause"));
199 continue;
200 }
201
202 if (resp.hasRegionError()) {
203 bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
204 continue;
205 }
206
207 if (resp.getErrorsCount() > 0) {
208 logger.error(
209 String.format(
210 "unexpected resolveLock err: %s, lock: %s", resp.getErrorsList().get(0), lock));
211 throw new KeyException(resp.getErrorsList().get(0));
212 }
213 }
214 }
215
216 private TxnStatus getTxnStatusFromLock(BackOffer bo, Lock lock, long callerStartTS) {
217 long currentTS;
218
219 if (lock.getTtl() == 0) {
220
221
222
223
224
225 currentTS = Long.MAX_VALUE;
226 } else {
227 currentTS = pdClient.getTimestamp(bo).getVersion();
228 }
229
230 boolean rollbackIfNotExist = false;
231 while (true) {
232 try {
233 return getTxnStatus(
234 bo, lock.getTxnID(), lock.getPrimary(), callerStartTS, currentTS, rollbackIfNotExist);
235 } catch (TxnNotFoundException e) {
236
237
238 logger.warn("getTxnStatus error!", e);
239
240
241
242
243
244 bo.doBackOff(BoTxnNotFound, e);
245 }
246
247 if (TsoUtils.untilExpired(lock.getTxnID(), lock.getTtl()) <= 0) {
248 logger.warn(
249 String.format(
250 "lock txn not found, lock has expired, CallerStartTs=%d lock str=%s",
251 callerStartTS, lock));
252 if (lock.getLockType() == Kvrpcpb.Op.PessimisticLock) {
253 return new TxnStatus();
254 }
255 rollbackIfNotExist = true;
256 } else {
257 if (lock.getLockType() == Kvrpcpb.Op.PessimisticLock) {
258 return new TxnStatus(lock.getTtl());
259 }
260 }
261 }
262 }
263
264
265
266
267
268 private TxnStatus getTxnStatus(
269 BackOffer bo,
270 Long txnID,
271 ByteString primary,
272 Long callerStartTS,
273 Long currentTS,
274 boolean rollbackIfNotExist) {
275 TxnStatus status = getResolved(txnID);
276 if (status != null) {
277 return status;
278 }
279
280
281
282
283
284
285
286
287
288 Supplier<Kvrpcpb.CheckTxnStatusRequest> factory =
289 () -> {
290 TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
291 return Kvrpcpb.CheckTxnStatusRequest.newBuilder()
292 .setContext(primaryKeyRegion.getLeaderContext())
293 .setPrimaryKey(codec.encodeKey(primary))
294 .setLockTs(txnID)
295 .setCallerStartTs(callerStartTS)
296 .setCurrentTs(currentTS)
297 .setRollbackIfNotExist(rollbackIfNotExist)
298 .build();
299 };
300
301 while (true) {
302 TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
303
304 RegionStoreClient primaryKeyRegionStoreClient = clientBuilder.build(primary);
305 KVErrorHandler<Kvrpcpb.CheckTxnStatusResponse> handler =
306 new KVErrorHandler<>(
307 regionManager,
308 primaryKeyRegionStoreClient,
309 primaryKeyRegionStoreClient.lockResolverClient,
310 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
311 resp -> resp.hasError() ? resp.getError() : null,
312 resolveLockResult -> null,
313 callerStartTS,
314 false);
315
316 Kvrpcpb.CheckTxnStatusResponse resp =
317 primaryKeyRegionStoreClient.callWithRetry(
318 bo, TikvGrpc.getKvCheckTxnStatusMethod(), factory, handler);
319
320 if (resp == null) {
321 logger.error("getKvCheckTxnStatusMethod failed without a cause");
322 regionManager.onRequestFail(primaryKeyRegion);
323 bo.doBackOff(
324 BoRegionMiss,
325 new TiClientInternalException("getKvCheckTxnStatusMethod failed without a cause"));
326 continue;
327 }
328
329 if (resp.hasRegionError()) {
330 bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
331 continue;
332 }
333
334 if (resp.hasError()) {
335 Kvrpcpb.KeyError keyError = resp.getError();
336
337 if (keyError.hasTxnNotFound()) {
338 throw new TxnNotFoundException();
339 }
340
341 logger.error(String.format("unexpected cleanup err: %s, tid: %d", keyError, txnID));
342 throw new KeyException(keyError);
343 }
344
345 if (resp.getLockTtl() != 0) {
346 status = new TxnStatus(resp.getLockTtl(), 0L, resp.getAction());
347 } else {
348 status = new TxnStatus(0L, resp.getCommitVersion(), resp.getAction());
349 saveResolved(txnID, status);
350 }
351
352 return status;
353 }
354 }
355
356 private void resolveLock(
357 BackOffer bo, Lock lock, TxnStatus txnStatus, Set<RegionVerID> cleanRegion) {
358 boolean cleanWholeRegion = lock.getTxnSize() >= BIG_TXN_THRESHOLD;
359
360 while (true) {
361 region = regionManager.getRegionByKey(lock.getKey());
362
363 if (cleanRegion.contains(region.getVerID())) {
364 return;
365 }
366
367 Kvrpcpb.ResolveLockRequest.Builder builder =
368 Kvrpcpb.ResolveLockRequest.newBuilder()
369 .setContext(makeContext())
370 .setStartVersion(lock.getTxnID());
371
372 if (txnStatus.isCommitted()) {
373
374 builder.setCommitVersion(txnStatus.getCommitTS());
375 }
376
377 if (lock.getTxnSize() < BIG_TXN_THRESHOLD) {
378
379
380 builder.addKeys(codec.encodeKey(lock.getKey()));
381 }
382
383 Supplier<Kvrpcpb.ResolveLockRequest> factory = builder::build;
384 KVErrorHandler<Kvrpcpb.ResolveLockResponse> handler =
385 new KVErrorHandler<>(
386 regionManager,
387 this,
388 this,
389 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
390 resp -> resp.hasError() ? resp.getError() : null,
391 resolveLockResult -> null,
392 0L,
393 false);
394 Kvrpcpb.ResolveLockResponse resp =
395 callWithRetry(bo, TikvGrpc.getKvResolveLockMethod(), factory, handler);
396
397 if (resp == null) {
398 logger.error("getKvResolveLockMethod failed without a cause");
399 regionManager.onRequestFail(region);
400 bo.doBackOff(
401 BoRegionMiss,
402 new TiClientInternalException("getKvResolveLockMethod failed without a cause"));
403 continue;
404 }
405
406 if (resp.hasRegionError()) {
407 bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
408 continue;
409 }
410
411 if (resp.hasError()) {
412 logger.error(
413 String.format("unexpected resolveLock err: %s, lock: %s", resp.getError(), lock));
414 throw new KeyException(resp.getError());
415 }
416
417 if (cleanWholeRegion) {
418 cleanRegion.add(region.getVerID());
419 }
420 return;
421 }
422 }
423
424 private void saveResolved(long txnID, TxnStatus status) {
425 try {
426 readWriteLock.writeLock().lock();
427 if (resolved.containsKey(txnID)) {
428 return;
429 }
430
431 resolved.put(txnID, status);
432 recentResolved.add(txnID);
433 if (recentResolved.size() > RESOLVED_TXN_CACHE_SIZE) {
434 Long front = recentResolved.remove();
435 resolved.remove(front);
436 }
437 } finally {
438 readWriteLock.writeLock().unlock();
439 }
440 }
441
442 private TxnStatus getResolved(Long txnID) {
443 try {
444 readWriteLock.readLock().lock();
445 return resolved.get(txnID);
446 } finally {
447 readWriteLock.readLock().unlock();
448 }
449 }
450 }