1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.importer;
19
20 import io.grpc.ManagedChannel;
21 import io.grpc.stub.StreamObserver;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.concurrent.TimeUnit;
25 import java.util.function.Supplier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.tikv.common.AbstractGRPCClient;
29 import org.tikv.common.PDClient;
30 import org.tikv.common.TiConfiguration;
31 import org.tikv.common.exception.GrpcException;
32 import org.tikv.common.exception.RegionException;
33 import org.tikv.common.operation.NoopHandler;
34 import org.tikv.common.region.RegionManager;
35 import org.tikv.common.region.TiStore;
36 import org.tikv.common.util.BackOffer;
37 import org.tikv.common.util.ChannelFactory;
38 import org.tikv.common.util.ConcreteBackOffer;
39 import org.tikv.kvproto.ImportSSTGrpc;
40 import org.tikv.kvproto.ImportSstpb;
41 import org.tikv.kvproto.Kvrpcpb;
42
43 public class ImporterStoreClient<RequestClass, ResponseClass>
44 extends AbstractGRPCClient<
45 ImportSSTGrpc.ImportSSTBlockingStub, ImportSSTGrpc.ImportSSTFutureStub>
46 implements StreamObserver<ResponseClass> {
47
48 private static final Logger logger = LoggerFactory.getLogger(ImporterStoreClient.class);
49
50 private final ImportSSTGrpc.ImportSSTStub stub;
51
52 protected ImporterStoreClient(
53 TiConfiguration conf,
54 ChannelFactory channelFactory,
55 ImportSSTGrpc.ImportSSTBlockingStub blockingStub,
56 ImportSSTGrpc.ImportSSTFutureStub asyncStub,
57 ImportSSTGrpc.ImportSSTStub stub) {
58 super(conf, channelFactory, blockingStub, asyncStub);
59 this.stub = stub;
60 }
61
62 private StreamObserver<RequestClass> streamObserverRequest;
63 private ResponseClass writeResponse;
64 private Throwable writeError;
65
66 public synchronized boolean isWriteResponseReceived() {
67 return writeResponse != null;
68 }
69
70 public synchronized ResponseClass getWriteResponse() {
71 return writeResponse;
72 }
73
74 private synchronized void setWriteResponse(ResponseClass writeResponse) {
75 this.writeResponse = writeResponse;
76 }
77
78 public synchronized boolean hasWriteResponseError() {
79 return this.writeError != null;
80 }
81
82 public synchronized Throwable getWriteError() {
83 return this.writeError;
84 }
85
86 private synchronized void setWriteError(Throwable t) {
87 this.writeError = t;
88 }
89
90 @Override
91 public void onNext(ResponseClass response) {
92 setWriteResponse(response);
93 }
94
95 @Override
96 public void onError(Throwable t) {
97 setWriteError(t);
98 logger.error("Error during write!", t);
99 }
100
101 @Override
102 public void onCompleted() {
103
104 }
105
106
107
108
109
110 public void startWrite() {
111 if (conf.isRawKVMode()) {
112 streamObserverRequest =
113 (StreamObserver<RequestClass>)
114 getStub().rawWrite((StreamObserver<ImportSstpb.RawWriteResponse>) this);
115 } else {
116 streamObserverRequest =
117 (StreamObserver<RequestClass>)
118 getStub().write((StreamObserver<ImportSstpb.WriteResponse>) this);
119 }
120 }
121
122
123
124
125
126
127 public void writeBatch(RequestClass request) {
128 streamObserverRequest.onNext(request);
129 }
130
131
132 public void finishWrite() {
133 streamObserverRequest.onCompleted();
134 }
135
136
137
138
139
140
141
142
143 public void multiIngest(Kvrpcpb.Context ctx, Object writeResponse) throws RegionException {
144 List<ImportSstpb.SSTMeta> metasList;
145 if (writeResponse instanceof ImportSstpb.RawWriteResponse) {
146 metasList = ((ImportSstpb.RawWriteResponse) writeResponse).getMetasList();
147 } else if (writeResponse instanceof ImportSstpb.WriteResponse) {
148 metasList = ((ImportSstpb.WriteResponse) writeResponse).getMetasList();
149 } else {
150 throw new IllegalArgumentException("Wrong response type: " + writeResponse);
151 }
152
153 ImportSstpb.MultiIngestRequest request =
154 ImportSstpb.MultiIngestRequest.newBuilder().setContext(ctx).addAllSsts(metasList).build();
155
156 ImportSstpb.IngestResponse response = getBlockingStub().multiIngest(request);
157 if (response.hasError()) {
158 throw new RegionException(response.getError());
159 }
160 }
161
162 public void switchMode(ImportSstpb.SwitchMode mode) {
163 Supplier<ImportSstpb.SwitchModeRequest> request =
164 () -> ImportSstpb.SwitchModeRequest.newBuilder().setMode(mode).build();
165 NoopHandler<ImportSstpb.SwitchModeResponse> noopHandler = new NoopHandler<>();
166
167 callWithRetry(
168 ConcreteBackOffer.newCustomBackOff(BackOffer.TIKV_SWITCH_MODE_BACKOFF),
169 ImportSSTGrpc.getSwitchModeMethod(),
170 request,
171 noopHandler);
172 }
173
174 @Override
175 protected ImportSSTGrpc.ImportSSTBlockingStub getBlockingStub() {
176 return blockingStub.withDeadlineAfter(conf.getIngestTimeout(), TimeUnit.MILLISECONDS);
177 }
178
179 @Override
180 protected ImportSSTGrpc.ImportSSTFutureStub getAsyncStub() {
181 return asyncStub.withDeadlineAfter(conf.getIngestTimeout(), TimeUnit.MILLISECONDS);
182 }
183
184 protected ImportSSTGrpc.ImportSSTStub getStub() {
185 return stub.withDeadlineAfter(conf.getIngestTimeout(), TimeUnit.MILLISECONDS);
186 }
187
188 @Override
189 public void close() throws Exception {}
190
191 public static class ImporterStoreClientBuilder<RequestClass, ResponseClass> {
192 private final TiConfiguration conf;
193 private final ChannelFactory channelFactory;
194 private final RegionManager regionManager;
195 private final PDClient pdClient;
196
197 public ImporterStoreClientBuilder(
198 TiConfiguration conf,
199 ChannelFactory channelFactory,
200 RegionManager regionManager,
201 PDClient pdClient) {
202 Objects.requireNonNull(conf, "conf is null");
203 Objects.requireNonNull(channelFactory, "channelFactory is null");
204 Objects.requireNonNull(regionManager, "regionManager is null");
205 this.conf = conf;
206 this.channelFactory = channelFactory;
207 this.regionManager = regionManager;
208 this.pdClient = pdClient;
209 }
210
211 public synchronized ImporterStoreClient build(TiStore store) throws GrpcException {
212 Objects.requireNonNull(store, "store is null");
213
214 String addressStr = store.getStore().getAddress();
215 logger.debug(String.format("Create region store client on address %s", addressStr));
216
217 ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
218 ImportSSTGrpc.ImportSSTBlockingStub blockingStub = ImportSSTGrpc.newBlockingStub(channel);
219 ImportSSTGrpc.ImportSSTFutureStub asyncStub = ImportSSTGrpc.newFutureStub(channel);
220 ImportSSTGrpc.ImportSSTStub stub = ImportSSTGrpc.newStub(channel);
221
222 return new ImporterStoreClient<RequestClass, ResponseClass>(
223 conf, channelFactory, blockingStub, asyncStub, stub);
224 }
225 }
226 }