View Javadoc
1   /*
2    * Copyright 2019 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.txn;
19  
20  import com.google.common.collect.Lists;
21  import com.google.protobuf.ByteString;
22  import io.grpc.StatusRuntimeException;
23  import java.util.Arrays;
24  import java.util.List;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  import org.tikv.common.ReadOnlyPDClient;
28  import org.tikv.common.TiConfiguration;
29  import org.tikv.common.exception.GrpcException;
30  import org.tikv.common.exception.KeyException;
31  import org.tikv.common.exception.RegionException;
32  import org.tikv.common.exception.TiClientInternalException;
33  import org.tikv.common.exception.TiKVException;
34  import org.tikv.common.meta.TiTimestamp;
35  import org.tikv.common.region.RegionManager;
36  import org.tikv.common.region.RegionStoreClient;
37  import org.tikv.common.region.TiRegion;
38  import org.tikv.common.region.TiStore;
39  import org.tikv.common.util.BackOffFunction;
40  import org.tikv.common.util.BackOffer;
41  import org.tikv.common.util.ConcreteBackOffer;
42  import org.tikv.kvproto.Kvrpcpb;
43  import org.tikv.txn.type.ClientRPCResult;
44  
45  /** KV client of transaction APIs for GET/PUT/DELETE/SCAN */
46  public class TxnKVClient implements AutoCloseable {
47    private static final Logger LOG = LoggerFactory.getLogger(TxnKVClient.class);
48  
49    private final RegionStoreClient.RegionStoreClientBuilder clientBuilder;
50    private final TiConfiguration conf;
51    private final RegionManager regionManager;
52    private final ReadOnlyPDClient pdClient;
53  
54    public TxnKVClient(
55        TiConfiguration conf,
56        RegionStoreClient.RegionStoreClientBuilder clientBuilder,
57        ReadOnlyPDClient pdClient) {
58      this.conf = conf;
59      this.clientBuilder = clientBuilder;
60      this.regionManager = clientBuilder.getRegionManager();
61      this.pdClient = pdClient;
62    }
63  
64    public TiConfiguration getConf() {
65      return conf;
66    }
67  
68    public RegionManager getRegionManager() {
69      return regionManager;
70    }
71  
72    public TiTimestamp getTimestamp() {
73      BackOffer bo = ConcreteBackOffer.newTsoBackOff(pdClient.getClusterId());
74      TiTimestamp timestamp = new TiTimestamp(0, 0);
75      try {
76        while (true) {
77          try {
78            timestamp = pdClient.getTimestamp(bo);
79            break;
80          } catch (final TiKVException e) {
81            // retry is exhausted
82            bo.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e);
83          }
84        }
85      } catch (GrpcException e1) {
86        LOG.error("Get tso from pd failed,", e1);
87      }
88      return timestamp;
89    }
90  
91    /** when encountered region error,ErrBodyMissing, and other errors */
92    public ClientRPCResult prewrite(
93        BackOffer backOffer,
94        List<Kvrpcpb.Mutation> mutations,
95        ByteString primary,
96        long lockTTL,
97        long startTs,
98        TiRegion tiRegion,
99        TiStore store) {
100     ClientRPCResult result = new ClientRPCResult(true, false, null);
101     // send request
102     RegionStoreClient client = clientBuilder.build(tiRegion, store);
103     try {
104       client.prewrite(backOffer, primary, mutations, startTs, lockTTL);
105     } catch (Exception e) {
106       result.setSuccess(false);
107       // mark retryable, region error, should retry prewrite again
108       result.setRetry(retryableException(e));
109       result.setException(e);
110     }
111     return result;
112   }
113 
114   /** TXN Heart Beat: update primary key ttl */
115   public ClientRPCResult txnHeartBeat(
116       BackOffer backOffer,
117       ByteString primaryLock,
118       long startTs,
119       long ttl,
120       TiRegion tiRegion,
121       TiStore store) {
122     ClientRPCResult result = new ClientRPCResult(true, false, null);
123     // send request
124     RegionStoreClient client = clientBuilder.build(tiRegion, store);
125     try {
126       client.txnHeartBeat(backOffer, primaryLock, startTs, ttl);
127     } catch (Exception e) {
128       result.setSuccess(false);
129       // mark retryable, region error, should retry heart beat again
130       result.setRetry(retryableException(e));
131       result.setException(e);
132     }
133     return result;
134   }
135 
136   /**
137    * Commit request of 2pc, add backoff logic when encountered region error, ErrBodyMissing, and
138    * other errors
139    *
140    * @param backOffer
141    * @param keys
142    * @param startTs
143    * @param commitTs
144    * @param tiRegion
145    * @return
146    */
147   public ClientRPCResult commit(
148       BackOffer backOffer,
149       ByteString[] keys,
150       long startTs,
151       long commitTs,
152       TiRegion tiRegion,
153       TiStore store) {
154     ClientRPCResult result = new ClientRPCResult(true, false, null);
155     // send request
156     RegionStoreClient client = clientBuilder.build(tiRegion, store);
157     List<ByteString> byteList = Lists.newArrayList();
158     byteList.addAll(Arrays.asList(keys));
159     try {
160       client.commit(backOffer, byteList, startTs, commitTs);
161     } catch (Exception e) {
162       result.setSuccess(false);
163       // mark retryable, region error, should retry prewrite again
164       result.setRetry(retryableException(e));
165       result.setException(e);
166     }
167     return result;
168   }
169 
170   // According to TiDB's implementation, when it comes to rpc error
171   // commit status remains undecided.
172   // If we fail to receive response for the request that commits primary key, it will be
173   // undetermined whether this
174   // transaction has been successfully committed.
175   // Under this circumstance,  we can not declare the commit is complete (may lead to data lost),
176   // nor can we throw
177   // an error (may lead to the duplicated key error when upper level restarts the transaction).
178   // Currently the best
179   // solution is to populate this error and let upper layer drop the connection to the corresponding
180   // mysql client.
181   // TODO: check this logic to see are we satisfied?
182   private boolean retryableException(Exception e) {
183     return e instanceof TiClientInternalException
184         || e instanceof KeyException
185         || e instanceof RegionException
186         || e instanceof StatusRuntimeException;
187   }
188 
189   @Override
190   public void close() throws Exception {}
191 }