1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.txn;
19
20 import com.google.common.collect.Lists;
21 import com.google.protobuf.ByteString;
22 import io.grpc.StatusRuntimeException;
23 import java.util.Arrays;
24 import java.util.List;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import org.tikv.common.ReadOnlyPDClient;
28 import org.tikv.common.TiConfiguration;
29 import org.tikv.common.exception.GrpcException;
30 import org.tikv.common.exception.KeyException;
31 import org.tikv.common.exception.RegionException;
32 import org.tikv.common.exception.TiClientInternalException;
33 import org.tikv.common.exception.TiKVException;
34 import org.tikv.common.meta.TiTimestamp;
35 import org.tikv.common.region.RegionManager;
36 import org.tikv.common.region.RegionStoreClient;
37 import org.tikv.common.region.TiRegion;
38 import org.tikv.common.region.TiStore;
39 import org.tikv.common.util.BackOffFunction;
40 import org.tikv.common.util.BackOffer;
41 import org.tikv.common.util.ConcreteBackOffer;
42 import org.tikv.kvproto.Kvrpcpb;
43 import org.tikv.txn.type.ClientRPCResult;
44
45
46 public class TxnKVClient implements AutoCloseable {
47 private static final Logger LOG = LoggerFactory.getLogger(TxnKVClient.class);
48
49 private final RegionStoreClient.RegionStoreClientBuilder clientBuilder;
50 private final TiConfiguration conf;
51 private final RegionManager regionManager;
52 private final ReadOnlyPDClient pdClient;
53
54 public TxnKVClient(
55 TiConfiguration conf,
56 RegionStoreClient.RegionStoreClientBuilder clientBuilder,
57 ReadOnlyPDClient pdClient) {
58 this.conf = conf;
59 this.clientBuilder = clientBuilder;
60 this.regionManager = clientBuilder.getRegionManager();
61 this.pdClient = pdClient;
62 }
63
64 public TiConfiguration getConf() {
65 return conf;
66 }
67
68 public RegionManager getRegionManager() {
69 return regionManager;
70 }
71
72 public TiTimestamp getTimestamp() {
73 BackOffer bo = ConcreteBackOffer.newTsoBackOff(pdClient.getClusterId());
74 TiTimestamp timestamp = new TiTimestamp(0, 0);
75 try {
76 while (true) {
77 try {
78 timestamp = pdClient.getTimestamp(bo);
79 break;
80 } catch (final TiKVException e) {
81
82 bo.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e);
83 }
84 }
85 } catch (GrpcException e1) {
86 LOG.error("Get tso from pd failed,", e1);
87 }
88 return timestamp;
89 }
90
91
92 public ClientRPCResult prewrite(
93 BackOffer backOffer,
94 List<Kvrpcpb.Mutation> mutations,
95 ByteString primary,
96 long lockTTL,
97 long startTs,
98 TiRegion tiRegion,
99 TiStore store) {
100 ClientRPCResult result = new ClientRPCResult(true, false, null);
101
102 RegionStoreClient client = clientBuilder.build(tiRegion, store);
103 try {
104 client.prewrite(backOffer, primary, mutations, startTs, lockTTL);
105 } catch (Exception e) {
106 result.setSuccess(false);
107
108 result.setRetry(retryableException(e));
109 result.setException(e);
110 }
111 return result;
112 }
113
114
115 public ClientRPCResult txnHeartBeat(
116 BackOffer backOffer,
117 ByteString primaryLock,
118 long startTs,
119 long ttl,
120 TiRegion tiRegion,
121 TiStore store) {
122 ClientRPCResult result = new ClientRPCResult(true, false, null);
123
124 RegionStoreClient client = clientBuilder.build(tiRegion, store);
125 try {
126 client.txnHeartBeat(backOffer, primaryLock, startTs, ttl);
127 } catch (Exception e) {
128 result.setSuccess(false);
129
130 result.setRetry(retryableException(e));
131 result.setException(e);
132 }
133 return result;
134 }
135
136
137
138
139
140
141
142
143
144
145
146
147 public ClientRPCResult commit(
148 BackOffer backOffer,
149 ByteString[] keys,
150 long startTs,
151 long commitTs,
152 TiRegion tiRegion,
153 TiStore store) {
154 ClientRPCResult result = new ClientRPCResult(true, false, null);
155
156 RegionStoreClient client = clientBuilder.build(tiRegion, store);
157 List<ByteString> byteList = Lists.newArrayList();
158 byteList.addAll(Arrays.asList(keys));
159 try {
160 client.commit(backOffer, byteList, startTs, commitTs);
161 } catch (Exception e) {
162 result.setSuccess(false);
163
164 result.setRetry(retryableException(e));
165 result.setException(e);
166 }
167 return result;
168 }
169
170
171
172
173
174
175
176
177
178
179
180
181
182 private boolean retryableException(Exception e) {
183 return e instanceof TiClientInternalException
184 || e instanceof KeyException
185 || e instanceof RegionException
186 || e instanceof StatusRuntimeException;
187 }
188
189 @Override
190 public void close() throws Exception {}
191 }