View Javadoc
1   /*
2    * Copyright 2017 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.txn;
19  
20  import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoRegionMiss;
21  
22  import com.google.protobuf.ByteString;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.locks.ReadWriteLock;
32  import java.util.concurrent.locks.ReentrantReadWriteLock;
33  import java.util.function.Supplier;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.tikv.common.TiConfiguration;
37  import org.tikv.common.exception.KeyException;
38  import org.tikv.common.exception.RegionException;
39  import org.tikv.common.exception.TiClientInternalException;
40  import org.tikv.common.operation.KVErrorHandler;
41  import org.tikv.common.region.AbstractRegionStoreClient;
42  import org.tikv.common.region.RegionManager;
43  import org.tikv.common.region.TiRegion;
44  import org.tikv.common.region.TiRegion.RegionVerID;
45  import org.tikv.common.region.TiStore;
46  import org.tikv.common.util.BackOffer;
47  import org.tikv.common.util.ChannelFactory;
48  import org.tikv.common.util.TsoUtils;
49  import org.tikv.kvproto.Kvrpcpb.CleanupRequest;
50  import org.tikv.kvproto.Kvrpcpb.CleanupResponse;
51  import org.tikv.kvproto.Kvrpcpb.ResolveLockRequest;
52  import org.tikv.kvproto.Kvrpcpb.ResolveLockResponse;
53  import org.tikv.kvproto.TikvGrpc;
54  import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
55  import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
56  
57  /** Before v3.0.5 TiDB uses the ttl on secondary lock. */
58  public class LockResolverClientV2 extends AbstractRegionStoreClient
59      implements AbstractLockResolverClient {
60    private static final Logger logger = LoggerFactory.getLogger(LockResolverClientV2.class);
61  
62    private final ReadWriteLock readWriteLock;
63  
64    /**
65     * Note: Because the internal of long is same as unsigned_long and Txn id are never changed. Be
66     * careful to compare between two tso the `resolved` mapping is as {@code Map<TxnId, TxnStatus>}
67     * TxnStatus represents a txn's final status. It should be Commit or Rollback. if TxnStatus > 0,
68     * means the commit ts, otherwise abort
69     */
70    private final Map<Long, Long> resolved;
71  
72    /** the list is chain of txn for O(1) lru cache */
73    private final Queue<Long> recentResolved;
74  
75    public LockResolverClientV2(
76        TiConfiguration conf,
77        TiRegion region,
78        TiStore store,
79        TikvBlockingStub blockingStub,
80        TikvFutureStub asyncStub,
81        ChannelFactory channelFactory,
82        RegionManager regionManager) {
83      super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
84      resolved = new HashMap<>();
85      recentResolved = new LinkedList<>();
86      readWriteLock = new ReentrantReadWriteLock();
87    }
88  
89    private void saveResolved(long txnID, long status) {
90      try {
91        readWriteLock.writeLock().lock();
92        if (resolved.containsKey(txnID)) {
93          return;
94        }
95  
96        resolved.put(txnID, status);
97        recentResolved.add(txnID);
98        if (recentResolved.size() > RESOLVED_TXN_CACHE_SIZE) {
99          Long front = recentResolved.remove();
100         resolved.remove(front);
101       }
102     } finally {
103       readWriteLock.writeLock().unlock();
104     }
105   }
106 
107   private Long getResolved(Long txnID) {
108     try {
109       readWriteLock.readLock().lock();
110       return resolved.get(txnID);
111     } finally {
112       readWriteLock.readLock().unlock();
113     }
114   }
115 
116   private Long getTxnStatus(BackOffer bo, Long txnID, ByteString primary) {
117     Long status = getResolved(txnID);
118 
119     if (status != null) {
120       return status;
121     }
122 
123     while (true) {
124       // refresh region
125       region = regionManager.getRegionByKey(primary);
126 
127       Supplier<CleanupRequest> factory =
128           () ->
129               CleanupRequest.newBuilder()
130                   .setContext(region.getLeaderContext())
131                   .setKey(primary)
132                   .setStartVersion(txnID)
133                   .build();
134       KVErrorHandler<CleanupResponse> handler =
135           new KVErrorHandler<>(
136               regionManager,
137               this,
138               this,
139               resp -> resp.hasRegionError() ? resp.getRegionError() : null,
140               resp -> resp.hasError() ? resp.getError() : null,
141               resolveLockResult -> null,
142               0L,
143               false);
144       CleanupResponse resp = callWithRetry(bo, TikvGrpc.getKvCleanupMethod(), factory, handler);
145 
146       status = 0L;
147 
148       if (resp == null) {
149         logger.error("getKvCleanupMethod failed without a cause");
150         regionManager.onRequestFail(region);
151         bo.doBackOff(
152             BoRegionMiss,
153             new TiClientInternalException("getKvCleanupMethod failed without a cause"));
154         continue;
155       }
156 
157       if (resp.hasRegionError()) {
158         bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
159         continue;
160       }
161 
162       if (resp.hasError()) {
163         logger.error(String.format("unexpected cleanup err: %s, tid: %d", resp.getError(), txnID));
164         throw new KeyException(resp.getError());
165       }
166 
167       if (resp.getCommitVersion() != 0) {
168         status = resp.getCommitVersion();
169       }
170 
171       saveResolved(txnID, status);
172       return status;
173     }
174   }
175 
176   @Override
177   public String getVersion() {
178     return "V2";
179   }
180 
181   @Override
182   public ResolveLockResult resolveLocks(
183       BackOffer bo, long callerStartTS, List<Lock> locks, boolean forWrite) {
184     if (doResolveLocks(bo, locks)) {
185       return new ResolveLockResult(0L);
186     } else {
187       return new ResolveLockResult(10000L);
188     }
189   }
190 
191   private boolean doResolveLocks(BackOffer bo, List<Lock> locks) {
192     if (locks.isEmpty()) {
193       return true;
194     }
195 
196     List<Lock> expiredLocks = new ArrayList<>();
197     for (Lock lock : locks) {
198       if (TsoUtils.isExpired(lock.getTxnID(), lock.getTtl())) {
199         expiredLocks.add(lock);
200       }
201     }
202 
203     if (expiredLocks.isEmpty()) {
204       return false;
205     }
206 
207     // TxnID -> []Region, record resolved Regions.
208     // TODO: Maybe put it in all LockResolverClientV2 and share by all txns.
209     Map<Long, Set<RegionVerID>> cleanTxns = new HashMap<>();
210     for (Lock l : expiredLocks) {
211       Long status = getTxnStatus(bo, l.getTxnID(), l.getPrimary());
212 
213       Set<RegionVerID> cleanRegion = cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());
214 
215       resolveLock(bo, l, status, cleanRegion);
216     }
217 
218     return expiredLocks.size() == locks.size();
219   }
220 
221   private void resolveLock(BackOffer bo, Lock lock, long txnStatus, Set<RegionVerID> cleanRegion) {
222 
223     while (true) {
224       region = regionManager.getRegionByKey(lock.getKey());
225 
226       if (cleanRegion.contains(region.getVerID())) {
227         return;
228       }
229 
230       Supplier<ResolveLockRequest> factory;
231 
232       if (txnStatus > 0) {
233         // txn is committed with commitTS txnStatus
234         factory =
235             () ->
236                 ResolveLockRequest.newBuilder()
237                     .setContext(region.getLeaderContext())
238                     .setStartVersion(lock.getTxnID())
239                     .setCommitVersion(txnStatus)
240                     .build();
241       } else {
242         factory =
243             () ->
244                 ResolveLockRequest.newBuilder()
245                     .setContext(region.getLeaderContext())
246                     .setStartVersion(lock.getTxnID())
247                     .build();
248       }
249 
250       KVErrorHandler<ResolveLockResponse> handler =
251           new KVErrorHandler<>(
252               regionManager,
253               this,
254               this,
255               resp -> resp.hasRegionError() ? resp.getRegionError() : null,
256               resp -> resp.hasError() ? resp.getError() : null,
257               resolveLockResult -> null,
258               0L,
259               false);
260       ResolveLockResponse resp =
261           callWithRetry(bo, TikvGrpc.getKvResolveLockMethod(), factory, handler);
262 
263       if (resp == null) {
264         logger.error("getKvResolveLockMethod failed without a cause");
265         regionManager.onRequestFail(region);
266         bo.doBackOff(
267             BoRegionMiss,
268             new TiClientInternalException("getKvResolveLockMethod failed without a cause"));
269         continue;
270       }
271 
272       if (resp.hasRegionError()) {
273         bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
274         continue;
275       }
276 
277       if (resp.hasError()) {
278         logger.error(
279             String.format("unexpected resolveLock err: %s, lock: %s", resp.getError(), lock));
280         throw new KeyException(resp.getError());
281       }
282 
283       cleanRegion.add(region.getVerID());
284       return;
285     }
286   }
287 }