View Javadoc
1   /*
2    * Copyright 2020 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.txn;
19  
20  import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoRegionMiss;
21  import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnNotFound;
22  
23  import com.google.protobuf.ByteString;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.locks.ReadWriteLock;
32  import java.util.concurrent.locks.ReentrantReadWriteLock;
33  import java.util.function.Supplier;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.tikv.common.PDClient;
37  import org.tikv.common.TiConfiguration;
38  import org.tikv.common.exception.KeyException;
39  import org.tikv.common.exception.RegionException;
40  import org.tikv.common.exception.TiClientInternalException;
41  import org.tikv.common.operation.KVErrorHandler;
42  import org.tikv.common.region.AbstractRegionStoreClient;
43  import org.tikv.common.region.RegionManager;
44  import org.tikv.common.region.RegionStoreClient;
45  import org.tikv.common.region.TiRegion;
46  import org.tikv.common.region.TiRegion.RegionVerID;
47  import org.tikv.common.region.TiStore;
48  import org.tikv.common.util.BackOffer;
49  import org.tikv.common.util.ChannelFactory;
50  import org.tikv.common.util.TsoUtils;
51  import org.tikv.kvproto.Kvrpcpb;
52  import org.tikv.kvproto.TikvGrpc;
53  import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
54  import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
55  import org.tikv.txn.exception.TxnNotFoundException;
56  import org.tikv.txn.exception.WriteConflictException;
57  
58  /** Since v4.0.0 TiDB write will not block read (update MinCommitTS). */
59  public class LockResolverClientV4 extends AbstractRegionStoreClient
60      implements AbstractLockResolverClient {
61    private static final Logger logger = LoggerFactory.getLogger(LockResolverClientV4.class);
62  
63    private final ReadWriteLock readWriteLock;
64  
65    /**
66     * Note: Because the internal of long is same as unsigned_long and Txn id are never changed. Be
67     * careful to compare between two tso the `resolved` mapping is as {@code Map<TxnId, TxnStatus>}
68     * TxnStatus represents a txn's final status. It should be Commit or Rollback. if TxnStatus > 0,
69     * means the commit ts, otherwise abort
70     */
71    private final Map<Long, TxnStatus> resolved;
72  
73    /** the list is chain of txn for O(1) lru cache */
74    private final Queue<Long> recentResolved;
75  
76    private final PDClient pdClient;
77  
78    private final RegionStoreClient.RegionStoreClientBuilder clientBuilder;
79  
80    public LockResolverClientV4(
81        TiConfiguration conf,
82        TiRegion region,
83        TiStore store,
84        TikvBlockingStub blockingStub,
85        TikvFutureStub asyncStub,
86        ChannelFactory channelFactory,
87        RegionManager regionManager,
88        PDClient pdClient,
89        RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
90      super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
91      resolved = new HashMap<>();
92      recentResolved = new LinkedList<>();
93      readWriteLock = new ReentrantReadWriteLock();
94      this.pdClient = pdClient;
95      this.clientBuilder = clientBuilder;
96    }
97  
98    @Override
99    public String getVersion() {
100     return "V4";
101   }
102 
103   @Override
104   public ResolveLockResult resolveLocks(
105       BackOffer bo, long callerStartTS, List<Lock> locks, boolean forWrite) {
106     TxnExpireTime msBeforeTxnExpired = new TxnExpireTime();
107 
108     if (locks.isEmpty()) {
109       return new ResolveLockResult(msBeforeTxnExpired.value());
110     }
111 
112     Map<Long, Set<RegionVerID>> cleanTxns = new HashMap<>();
113     boolean pushFail = false;
114     Set<Long> pushed = new HashSet<>(locks.size());
115 
116     for (Lock l : locks) {
117       TxnStatus status = getTxnStatusFromLock(bo, l, callerStartTS);
118 
119       if (status.getTtl() == 0) {
120         Set<RegionVerID> cleanRegion =
121             cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());
122 
123         if (l.getLockType() == org.tikv.kvproto.Kvrpcpb.Op.PessimisticLock) {
124           resolvePessimisticLock(bo, l, cleanRegion);
125         } else {
126           resolveLock(bo, l, status, cleanRegion);
127         }
128 
129       } else {
130         long msBeforeLockExpired = TsoUtils.untilExpired(l.getTxnID(), status.getTtl());
131         msBeforeTxnExpired.update(msBeforeLockExpired);
132 
133         if (forWrite) {
134           // Write conflict detected!
135           // If it's a optimistic conflict and current txn is earlier than the lock owner,
136           // abort current transaction.
137           // This could avoids the deadlock scene of two large transaction.
138           if (l.getLockType() != org.tikv.kvproto.Kvrpcpb.Op.PessimisticLock
139               && l.getTxnID() > callerStartTS) {
140             throw new WriteConflictException(
141                 callerStartTS, l.getTxnID(), status.getCommitTS(), l.getKey().toByteArray());
142           }
143         } else {
144           if (status.getAction() != org.tikv.kvproto.Kvrpcpb.Action.MinCommitTSPushed) {
145             pushFail = true;
146           } else {
147             pushed.add(l.getTxnID());
148           }
149         }
150       }
151     }
152 
153     if (pushFail) {
154       pushed = new HashSet<>();
155     }
156 
157     return new ResolveLockResult(msBeforeTxnExpired.value(), pushed);
158   }
159 
160   private void resolvePessimisticLock(BackOffer bo, Lock lock, Set<RegionVerID> cleanRegion) {
161     while (true) {
162       region = regionManager.getRegionByKey(lock.getKey());
163 
164       if (cleanRegion.contains(region.getVerID())) {
165         return;
166       }
167 
168       final long forUpdateTS =
169           lock.getLockForUpdateTs() == 0L ? Long.MAX_VALUE : lock.getLockForUpdateTs();
170 
171       Supplier<Kvrpcpb.PessimisticRollbackRequest> factory =
172           () ->
173               Kvrpcpb.PessimisticRollbackRequest.newBuilder()
174                   .setContext(makeContext())
175                   .addKeys(codec.encodeKey(lock.getKey()))
176                   .setStartVersion(lock.getTxnID())
177                   .setForUpdateTs(forUpdateTS)
178                   .build();
179 
180       KVErrorHandler<Kvrpcpb.PessimisticRollbackResponse> handler =
181           new KVErrorHandler<>(
182               regionManager,
183               this,
184               this,
185               resp -> resp.hasRegionError() ? resp.getRegionError() : null,
186               resp -> resp.getErrorsCount() > 0 ? resp.getErrorsList().get(0) : null,
187               resolveLockResult -> null,
188               0L,
189               false);
190       Kvrpcpb.PessimisticRollbackResponse resp =
191           callWithRetry(bo, TikvGrpc.getKVPessimisticRollbackMethod(), factory, handler);
192 
193       if (resp == null) {
194         logger.error("getKVPessimisticRollbackMethod failed without a cause");
195         regionManager.onRequestFail(region);
196         bo.doBackOff(
197             BoRegionMiss,
198             new TiClientInternalException("getKVPessimisticRollbackMethod failed without a cause"));
199         continue;
200       }
201 
202       if (resp.hasRegionError()) {
203         bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
204         continue;
205       }
206 
207       if (resp.getErrorsCount() > 0) {
208         logger.error(
209             String.format(
210                 "unexpected resolveLock err: %s, lock: %s", resp.getErrorsList().get(0), lock));
211         throw new KeyException(resp.getErrorsList().get(0));
212       }
213     }
214   }
215 
216   private TxnStatus getTxnStatusFromLock(BackOffer bo, Lock lock, long callerStartTS) {
217     long currentTS;
218 
219     if (lock.getTtl() == 0) {
220       // NOTE: l.TTL = 0 is a special protocol!!!
221       // When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock
222       // **unconditionally**.
223       // In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock!
224       // Set currentTS to max uint64 to make the lock expired.
225       currentTS = Long.MAX_VALUE;
226     } else {
227       currentTS = pdClient.getTimestamp(bo).getVersion();
228     }
229 
230     boolean rollbackIfNotExist = false;
231     while (true) {
232       try {
233         return getTxnStatus(
234             bo, lock.getTxnID(), lock.getPrimary(), callerStartTS, currentTS, rollbackIfNotExist);
235       } catch (TxnNotFoundException e) {
236         // If the error is something other than txnNotFoundErr, throw the error (network
237         // unavailable, tikv down, backoff timeout etc) to the caller.
238         logger.warn("getTxnStatus error!", e);
239 
240         // Handle txnNotFound error.
241         // getTxnStatus() returns it when the secondary locks exist while the primary lock doesn't.
242         // This is likely to happen in the concurrently prewrite when secondary regions
243         // success before the primary region.
244         bo.doBackOff(BoTxnNotFound, e);
245       }
246 
247       if (TsoUtils.untilExpired(lock.getTxnID(), lock.getTtl()) <= 0) {
248         logger.warn(
249             String.format(
250                 "lock txn not found, lock has expired, CallerStartTs=%d lock str=%s",
251                 callerStartTS, lock));
252         if (lock.getLockType() == Kvrpcpb.Op.PessimisticLock) {
253           return new TxnStatus();
254         }
255         rollbackIfNotExist = true;
256       } else {
257         if (lock.getLockType() == Kvrpcpb.Op.PessimisticLock) {
258           return new TxnStatus(lock.getTtl());
259         }
260       }
261     }
262   }
263 
264   /**
265    * getTxnStatus sends the CheckTxnStatus request to the TiKV server. When rollbackIfNotExist is
266    * false, the caller should be careful with the TxnNotFoundException error.
267    */
268   private TxnStatus getTxnStatus(
269       BackOffer bo,
270       Long txnID,
271       ByteString primary,
272       Long callerStartTS,
273       Long currentTS,
274       boolean rollbackIfNotExist) {
275     TxnStatus status = getResolved(txnID);
276     if (status != null) {
277       return status;
278     }
279 
280     // CheckTxnStatus may meet the following cases:
281     // 1. LOCK
282     // 1.1 Lock expired -- orphan lock, fail to update TTL, crash recovery etc.
283     // 1.2 Lock TTL -- active transaction holding the lock.
284     // 2. NO LOCK
285     // 2.1 Txn Committed
286     // 2.2 Txn Rollbacked -- rollback itself, rollback by others, GC tomb etc.
287     // 2.3 No lock -- pessimistic lock rollback, concurrence prewrite.
288     Supplier<Kvrpcpb.CheckTxnStatusRequest> factory =
289         () -> {
290           TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
291           return Kvrpcpb.CheckTxnStatusRequest.newBuilder()
292               .setContext(primaryKeyRegion.getLeaderContext())
293               .setPrimaryKey(codec.encodeKey(primary))
294               .setLockTs(txnID)
295               .setCallerStartTs(callerStartTS)
296               .setCurrentTs(currentTS)
297               .setRollbackIfNotExist(rollbackIfNotExist)
298               .build();
299         };
300 
301     while (true) {
302       TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
303       // new RegionStoreClient for PrimaryKey
304       RegionStoreClient primaryKeyRegionStoreClient = clientBuilder.build(primary);
305       KVErrorHandler<Kvrpcpb.CheckTxnStatusResponse> handler =
306           new KVErrorHandler<>(
307               regionManager,
308               primaryKeyRegionStoreClient,
309               primaryKeyRegionStoreClient.lockResolverClient,
310               resp -> resp.hasRegionError() ? resp.getRegionError() : null,
311               resp -> resp.hasError() ? resp.getError() : null,
312               resolveLockResult -> null,
313               callerStartTS,
314               false);
315 
316       Kvrpcpb.CheckTxnStatusResponse resp =
317           primaryKeyRegionStoreClient.callWithRetry(
318               bo, TikvGrpc.getKvCheckTxnStatusMethod(), factory, handler);
319 
320       if (resp == null) {
321         logger.error("getKvCheckTxnStatusMethod failed without a cause");
322         regionManager.onRequestFail(primaryKeyRegion);
323         bo.doBackOff(
324             BoRegionMiss,
325             new TiClientInternalException("getKvCheckTxnStatusMethod failed without a cause"));
326         continue;
327       }
328 
329       if (resp.hasRegionError()) {
330         bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
331         continue;
332       }
333 
334       if (resp.hasError()) {
335         Kvrpcpb.KeyError keyError = resp.getError();
336 
337         if (keyError.hasTxnNotFound()) {
338           throw new TxnNotFoundException();
339         }
340 
341         logger.error(String.format("unexpected cleanup err: %s, tid: %d", keyError, txnID));
342         throw new KeyException(keyError);
343       }
344 
345       if (resp.getLockTtl() != 0) {
346         status = new TxnStatus(resp.getLockTtl(), 0L, resp.getAction());
347       } else {
348         status = new TxnStatus(0L, resp.getCommitVersion(), resp.getAction());
349         saveResolved(txnID, status);
350       }
351 
352       return status;
353     }
354   }
355 
356   private void resolveLock(
357       BackOffer bo, Lock lock, TxnStatus txnStatus, Set<RegionVerID> cleanRegion) {
358     boolean cleanWholeRegion = lock.getTxnSize() >= BIG_TXN_THRESHOLD;
359 
360     while (true) {
361       region = regionManager.getRegionByKey(lock.getKey());
362 
363       if (cleanRegion.contains(region.getVerID())) {
364         return;
365       }
366 
367       Kvrpcpb.ResolveLockRequest.Builder builder =
368           Kvrpcpb.ResolveLockRequest.newBuilder()
369               .setContext(makeContext())
370               .setStartVersion(lock.getTxnID());
371 
372       if (txnStatus.isCommitted()) {
373         // txn is committed with commitTS txnStatus
374         builder.setCommitVersion(txnStatus.getCommitTS());
375       }
376 
377       if (lock.getTxnSize() < BIG_TXN_THRESHOLD) {
378         // Only resolve specified keys when it is a small transaction,
379         // prevent from scanning the whole region in this case.
380         builder.addKeys(codec.encodeKey(lock.getKey()));
381       }
382 
383       Supplier<Kvrpcpb.ResolveLockRequest> factory = builder::build;
384       KVErrorHandler<Kvrpcpb.ResolveLockResponse> handler =
385           new KVErrorHandler<>(
386               regionManager,
387               this,
388               this,
389               resp -> resp.hasRegionError() ? resp.getRegionError() : null,
390               resp -> resp.hasError() ? resp.getError() : null,
391               resolveLockResult -> null,
392               0L,
393               false);
394       Kvrpcpb.ResolveLockResponse resp =
395           callWithRetry(bo, TikvGrpc.getKvResolveLockMethod(), factory, handler);
396 
397       if (resp == null) {
398         logger.error("getKvResolveLockMethod failed without a cause");
399         regionManager.onRequestFail(region);
400         bo.doBackOff(
401             BoRegionMiss,
402             new TiClientInternalException("getKvResolveLockMethod failed without a cause"));
403         continue;
404       }
405 
406       if (resp.hasRegionError()) {
407         bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
408         continue;
409       }
410 
411       if (resp.hasError()) {
412         logger.error(
413             String.format("unexpected resolveLock err: %s, lock: %s", resp.getError(), lock));
414         throw new KeyException(resp.getError());
415       }
416 
417       if (cleanWholeRegion) {
418         cleanRegion.add(region.getVerID());
419       }
420       return;
421     }
422   }
423 
424   private void saveResolved(long txnID, TxnStatus status) {
425     try {
426       readWriteLock.writeLock().lock();
427       if (resolved.containsKey(txnID)) {
428         return;
429       }
430 
431       resolved.put(txnID, status);
432       recentResolved.add(txnID);
433       if (recentResolved.size() > RESOLVED_TXN_CACHE_SIZE) {
434         Long front = recentResolved.remove();
435         resolved.remove(front);
436       }
437     } finally {
438       readWriteLock.writeLock().unlock();
439     }
440   }
441 
442   private TxnStatus getResolved(Long txnID) {
443     try {
444       readWriteLock.readLock().lock();
445       return resolved.get(txnID);
446     } finally {
447       readWriteLock.readLock().unlock();
448     }
449   }
450 }