View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.importer;
19  
20  import io.grpc.ManagedChannel;
21  import io.grpc.stub.StreamObserver;
22  import java.util.List;
23  import java.util.Objects;
24  import java.util.concurrent.TimeUnit;
25  import java.util.function.Supplier;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  import org.tikv.common.AbstractGRPCClient;
29  import org.tikv.common.PDClient;
30  import org.tikv.common.TiConfiguration;
31  import org.tikv.common.exception.GrpcException;
32  import org.tikv.common.exception.RegionException;
33  import org.tikv.common.operation.NoopHandler;
34  import org.tikv.common.region.RegionManager;
35  import org.tikv.common.region.TiStore;
36  import org.tikv.common.util.BackOffer;
37  import org.tikv.common.util.ChannelFactory;
38  import org.tikv.common.util.ConcreteBackOffer;
39  import org.tikv.kvproto.ImportSSTGrpc;
40  import org.tikv.kvproto.ImportSstpb;
41  import org.tikv.kvproto.Kvrpcpb;
42  
43  public class ImporterStoreClient<RequestClass, ResponseClass>
44      extends AbstractGRPCClient<
45          ImportSSTGrpc.ImportSSTBlockingStub, ImportSSTGrpc.ImportSSTFutureStub>
46      implements StreamObserver<ResponseClass> {
47  
48    private static final Logger logger = LoggerFactory.getLogger(ImporterStoreClient.class);
49  
50    private final ImportSSTGrpc.ImportSSTStub stub;
51  
52    protected ImporterStoreClient(
53        TiConfiguration conf,
54        ChannelFactory channelFactory,
55        ImportSSTGrpc.ImportSSTBlockingStub blockingStub,
56        ImportSSTGrpc.ImportSSTFutureStub asyncStub,
57        ImportSSTGrpc.ImportSSTStub stub) {
58      super(conf, channelFactory, blockingStub, asyncStub);
59      this.stub = stub;
60    }
61  
62    private StreamObserver<RequestClass> streamObserverRequest;
63    private ResponseClass writeResponse;
64    private Throwable writeError;
65  
66    public synchronized boolean isWriteResponseReceived() {
67      return writeResponse != null;
68    }
69  
70    public synchronized ResponseClass getWriteResponse() {
71      return writeResponse;
72    }
73  
74    private synchronized void setWriteResponse(ResponseClass writeResponse) {
75      this.writeResponse = writeResponse;
76    }
77  
78    public synchronized boolean hasWriteResponseError() {
79      return this.writeError != null;
80    }
81  
82    public synchronized Throwable getWriteError() {
83      return this.writeError;
84    }
85  
86    private synchronized void setWriteError(Throwable t) {
87      this.writeError = t;
88    }
89  
90    @Override
91    public void onNext(ResponseClass response) {
92      setWriteResponse(response);
93    }
94  
95    @Override
96    public void onError(Throwable t) {
97      setWriteError(t);
98      logger.error("Error during write!", t);
99    }
100 
101   @Override
102   public void onCompleted() {
103     // do nothing
104   }
105 
106   /**
107    * Ingest KV pairs to RawKV/Txn using gRPC streaming mode. This API should be called on both
108    * leader and followers.
109    */
110   public void startWrite() {
111     if (conf.isRawKVMode()) {
112       streamObserverRequest =
113           (StreamObserver<RequestClass>)
114               getStub().rawWrite((StreamObserver<ImportSstpb.RawWriteResponse>) this);
115     } else {
116       streamObserverRequest =
117           (StreamObserver<RequestClass>)
118               getStub().write((StreamObserver<ImportSstpb.WriteResponse>) this);
119     }
120   }
121 
122   /**
123    * This API should be called after `startWrite`.
124    *
125    * @param request
126    */
127   public void writeBatch(RequestClass request) {
128     streamObserverRequest.onNext(request);
129   }
130 
131   /** This API should be called after `writeBatch`. */
132   public void finishWrite() {
133     streamObserverRequest.onCompleted();
134   }
135 
136   /**
137    * This API should be called after `finishWrite`. This API should be called on leader only.
138    *
139    * @param ctx
140    * @param writeResponse
141    * @throws RegionException
142    */
143   public void multiIngest(Kvrpcpb.Context ctx, Object writeResponse) throws RegionException {
144     List<ImportSstpb.SSTMeta> metasList;
145     if (writeResponse instanceof ImportSstpb.RawWriteResponse) {
146       metasList = ((ImportSstpb.RawWriteResponse) writeResponse).getMetasList();
147     } else if (writeResponse instanceof ImportSstpb.WriteResponse) {
148       metasList = ((ImportSstpb.WriteResponse) writeResponse).getMetasList();
149     } else {
150       throw new IllegalArgumentException("Wrong response type: " + writeResponse);
151     }
152 
153     ImportSstpb.MultiIngestRequest request =
154         ImportSstpb.MultiIngestRequest.newBuilder().setContext(ctx).addAllSsts(metasList).build();
155 
156     ImportSstpb.IngestResponse response = getBlockingStub().multiIngest(request);
157     if (response.hasError()) {
158       throw new RegionException(response.getError());
159     }
160   }
161 
162   public void switchMode(ImportSstpb.SwitchMode mode) {
163     Supplier<ImportSstpb.SwitchModeRequest> request =
164         () -> ImportSstpb.SwitchModeRequest.newBuilder().setMode(mode).build();
165     NoopHandler<ImportSstpb.SwitchModeResponse> noopHandler = new NoopHandler<>();
166 
167     callWithRetry(
168         ConcreteBackOffer.newCustomBackOff(BackOffer.TIKV_SWITCH_MODE_BACKOFF),
169         ImportSSTGrpc.getSwitchModeMethod(),
170         request,
171         noopHandler);
172   }
173 
174   @Override
175   protected ImportSSTGrpc.ImportSSTBlockingStub getBlockingStub() {
176     return blockingStub.withDeadlineAfter(conf.getIngestTimeout(), TimeUnit.MILLISECONDS);
177   }
178 
179   @Override
180   protected ImportSSTGrpc.ImportSSTFutureStub getAsyncStub() {
181     return asyncStub.withDeadlineAfter(conf.getIngestTimeout(), TimeUnit.MILLISECONDS);
182   }
183 
184   protected ImportSSTGrpc.ImportSSTStub getStub() {
185     return stub.withDeadlineAfter(conf.getIngestTimeout(), TimeUnit.MILLISECONDS);
186   }
187 
188   @Override
189   public void close() throws Exception {}
190 
191   public static class ImporterStoreClientBuilder<RequestClass, ResponseClass> {
192     private final TiConfiguration conf;
193     private final ChannelFactory channelFactory;
194     private final RegionManager regionManager;
195     private final PDClient pdClient;
196 
197     public ImporterStoreClientBuilder(
198         TiConfiguration conf,
199         ChannelFactory channelFactory,
200         RegionManager regionManager,
201         PDClient pdClient) {
202       Objects.requireNonNull(conf, "conf is null");
203       Objects.requireNonNull(channelFactory, "channelFactory is null");
204       Objects.requireNonNull(regionManager, "regionManager is null");
205       this.conf = conf;
206       this.channelFactory = channelFactory;
207       this.regionManager = regionManager;
208       this.pdClient = pdClient;
209     }
210 
211     public synchronized ImporterStoreClient build(TiStore store) throws GrpcException {
212       Objects.requireNonNull(store, "store is null");
213 
214       String addressStr = store.getStore().getAddress();
215       logger.debug(String.format("Create region store client on address %s", addressStr));
216 
217       ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
218       ImportSSTGrpc.ImportSSTBlockingStub blockingStub = ImportSSTGrpc.newBlockingStub(channel);
219       ImportSSTGrpc.ImportSSTFutureStub asyncStub = ImportSSTGrpc.newFutureStub(channel);
220       ImportSSTGrpc.ImportSSTStub stub = ImportSSTGrpc.newStub(channel);
221 
222       return new ImporterStoreClient<RequestClass, ResponseClass>(
223           conf, channelFactory, blockingStub, asyncStub, stub);
224     }
225   }
226 }