View Javadoc
1   /*
2    * Copyright 2017 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.txn;
19  
20  import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoRegionMiss;
21  
22  import com.google.protobuf.ByteString;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.locks.ReadWriteLock;
32  import java.util.concurrent.locks.ReentrantReadWriteLock;
33  import java.util.function.Supplier;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.tikv.common.PDClient;
37  import org.tikv.common.TiConfiguration;
38  import org.tikv.common.exception.KeyException;
39  import org.tikv.common.exception.RegionException;
40  import org.tikv.common.exception.TiClientInternalException;
41  import org.tikv.common.operation.KVErrorHandler;
42  import org.tikv.common.region.*;
43  import org.tikv.common.region.TiRegion.RegionVerID;
44  import org.tikv.common.util.BackOffer;
45  import org.tikv.common.util.ChannelFactory;
46  import org.tikv.common.util.TsoUtils;
47  import org.tikv.kvproto.Kvrpcpb;
48  import org.tikv.kvproto.Kvrpcpb.CleanupRequest;
49  import org.tikv.kvproto.Kvrpcpb.CleanupResponse;
50  import org.tikv.kvproto.TikvGrpc;
51  import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
52  import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
53  
54  /** Since v3.0.5 TiDB ignores the ttl on secondary lock and will use the ttl on primary key. */
55  public class LockResolverClientV3 extends AbstractRegionStoreClient
56      implements AbstractLockResolverClient {
57    private static final Logger logger = LoggerFactory.getLogger(LockResolverClientV3.class);
58  
59    private final ReadWriteLock readWriteLock;
60  
61    /**
62     * Note: Because the internal of long is same as unsigned_long and Txn id are never changed. Be
63     * careful to compare between two tso the `resolved` mapping is as {@code Map<TxnId, TxnStatus>}
64     * TxnStatus represents a txn's final status. It should be Commit or Rollback. if TxnStatus > 0,
65     * means the commit ts, otherwise abort
66     */
67    private final Map<Long, TxnStatus> resolved;
68  
69    /** the list is chain of txn for O(1) lru cache */
70    private final Queue<Long> recentResolved;
71  
72    private final PDClient pdClient;
73  
74    private final RegionStoreClient.RegionStoreClientBuilder clientBuilder;
75  
76    public LockResolverClientV3(
77        TiConfiguration conf,
78        TiRegion region,
79        TiStore store,
80        TikvBlockingStub blockingStub,
81        TikvFutureStub asyncStub,
82        ChannelFactory channelFactory,
83        RegionManager regionManager,
84        PDClient pdClient,
85        RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
86      super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
87      resolved = new HashMap<>();
88      recentResolved = new LinkedList<>();
89      readWriteLock = new ReentrantReadWriteLock();
90      this.pdClient = pdClient;
91      this.clientBuilder = clientBuilder;
92    }
93  
94    @Override
95    public String getVersion() {
96      return "V3";
97    }
98  
99    @Override
100   public ResolveLockResult resolveLocks(
101       BackOffer bo, long callerStartTS, List<Lock> locks, boolean forWrite) {
102     TxnExpireTime msBeforeTxnExpired = new TxnExpireTime();
103 
104     if (locks.isEmpty()) {
105       return new ResolveLockResult(msBeforeTxnExpired.value());
106     }
107 
108     List<Lock> expiredLocks = new ArrayList<>();
109     for (Lock lock : locks) {
110       if (TsoUtils.isExpired(lock.getTxnID(), lock.getTtl())) {
111         expiredLocks.add(lock);
112       } else {
113         msBeforeTxnExpired.update(lock.getTtl());
114       }
115     }
116 
117     if (expiredLocks.isEmpty()) {
118       return new ResolveLockResult(msBeforeTxnExpired.value());
119     }
120 
121     Map<Long, Set<RegionVerID>> cleanTxns = new HashMap<>();
122     for (Lock l : expiredLocks) {
123       TxnStatus status = getTxnStatusFromLock(bo, l);
124 
125       if (status.getTtl() == 0) {
126         Set<RegionVerID> cleanRegion =
127             cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());
128 
129         resolveLock(bo, l, status, cleanRegion);
130       } else {
131         long msBeforeLockExpired = TsoUtils.untilExpired(l.getTxnID(), status.getTtl());
132         msBeforeTxnExpired.update(msBeforeLockExpired);
133       }
134     }
135 
136     return new ResolveLockResult(msBeforeTxnExpired.value());
137   }
138 
139   private void resolveLock(
140       BackOffer bo, Lock lock, TxnStatus txnStatus, Set<RegionVerID> cleanRegion) {
141     boolean cleanWholeRegion = lock.getTxnSize() >= BIG_TXN_THRESHOLD;
142 
143     while (true) {
144       region = regionManager.getRegionByKey(lock.getKey());
145 
146       if (cleanRegion.contains(region.getVerID())) {
147         return;
148       }
149 
150       Kvrpcpb.ResolveLockRequest.Builder builder =
151           Kvrpcpb.ResolveLockRequest.newBuilder()
152               .setContext(region.getLeaderContext())
153               .setStartVersion(lock.getTxnID());
154 
155       if (txnStatus.isCommitted()) {
156         // txn is committed with commitTS txnStatus
157         builder.setCommitVersion(txnStatus.getCommitTS());
158       }
159 
160       if (lock.getTxnSize() < BIG_TXN_THRESHOLD) {
161         // Only resolve specified keys when it is a small transaction,
162         // prevent from scanning the whole region in this case.
163         builder.addKeys(lock.getKey());
164       }
165 
166       Supplier<Kvrpcpb.ResolveLockRequest> factory = builder::build;
167       KVErrorHandler<Kvrpcpb.ResolveLockResponse> handler =
168           new KVErrorHandler<>(
169               regionManager,
170               this,
171               this,
172               resp -> resp.hasRegionError() ? resp.getRegionError() : null,
173               resp -> resp.hasError() ? resp.getError() : null,
174               resolveLockResult -> null,
175               0L,
176               false);
177       Kvrpcpb.ResolveLockResponse resp =
178           callWithRetry(bo, TikvGrpc.getKvResolveLockMethod(), factory, handler);
179 
180       if (resp == null) {
181         logger.error("getKvResolveLockMethod failed without a cause");
182         regionManager.onRequestFail(region);
183         bo.doBackOff(
184             BoRegionMiss,
185             new TiClientInternalException("getKvResolveLockMethod failed without a cause"));
186         continue;
187       }
188 
189       if (resp.hasRegionError()) {
190         bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
191         continue;
192       }
193 
194       if (resp.hasError()) {
195         logger.error(
196             String.format("unexpected resolveLock err: %s, lock: %s", resp.getError(), lock));
197         throw new KeyException(resp.getError());
198       }
199 
200       if (cleanWholeRegion) {
201         cleanRegion.add(region.getVerID());
202       }
203       return;
204     }
205   }
206 
207   private TxnStatus getTxnStatusFromLock(BackOffer bo, Lock lock) {
208     // NOTE: l.TTL = 0 is a special protocol!!!
209     // When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn
210     // **unconditionally**.
211     // In this case, TiKV set the lock TTL = 0, and TiDB use currentTS = 0 to call
212     // getTxnStatus, and getTxnStatus with currentTS = 0 would rollback the transaction.
213     if (lock.getTtl() == 0) {
214       return getTxnStatus(bo, lock.getTxnID(), lock.getPrimary(), 0L);
215     }
216 
217     long currentTS = pdClient.getTimestamp(bo).getVersion();
218     return getTxnStatus(bo, lock.getTxnID(), lock.getPrimary(), currentTS);
219   }
220 
221   private TxnStatus getTxnStatus(BackOffer bo, Long txnID, ByteString primary, Long currentTS) {
222     TxnStatus status = getResolved(txnID);
223     if (status != null) {
224       return status;
225     }
226 
227     Supplier<CleanupRequest> factory =
228         () -> {
229           TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
230           return CleanupRequest.newBuilder()
231               .setContext(primaryKeyRegion.getLeaderContext())
232               .setKey(primary)
233               .setStartVersion(txnID)
234               .setCurrentTs(currentTS)
235               .build();
236         };
237 
238     status = new TxnStatus();
239     while (true) {
240       TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary);
241       // new RegionStoreClient for PrimaryKey
242       RegionStoreClient primaryKeyRegionStoreClient = clientBuilder.build(primary);
243       KVErrorHandler<CleanupResponse> handler =
244           new KVErrorHandler<>(
245               regionManager,
246               primaryKeyRegionStoreClient,
247               primaryKeyRegionStoreClient.lockResolverClient,
248               resp -> resp.hasRegionError() ? resp.getRegionError() : null,
249               resp -> resp.hasError() ? resp.getError() : null,
250               resolveLockResult -> null,
251               0L,
252               false);
253 
254       CleanupResponse resp =
255           primaryKeyRegionStoreClient.callWithRetry(
256               bo, TikvGrpc.getKvCleanupMethod(), factory, handler);
257 
258       if (resp == null) {
259         logger.error("getKvCleanupMethod failed without a cause");
260         regionManager.onRequestFail(primaryKeyRegion);
261         bo.doBackOff(
262             BoRegionMiss,
263             new TiClientInternalException("getKvCleanupMethod failed without a cause"));
264         continue;
265       }
266 
267       if (resp.hasRegionError()) {
268         bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
269         continue;
270       }
271 
272       if (resp.hasError()) {
273         Kvrpcpb.KeyError keyError = resp.getError();
274 
275         // If the TTL of the primary lock is not outdated, the proto returns a ErrLocked contains
276         // the TTL.
277         if (keyError.hasLocked()) {
278           Kvrpcpb.LockInfo lockInfo = keyError.getLocked();
279           return new TxnStatus(lockInfo.getLockTtl(), 0L);
280         }
281 
282         logger.error(String.format("unexpected cleanup err: %s, tid: %d", keyError, txnID));
283         throw new KeyException(keyError);
284       }
285 
286       if (resp.getCommitVersion() != 0) {
287         status = new TxnStatus(0L, resp.getCommitVersion());
288       }
289 
290       saveResolved(txnID, status);
291       return status;
292     }
293   }
294 
295   private void saveResolved(long txnID, TxnStatus status) {
296     try {
297       readWriteLock.writeLock().lock();
298       if (resolved.containsKey(txnID)) {
299         return;
300       }
301 
302       resolved.put(txnID, status);
303       recentResolved.add(txnID);
304       if (recentResolved.size() > RESOLVED_TXN_CACHE_SIZE) {
305         Long front = recentResolved.remove();
306         resolved.remove(front);
307       }
308     } finally {
309       readWriteLock.writeLock().unlock();
310     }
311   }
312 
313   private TxnStatus getResolved(Long txnID) {
314     try {
315       readWriteLock.readLock().lock();
316       return resolved.get(txnID);
317     } finally {
318       readWriteLock.readLock().unlock();
319     }
320   }
321 }