1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.txn;
19
20 import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoRegionMiss;
21
22 import com.google.protobuf.ByteString;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.locks.ReadWriteLock;
32 import java.util.concurrent.locks.ReentrantReadWriteLock;
33 import java.util.function.Supplier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.tikv.common.TiConfiguration;
37 import org.tikv.common.exception.KeyException;
38 import org.tikv.common.exception.RegionException;
39 import org.tikv.common.exception.TiClientInternalException;
40 import org.tikv.common.operation.KVErrorHandler;
41 import org.tikv.common.region.AbstractRegionStoreClient;
42 import org.tikv.common.region.RegionManager;
43 import org.tikv.common.region.TiRegion;
44 import org.tikv.common.region.TiRegion.RegionVerID;
45 import org.tikv.common.region.TiStore;
46 import org.tikv.common.util.BackOffer;
47 import org.tikv.common.util.ChannelFactory;
48 import org.tikv.common.util.TsoUtils;
49 import org.tikv.kvproto.Kvrpcpb.CleanupRequest;
50 import org.tikv.kvproto.Kvrpcpb.CleanupResponse;
51 import org.tikv.kvproto.Kvrpcpb.ResolveLockRequest;
52 import org.tikv.kvproto.Kvrpcpb.ResolveLockResponse;
53 import org.tikv.kvproto.TikvGrpc;
54 import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
55 import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
56
57
58 public class LockResolverClientV2 extends AbstractRegionStoreClient
59 implements AbstractLockResolverClient {
60 private static final Logger logger = LoggerFactory.getLogger(LockResolverClientV2.class);
61
62 private final ReadWriteLock readWriteLock;
63
64
65
66
67
68
69
70 private final Map<Long, Long> resolved;
71
72
73 private final Queue<Long> recentResolved;
74
75 public LockResolverClientV2(
76 TiConfiguration conf,
77 TiRegion region,
78 TiStore store,
79 TikvBlockingStub blockingStub,
80 TikvFutureStub asyncStub,
81 ChannelFactory channelFactory,
82 RegionManager regionManager) {
83 super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
84 resolved = new HashMap<>();
85 recentResolved = new LinkedList<>();
86 readWriteLock = new ReentrantReadWriteLock();
87 }
88
89 private void saveResolved(long txnID, long status) {
90 try {
91 readWriteLock.writeLock().lock();
92 if (resolved.containsKey(txnID)) {
93 return;
94 }
95
96 resolved.put(txnID, status);
97 recentResolved.add(txnID);
98 if (recentResolved.size() > RESOLVED_TXN_CACHE_SIZE) {
99 Long front = recentResolved.remove();
100 resolved.remove(front);
101 }
102 } finally {
103 readWriteLock.writeLock().unlock();
104 }
105 }
106
107 private Long getResolved(Long txnID) {
108 try {
109 readWriteLock.readLock().lock();
110 return resolved.get(txnID);
111 } finally {
112 readWriteLock.readLock().unlock();
113 }
114 }
115
116 private Long getTxnStatus(BackOffer bo, Long txnID, ByteString primary) {
117 Long status = getResolved(txnID);
118
119 if (status != null) {
120 return status;
121 }
122
123 while (true) {
124
125 region = regionManager.getRegionByKey(primary);
126
127 Supplier<CleanupRequest> factory =
128 () ->
129 CleanupRequest.newBuilder()
130 .setContext(region.getLeaderContext())
131 .setKey(primary)
132 .setStartVersion(txnID)
133 .build();
134 KVErrorHandler<CleanupResponse> handler =
135 new KVErrorHandler<>(
136 regionManager,
137 this,
138 this,
139 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
140 resp -> resp.hasError() ? resp.getError() : null,
141 resolveLockResult -> null,
142 0L,
143 false);
144 CleanupResponse resp = callWithRetry(bo, TikvGrpc.getKvCleanupMethod(), factory, handler);
145
146 status = 0L;
147
148 if (resp == null) {
149 logger.error("getKvCleanupMethod failed without a cause");
150 regionManager.onRequestFail(region);
151 bo.doBackOff(
152 BoRegionMiss,
153 new TiClientInternalException("getKvCleanupMethod failed without a cause"));
154 continue;
155 }
156
157 if (resp.hasRegionError()) {
158 bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
159 continue;
160 }
161
162 if (resp.hasError()) {
163 logger.error(String.format("unexpected cleanup err: %s, tid: %d", resp.getError(), txnID));
164 throw new KeyException(resp.getError());
165 }
166
167 if (resp.getCommitVersion() != 0) {
168 status = resp.getCommitVersion();
169 }
170
171 saveResolved(txnID, status);
172 return status;
173 }
174 }
175
176 @Override
177 public String getVersion() {
178 return "V2";
179 }
180
181 @Override
182 public ResolveLockResult resolveLocks(
183 BackOffer bo, long callerStartTS, List<Lock> locks, boolean forWrite) {
184 if (doResolveLocks(bo, locks)) {
185 return new ResolveLockResult(0L);
186 } else {
187 return new ResolveLockResult(10000L);
188 }
189 }
190
191 private boolean doResolveLocks(BackOffer bo, List<Lock> locks) {
192 if (locks.isEmpty()) {
193 return true;
194 }
195
196 List<Lock> expiredLocks = new ArrayList<>();
197 for (Lock lock : locks) {
198 if (TsoUtils.isExpired(lock.getTxnID(), lock.getTtl())) {
199 expiredLocks.add(lock);
200 }
201 }
202
203 if (expiredLocks.isEmpty()) {
204 return false;
205 }
206
207
208
209 Map<Long, Set<RegionVerID>> cleanTxns = new HashMap<>();
210 for (Lock l : expiredLocks) {
211 Long status = getTxnStatus(bo, l.getTxnID(), l.getPrimary());
212
213 Set<RegionVerID> cleanRegion = cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());
214
215 resolveLock(bo, l, status, cleanRegion);
216 }
217
218 return expiredLocks.size() == locks.size();
219 }
220
221 private void resolveLock(BackOffer bo, Lock lock, long txnStatus, Set<RegionVerID> cleanRegion) {
222
223 while (true) {
224 region = regionManager.getRegionByKey(lock.getKey());
225
226 if (cleanRegion.contains(region.getVerID())) {
227 return;
228 }
229
230 Supplier<ResolveLockRequest> factory;
231
232 if (txnStatus > 0) {
233
234 factory =
235 () ->
236 ResolveLockRequest.newBuilder()
237 .setContext(region.getLeaderContext())
238 .setStartVersion(lock.getTxnID())
239 .setCommitVersion(txnStatus)
240 .build();
241 } else {
242 factory =
243 () ->
244 ResolveLockRequest.newBuilder()
245 .setContext(region.getLeaderContext())
246 .setStartVersion(lock.getTxnID())
247 .build();
248 }
249
250 KVErrorHandler<ResolveLockResponse> handler =
251 new KVErrorHandler<>(
252 regionManager,
253 this,
254 this,
255 resp -> resp.hasRegionError() ? resp.getRegionError() : null,
256 resp -> resp.hasError() ? resp.getError() : null,
257 resolveLockResult -> null,
258 0L,
259 false);
260 ResolveLockResponse resp =
261 callWithRetry(bo, TikvGrpc.getKvResolveLockMethod(), factory, handler);
262
263 if (resp == null) {
264 logger.error("getKvResolveLockMethod failed without a cause");
265 regionManager.onRequestFail(region);
266 bo.doBackOff(
267 BoRegionMiss,
268 new TiClientInternalException("getKvResolveLockMethod failed without a cause"));
269 continue;
270 }
271
272 if (resp.hasRegionError()) {
273 bo.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
274 continue;
275 }
276
277 if (resp.hasError()) {
278 logger.error(
279 String.format("unexpected resolveLock err: %s, lock: %s", resp.getError(), lock));
280 throw new KeyException(resp.getError());
281 }
282
283 cleanRegion.add(region.getVerID());
284 return;
285 }
286 }
287 }