1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.meta;
19
20 import static com.google.common.base.Preconditions.checkArgument;
21 import static com.google.common.base.Preconditions.checkNotNull;
22 import static java.util.Objects.requireNonNull;
23 import static org.tikv.common.predicates.PredicateUtils.mergeCNFExpressions;
24
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.Joiner;
27 import com.google.common.collect.ImmutableList;
28 import com.google.common.collect.ImmutableMap;
29 import com.pingcap.tidb.tipb.*;
30 import java.io.*;
31 import java.util.*;
32 import java.util.stream.Collectors;
33 import javax.annotation.Nonnull;
34 import org.tikv.common.codec.KeyUtils;
35 import org.tikv.common.exception.DAGRequestException;
36 import org.tikv.common.exception.TiClientInternalException;
37 import org.tikv.common.expression.AggregateFunction;
38 import org.tikv.common.expression.ByItem;
39 import org.tikv.common.expression.ColumnRef;
40 import org.tikv.common.expression.Expression;
41 import org.tikv.common.expression.visitor.ProtoConverter;
42 import org.tikv.common.key.RowKey;
43 import org.tikv.common.predicates.PredicateUtils;
44 import org.tikv.common.region.TiStoreType;
45 import org.tikv.common.types.DataType;
46 import org.tikv.common.types.IntegerType;
47 import org.tikv.common.util.KeyRangeUtils;
48 import org.tikv.kvproto.Coprocessor;
49
50
51
52
53
54
55 public class TiDAGRequest implements Serializable {
56
57 private static final Map<ExecType, Integer> EXEC_TYPE_PRIORITY_MAP =
58 ImmutableMap.<ExecType, Integer>builder()
59 .put(ExecType.TypeTableScan, 0)
60 .put(ExecType.TypeIndexScan, 0)
61 .put(ExecType.TypeSelection, 1)
62 .put(ExecType.TypeAggregation, 2)
63 .put(ExecType.TypeTopN, 3)
64 .put(ExecType.TypeLimit, 4)
65 .build();
66
67 private static final ColumnInfo handleColumn =
68 ColumnInfo.newBuilder()
69 .setColumnId(-1)
70 .setPkHandle(true)
71
72
73
74 .setTp(8)
75 .setColumnLen(20)
76 .setFlag(2)
77 .build();
78 private final List<ColumnRef> fields = new ArrayList<>();
79 private final List<DataType> indexDataTypes = new ArrayList<>();
80 private final List<Expression> filters = new ArrayList<>();
81 private final List<ByItem> groupByItems = new ArrayList<>();
82 private final List<ByItem> orderByItems = new ArrayList<>();
83
84
85 private final List<AggregateFunction> aggregates = new ArrayList<>();
86 private final Map<Long, List<Coprocessor.KeyRange>> idToRanges = new HashMap<>();
87
88
89
90 private final List<Expression> downgradeFilters = new ArrayList<>();
91 private final List<Expression> pushDownFilters = new ArrayList<>();
92 private final List<AggregateFunction> pushDownAggregates = new ArrayList<>();
93 private final List<ByItem> pushDownGroupBys = new ArrayList<>();
94 private final List<ByItem> pushDownOrderBys = new ArrayList<>();
95 private final PushDownType pushDownType;
96 private TiTableInfo tableInfo;
97 private List<TiPartitionDef> prunedParts;
98 private TiStoreType storeType = TiStoreType.TiKV;
99 private TiIndexInfo indexInfo;
100 private List<Long> prunedPhysicalIds = new ArrayList<>();
101 private final Map<Long, String> prunedPartNames = new HashMap<>();
102 private long physicalId;
103 private int pushDownLimits;
104 private int limit;
105 private int timeZoneOffset;
106 private long flags;
107 private TiTimestamp startTs;
108 private Expression having;
109 private boolean distinct;
110 private boolean isDoubleRead;
111 private EncodeType encodeType;
112 private double estimatedCount = -1;
113
/**
 * Creates a request that uses {@code EncodeType.TypeDefault} and a zero time-zone offset.
 *
 * @param pushDownType push-down mode of the request
 */
public TiDAGRequest(PushDownType pushDownType) {
  this(pushDownType, EncodeType.TypeDefault);
}

/** Shared initializer: stores the push-down mode and the response encoding. */
private TiDAGRequest(PushDownType pushDownType, EncodeType encodeType) {
  this.pushDownType = pushDownType;
  this.encodeType = encodeType;
}

/**
 * Creates a request with an explicit encoding and time-zone offset.
 *
 * @param pushDownType push-down mode of the request
 * @param encodeType encoding written into the built DAG request
 * @param timeZoneOffset time-zone offset written into the built DAG request
 */
public TiDAGRequest(PushDownType pushDownType, EncodeType encodeType, int timeZoneOffset) {
  this(pushDownType, encodeType);
  this.timeZoneOffset = timeZoneOffset;
}

/**
 * Creates a request that uses {@code EncodeType.TypeDefault} and the given time-zone offset.
 *
 * @param pushDownType push-down mode of the request
 * @param timeZoneOffset time-zone offset written into the built DAG request
 */
public TiDAGRequest(PushDownType pushDownType, int timeZoneOffset) {
  this(pushDownType, EncodeType.TypeDefault, timeZoneOffset);
}
133
134 public List<TiPartitionDef> getPrunedParts() {
135 return prunedParts;
136 }
137
138 private String getPrunedPartName(long id) {
139 return prunedPartNames.getOrDefault(id, "unknown");
140 }
141
142 public void setPrunedParts(List<TiPartitionDef> prunedParts) {
143 this.prunedParts = prunedParts;
144 if (prunedParts != null) {
145 List<Long> ids = new ArrayList<>();
146 prunedPartNames.clear();
147 for (TiPartitionDef pDef : prunedParts) {
148 ids.add(pDef.getId());
149 prunedPartNames.put(pDef.getId(), pDef.getName());
150 }
151 this.prunedPhysicalIds = ids;
152 }
153 }
154
155 public List<Long> getPrunedPhysicalIds() {
156 if (!this.tableInfo.isPartitionEnabled()) {
157 return prunedPhysicalIds = ImmutableList.of(this.tableInfo.getId());
158 } else {
159 return prunedPhysicalIds;
160 }
161 }
162
163 public TiStoreType getStoreType() {
164 return storeType;
165 }
166
167 public void setStoreType(TiStoreType storeType) {
168 this.storeType = storeType;
169 }
170
171 public EncodeType getEncodeType() {
172 return encodeType;
173 }
174
175 public void setEncodeType(EncodeType encodeType) {
176 this.encodeType = encodeType;
177 }
178
179 public DAGRequest buildIndexScan() {
180 List<Integer> outputOffsets = new ArrayList<>();
181 DAGRequest.Builder builder = buildScan(true, outputOffsets);
182 return buildRequest(builder, outputOffsets);
183 }
184
185 public DAGRequest buildTableScan() {
186 List<Integer> outputOffsets = new ArrayList<>();
187 boolean isCoveringIndex = isCoveringIndexScan();
188 DAGRequest.Builder builder = buildScan(isCoveringIndex, outputOffsets);
189 return buildRequest(builder, outputOffsets);
190 }
191
192 private DAGRequest buildRequest(
193 DAGRequest.Builder dagRequestBuilder, List<Integer> outputOffsets) {
194 checkNotNull(startTs, "startTs is null");
195 checkArgument(startTs.getVersion() != 0, "timestamp is 0");
196 DAGRequest request =
197 dagRequestBuilder
198 .setTimeZoneOffset(timeZoneOffset)
199 .setFlags(flags)
200 .addAllOutputOffsets(outputOffsets)
201 .setEncodeType(this.encodeType)
202
203 .setStartTsFallback(startTs.getVersion())
204 .build();
205
206 validateRequest(request);
207 return request;
208 }
209
210
211
212
213
214
215
216
217
218
219
220
221 private DAGRequest.Builder buildScan(boolean buildIndexScan, List<Integer> outputOffsets) {
222 long id = getPhysicalId();
223 checkNotNull(startTs, "startTs is null");
224 checkArgument(startTs.getVersion() != 0, "timestamp is 0");
225 clearPushDownInfo();
226 DAGRequest.Builder dagRequestBuilder = DAGRequest.newBuilder();
227 Executor.Builder executorBuilder = Executor.newBuilder();
228 IndexScan.Builder indexScanBuilder = IndexScan.newBuilder();
229 TableScan.Builder tblScanBuilder = TableScan.newBuilder();
230
231 Map<String, Integer> colOffsetInFieldMap = new HashMap<>();
232
233 Map<String, Integer> colPosInIndexMap = new HashMap<>();
234
235 if (buildIndexScan) {
236
237 if (indexInfo == null) {
238 throw new TiClientInternalException("Index is empty for index scan");
239 }
240 List<TiColumnInfo> columnInfoList = tableInfo.getColumns();
241 boolean hasPk = false;
242
243 List<Integer> indexColOffsets =
244 indexInfo
245 .getIndexColumns()
246 .stream()
247 .map(TiIndexColumn::getOffset)
248 .collect(Collectors.toList());
249
250 int idxPos = 0;
251
252 for (Integer idx : indexColOffsets) {
253 TiColumnInfo tiColumnInfo = columnInfoList.get(idx);
254 ColumnInfo columnInfo = tiColumnInfo.toProto(tableInfo);
255 colPosInIndexMap.put(tiColumnInfo.getName(), idxPos++);
256
257 ColumnInfo.Builder colBuilder = ColumnInfo.newBuilder(columnInfo);
258 if (columnInfo.getColumnId() == -1) {
259 hasPk = true;
260 colBuilder.setPkHandle(true);
261 }
262 indexScanBuilder.addColumns(colBuilder);
263 }
264
265 int colCount = indexScanBuilder.getColumnsCount();
266 if (isDoubleRead()) {
267
268
269
270
271
272 for (ColumnRef col : getFields()) {
273 Integer pos = colPosInIndexMap.get(col.getName());
274 if (pos != null) {
275 TiColumnInfo columnInfo = columnInfoList.get(indexColOffsets.get(pos));
276 if (col.matchName(columnInfo.getName())) {
277 colOffsetInFieldMap.put(col.getName(), pos);
278 }
279
280 }
281 }
282
283 if (!hasPk) {
284
285 indexScanBuilder.addColumns(handleColumn);
286 ++colCount;
287 addRequiredIndexDataType();
288 }
289
290 if (colCount == 0) {
291 throw new DAGRequestException("Incorrect index scan with zero column count");
292 }
293
294 outputOffsets.add(colCount - 1);
295 } else {
296 boolean pkIsNeeded = false;
297
298
299 for (ColumnRef col : getFields()) {
300 Integer pos = colPosInIndexMap.get(col.getName());
301 if (pos != null) {
302 TiColumnInfo columnInfo = columnInfoList.get(indexColOffsets.get(pos));
303 if (col.matchName(columnInfo.getName())) {
304 outputOffsets.add(pos);
305 colOffsetInFieldMap.put(col.getName(), pos);
306 }
307 }
308
309
310 else if (tableInfo.getColumn(col.getName()).isPrimaryKey()) {
311 pkIsNeeded = true;
312
313 outputOffsets.add(colCount);
314
315 colOffsetInFieldMap.put(col.getName(), indexColOffsets.size());
316 } else {
317 throw new DAGRequestException(
318 "columns other than primary key and index key exist in fields while index single read: "
319 + col.getName());
320 }
321 }
322
323 if (pkIsNeeded) {
324 indexScanBuilder.addColumns(handleColumn);
325 }
326 }
327 executorBuilder.setTp(ExecType.TypeIndexScan);
328
329 indexScanBuilder.setTableId(id).setIndexId(indexInfo.getId());
330 dagRequestBuilder.addExecutors(executorBuilder.setIdxScan(indexScanBuilder).build());
331 } else {
332
333 executorBuilder.setTp(ExecType.TypeTableScan);
334 tblScanBuilder.setTableId(id);
335
336 int lastOffset = 0;
337 for (ColumnRef col : getFields()) {
338
339 if (!colOffsetInFieldMap.containsKey(col.getName())) {
340 tblScanBuilder.addColumns(tableInfo.getColumn(col.getName()).toProto(tableInfo));
341 colOffsetInFieldMap.put(col.getName(), lastOffset);
342 lastOffset++;
343 }
344
345 outputOffsets.add(colOffsetInFieldMap.get(col.getName()));
346 }
347
348 dagRequestBuilder.addExecutors(executorBuilder.setTblScan(tblScanBuilder));
349 }
350
351 boolean isIndexDoubleScan = buildIndexScan && isDoubleRead();
352
353
354
355
356 executorBuilder.clear();
357
358
359
360
361
362
363 Expression whereExpr = mergeCNFExpressions(getFilters());
364 if (whereExpr != null) {
365 if (!isIndexDoubleScan || isExpressionCoveredByIndex(whereExpr)) {
366 executorBuilder.setTp(ExecType.TypeSelection);
367 dagRequestBuilder.addExecutors(
368 executorBuilder.setSelection(
369 Selection.newBuilder()
370 .addConditions(ProtoConverter.toProto(whereExpr, colOffsetInFieldMap))));
371 executorBuilder.clear();
372 addPushDownFilters();
373 } else {
374 return dagRequestBuilder;
375 }
376 }
377
378 if (!getGroupByItems().isEmpty() || !getAggregates().isEmpty()) {
379
380 if (!isIndexDoubleScan || (isGroupByCoveredByIndex() && isAggregateCoveredByIndex())) {
381 pushDownAggAndGroupBy(
382 dagRequestBuilder, executorBuilder, outputOffsets, colOffsetInFieldMap);
383 } else {
384 return dagRequestBuilder;
385 }
386 }
387
388 if (!getOrderByItems().isEmpty()) {
389 if (!isIndexDoubleScan || isOrderByCoveredByIndex()) {
390
391 pushDownOrderBy(dagRequestBuilder, executorBuilder, colOffsetInFieldMap);
392 }
393 } else if (getLimit() != 0) {
394 if (!isIndexDoubleScan) {
395 pushDownLimit(dagRequestBuilder, executorBuilder);
396 }
397 }
398
399 return dagRequestBuilder;
400 }
401
402 private void pushDownLimit(
403 DAGRequest.Builder dagRequestBuilder, Executor.Builder executorBuilder) {
404 Limit.Builder limitBuilder = Limit.newBuilder();
405 limitBuilder.setLimit(getLimit());
406 executorBuilder.setTp(ExecType.TypeLimit);
407 dagRequestBuilder.addExecutors(executorBuilder.setLimit(limitBuilder));
408 executorBuilder.clear();
409 addPushDownLimits();
410 }
411
412 private void pushDownOrderBy(
413 DAGRequest.Builder dagRequestBuilder,
414 Executor.Builder executorBuilder,
415 Map<String, Integer> colOffsetInFieldMap) {
416 TopN.Builder topNBuilder = TopN.newBuilder();
417 getOrderByItems()
418 .forEach(
419 tiByItem ->
420 topNBuilder.addOrderBy(
421 com.pingcap.tidb.tipb.ByItem.newBuilder()
422 .setExpr(ProtoConverter.toProto(tiByItem.getExpr(), colOffsetInFieldMap))
423 .setDesc(tiByItem.isDesc())));
424 executorBuilder.setTp(ExecType.TypeTopN);
425 topNBuilder.setLimit(getLimit());
426 dagRequestBuilder.addExecutors(executorBuilder.setTopN(topNBuilder));
427 executorBuilder.clear();
428 addPushDownOrderBys();
429 }
430
431 private void pushDownAggAndGroupBy(
432 DAGRequest.Builder dagRequestBuilder,
433 Executor.Builder executorBuilder,
434 List<Integer> outputOffsets,
435 Map<String, Integer> colOffsetInFieldMap) {
436 Aggregation.Builder aggregationBuilder = Aggregation.newBuilder();
437 getAggregates()
438 .forEach(
439 tiExpr ->
440 aggregationBuilder.addAggFunc(ProtoConverter.toProto(tiExpr, colOffsetInFieldMap)));
441 getGroupByItems()
442 .forEach(
443 tiByItem ->
444 aggregationBuilder.addGroupBy(
445 ProtoConverter.toProto(tiByItem.getExpr(), colOffsetInFieldMap)));
446 executorBuilder.setTp(ExecType.TypeAggregation);
447 dagRequestBuilder.addExecutors(executorBuilder.setAggregation(aggregationBuilder));
448 executorBuilder.clear();
449 addPushDownGroupBys();
450 addPushDownAggregates();
451
452
453 outputOffsets.clear();
454 for (int i = 0; i < getAggregates().size(); i++) {
455 outputOffsets.add(i);
456 }
457
458
459 int currentMaxOutputOffset = outputOffsets.get(outputOffsets.size() - 1) + 1;
460 for (int i = 0; i < getGroupByItems().size(); i++) {
461 outputOffsets.add(currentMaxOutputOffset + i);
462 }
463 }
464
465 private boolean isExpressionCoveredByIndex(Expression expr) {
466 Set<String> indexColumnRefSet =
467 indexInfo
468 .getIndexColumns()
469 .stream()
470 .filter(x -> !x.isPrefixIndex())
471 .map(TiIndexColumn::getName)
472 .collect(Collectors.toSet());
473 return !isDoubleRead()
474 && PredicateUtils.extractColumnRefFromExpression(expr)
475 .stream()
476 .map(ColumnRef::getName)
477 .allMatch(indexColumnRefSet::contains);
478 }
479
480 private boolean isGroupByCoveredByIndex() {
481 return isByItemCoveredByIndex(getGroupByItems());
482 }
483
484 private boolean isOrderByCoveredByIndex() {
485 return isByItemCoveredByIndex(getOrderByItems());
486 }
487
488 private boolean isByItemCoveredByIndex(List<ByItem> byItems) {
489 if (byItems.isEmpty()) {
490 return false;
491 }
492 return byItems.stream().allMatch(x -> isExpressionCoveredByIndex(x.getExpr()));
493 }
494
495 private boolean isAggregateCoveredByIndex() {
496 if (aggregates.isEmpty()) {
497 return false;
498 }
499 return aggregates.stream().allMatch(this::isExpressionCoveredByIndex);
500 }
501
502
503
504
505
506
507
508
509
510 private void validateRequest(DAGRequest dagRequest) {
511 requireNonNull(dagRequest);
512
513 requireNonNull(dagRequest.getEncodeType());
514
515
516 if (dagRequest.getExecutorsCount() < 1) {
517 throw new DAGRequestException("Invalid executors count:" + dagRequest.getExecutorsCount());
518 }
519
520 ExecType formerType = dagRequest.getExecutors(0).getTp();
521 if (formerType != ExecType.TypeTableScan && formerType != ExecType.TypeIndexScan) {
522 throw new DAGRequestException(
523 "Invalid first executor type:"
524 + formerType
525 + ", must one of TypeTableScan or TypeIndexScan");
526 }
527
528 for (int i = 1; i < dagRequest.getExecutorsCount(); i++) {
529 ExecType currentType = dagRequest.getExecutors(i).getTp();
530 if (EXEC_TYPE_PRIORITY_MAP.get(currentType) < EXEC_TYPE_PRIORITY_MAP.get(formerType)) {
531 throw new DAGRequestException("Invalid executor priority.");
532 }
533 formerType = currentType;
534 }
535 }
536
537 public TiTableInfo getTableInfo() {
538 return this.tableInfo;
539 }
540
541 public TiDAGRequest setTableInfo(TiTableInfo tableInfo) {
542 this.tableInfo = requireNonNull(tableInfo, "tableInfo is null");
543 setPhysicalId(tableInfo.getId());
544 return this;
545 }
546
547 public long getPhysicalId() {
548 return this.physicalId;
549 }
550
551 public TiDAGRequest setPhysicalId(long id) {
552 this.physicalId = id;
553 return this;
554 }
555
556 public TiIndexInfo getIndexInfo() {
557 return indexInfo;
558 }
559
560 public TiDAGRequest setIndexInfo(TiIndexInfo indexInfo) {
561 this.indexInfo = requireNonNull(indexInfo, "indexInfo is null");
562 return this;
563 }
564
565 public void clearIndexInfo() {
566 indexInfo = null;
567 clearPushDownInfo();
568 }
569
570 public int getLimit() {
571 return limit;
572 }
573
574
575
576
577
578
579
580 public TiDAGRequest setLimit(int limit) {
581 this.limit = limit;
582 return this;
583 }
584
585 int getTimeZoneOffset() {
586 return timeZoneOffset;
587 }
588
589
590
591
592
593
594
595 TiDAGRequest setTruncateMode(TruncateMode mode) {
596 flags = requireNonNull(mode, "mode is null").mask(flags);
597 return this;
598 }
599
600 @VisibleForTesting
601 long getFlags() {
602 return flags;
603 }
604
605 @VisibleForTesting
606 public TiTimestamp getStartTs() {
607 return startTs;
608 }
609
610
611
612
613
614
615
616 public TiDAGRequest setStartTs(@Nonnull TiTimestamp startTs) {
617 this.startTs = startTs;
618 return this;
619 }
620
621
622
623
624
625
626
627 public TiDAGRequest setHaving(Expression having) {
628 this.having = requireNonNull(having, "having is null");
629 return this;
630 }
631
632 public boolean isDistinct() {
633 return distinct;
634 }
635
636 public TiDAGRequest setDistinct(boolean distinct) {
637 this.distinct = distinct;
638 return this;
639 }
640
641 public TiDAGRequest addAggregate(AggregateFunction expr) {
642 requireNonNull(expr, "aggregation expr is null");
643 aggregates.add(expr);
644 return this;
645 }
646
647 List<AggregateFunction> getAggregates() {
648 return aggregates;
649 }
650
651
652
653
654
655
656
657 public TiDAGRequest addOrderByItem(ByItem byItem) {
658 orderByItems.add(requireNonNull(byItem, "byItem is null"));
659 return this;
660 }
661
662 List<ByItem> getOrderByItems() {
663 return orderByItems;
664 }
665
666
667
668
669
670
671
672 public TiDAGRequest addGroupByItem(ByItem byItem) {
673 groupByItems.add(requireNonNull(byItem, "byItem is null"));
674 return this;
675 }
676
677 public List<ByItem> getGroupByItems() {
678 return groupByItems;
679 }
680
681
682
683
684
685
686
687
688
689
690
691
692 public TiDAGRequest addRequiredColumn(ColumnRef column) {
693 if (!column.isResolved()) {
694 throw new UnsupportedOperationException(
695 String.format("cannot add unresolved column %s to dag request", column.getName()));
696 }
697 fields.add(requireNonNull(column, "columnRef is null"));
698 return this;
699 }
700
701 public List<ColumnRef> getFields() {
702 return fields;
703 }
704
705
706 private void addRequiredIndexDataType() {
707 indexDataTypes.add(requireNonNull(IntegerType.BIGINT, "dataType is null"));
708 }
709
710 public List<DataType> getIndexDataTypes() {
711 return indexDataTypes;
712 }
713
714
715
716
717
718
719 public TiDAGRequest addRanges(Map<Long, List<Coprocessor.KeyRange>> ranges) {
720 idToRanges.putAll(requireNonNull(ranges, "KeyRange is null"));
721 return this;
722 }
723
724 private void resetRanges() {
725 idToRanges.clear();
726 }
727
728 public void resetFilters(List<Expression> filters) {
729 this.filters.clear();
730 this.filters.addAll(filters);
731 }
732
733 public List<Coprocessor.KeyRange> getRangesByPhysicalId(long physicalId) {
734 return idToRanges.get(physicalId);
735 }
736
737 public Map<Long, List<Coprocessor.KeyRange>> getRangesMaps() {
738 return idToRanges;
739 }
740
741 public TiDAGRequest addFilters(List<Expression> filters) {
742 this.filters.addAll(requireNonNull(filters, "filters expr is null"));
743 return this;
744 }
745
746 public List<Expression> getFilters() {
747 return filters;
748 }
749
750 public void addDowngradeFilter(Expression filter) {
751 this.downgradeFilters.add(requireNonNull(filter, "downgrade filter is null"));
752 }
753
754 public List<Expression> getDowngradeFilters() {
755 return downgradeFilters;
756 }
757
758 private void addPushDownFilters() {
759
760
761 this.pushDownFilters.addAll(filters);
762 }
763
764 private List<Expression> getPushDownFilters() {
765 return pushDownFilters;
766 }
767
768 private void addPushDownAggregates() {
769 this.pushDownAggregates.addAll(aggregates);
770 }
771
772 public List<AggregateFunction> getPushDownAggregates() {
773 return pushDownAggregates;
774 }
775
776 private void addPushDownGroupBys() {
777 this.pushDownGroupBys.addAll(getGroupByItems());
778 }
779
780 public List<ByItem> getPushDownGroupBys() {
781 return pushDownGroupBys;
782 }
783
784 private void addPushDownOrderBys() {
785 this.pushDownOrderBys.addAll(getOrderByItems());
786 }
787
788 public List<ByItem> getPushDownOrderBys() {
789 return pushDownOrderBys;
790 }
791
792 private void addPushDownLimits() {
793 this.pushDownLimits = limit;
794 }
795
796 private int getPushDownLimits() {
797 return pushDownLimits;
798 }
799
800 private void clearPushDownInfo() {
801 indexDataTypes.clear();
802 pushDownFilters.clear();
803 pushDownAggregates.clear();
804 pushDownGroupBys.clear();
805 pushDownOrderBys.clear();
806 pushDownLimits = 0;
807 }
808
809
810
811
812
813
814 public boolean hasPushDownAggregate() {
815 return !getPushDownAggregates().isEmpty();
816 }
817
818
819
820
821
822
823 public boolean hasPushDownGroupBy() {
824 return !getPushDownGroupBys().isEmpty();
825 }
826
827
828
829
830
831
832
833 public boolean isDoubleRead() {
834 return isDoubleRead;
835 }
836
837
838
839
840
841
842 public void setIsDoubleRead(boolean isDoubleRead) {
843 this.isDoubleRead = isDoubleRead;
844 }
845
846
847
848
849
850
851 private boolean isCoveringIndexScan() {
852 return hasIndex() && !isDoubleRead();
853 }
854
855
856
857
858
859
860 public boolean hasIndex() {
861 return indexInfo != null;
862 }
863
864
865
866
867
868
869 public PushDownType getPushDownType() {
870 return pushDownType;
871 }
872
873
874 public double getEstimatedCount() {
875 return estimatedCount;
876 }
877
878
879 public void setEstimatedCount(double estimatedCount) {
880 this.estimatedCount = estimatedCount;
881 }
882
883 public void init(boolean readHandle) {
884 if (readHandle) {
885 buildIndexScan();
886 } else {
887 buildTableScan();
888 }
889 }
890
891 private void init() {
892 init(hasIndex());
893 }
894
895 public IndexScanType getIndexScanType() {
896 if (hasIndex()) {
897 if (isDoubleRead) {
898 return IndexScanType.INDEX_SCAN;
899 } else {
900 return IndexScanType.COVERING_INDEX_SCAN;
901 }
902 } else {
903 return IndexScanType.TABLE_SCAN;
904 }
905 }
906
907 @Override
908 public String toString() {
909 init();
910 StringBuilder sb = new StringBuilder();
911 if (tableInfo != null) {
912 sb.append(String.format("[table: %s] ", tableInfo.getName()));
913 }
914
915 boolean isIndexScan = false;
916 switch (getIndexScanType()) {
917 case INDEX_SCAN:
918 sb.append("IndexScan");
919 sb.append(String.format("[Index: %s] ", indexInfo.getName()));
920 isIndexScan = true;
921 break;
922 case COVERING_INDEX_SCAN:
923 sb.append("CoveringIndexScan");
924 sb.append(String.format("[Index: %s] ", indexInfo.getName()));
925 break;
926 case TABLE_SCAN:
927 sb.append("TableScan");
928 }
929
930 if (!getFields().isEmpty()) {
931 sb.append(", Columns: ");
932 Joiner.on(", ").skipNulls().appendTo(sb, getFields());
933 }
934
935 if (isIndexScan && !getDowngradeFilters().isEmpty()) {
936 sb.append(", Downgrade Filter: ");
937 Joiner.on(", ").skipNulls().appendTo(sb, getDowngradeFilters());
938 }
939
940 if (!isIndexScan && !getFilters().isEmpty()) {
941 sb.append(", Residual Filter: ");
942 Joiner.on(", ").skipNulls().appendTo(sb, getFilters());
943 }
944
945 if (!getPushDownFilters().isEmpty()) {
946 sb.append(", PushDown Filter: ");
947 Joiner.on(", ").skipNulls().appendTo(sb, getPushDownFilters());
948 }
949
950
951 if (!getRangesMaps().isEmpty()) {
952 sb.append(", KeyRange: [");
953 if (tableInfo.isPartitionEnabled()) {
954 getRangesMaps()
955 .forEach(
956 (key, value) -> {
957 for (Coprocessor.KeyRange v : value) {
958 sb.append(" partition: ").append(getPrunedPartName(key));
959 sb.append(KeyUtils.formatBytesUTF8(v));
960 }
961 });
962 } else {
963 getRangesMaps()
964 .values()
965 .forEach(
966 vList -> {
967 for (Coprocessor.KeyRange range : vList) {
968 sb.append(KeyUtils.formatBytesUTF8(range));
969 }
970 });
971 }
972 sb.append("]");
973 }
974
975 if (!getPushDownFilters().isEmpty()) {
976 sb.append(", Aggregates: ");
977 Joiner.on(", ").skipNulls().appendTo(sb, getPushDownAggregates());
978 }
979
980 if (!getGroupByItems().isEmpty()) {
981 sb.append(", Group By: ");
982 Joiner.on(", ").skipNulls().appendTo(sb, getGroupByItems());
983 }
984
985 if (!getOrderByItems().isEmpty()) {
986 sb.append(", Order By: ");
987 Joiner.on(", ").skipNulls().appendTo(sb, getOrderByItems());
988 }
989
990 if (getLimit() != 0) {
991 sb.append(", Limit: ");
992 sb.append("[").append(limit).append("]");
993 }
994 sb.append(", startTs: ").append(startTs.getVersion());
995 return sb.toString();
996 }
997
998 public TiDAGRequest copy() {
999 try {
1000 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1001 ObjectOutputStream oos = new ObjectOutputStream(baos);
1002 oos.writeObject(this);
1003 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1004 ObjectInputStream ois = new ObjectInputStream(bais);
1005 return ((TiDAGRequest) ois.readObject());
1006 } catch (Exception e) {
1007 throw new RuntimeException(e);
1008 }
1009 }
1010
1011 public TiDAGRequest copyReqWithPhysicalId(long id) {
1012 TiDAGRequest req = this.copy();
1013 req.setPhysicalId(id);
1014 List<Coprocessor.KeyRange> currentIdRange = req.getRangesByPhysicalId(id);
1015 req.resetRanges();
1016 Map<Long, List<Coprocessor.KeyRange>> rangeMap = new HashMap<>();
1017 rangeMap.put(id, currentIdRange);
1018 req.addRanges(rangeMap);
1019 return req;
1020 }
1021
/** Truncation-handling modes, each backed by one bit of the request flags. */
public enum TruncateMode {
  IgnoreTruncation(0x1),
  TruncationAsWarning(0x2);

  /** Flag bit contributed by this mode. */
  private final long bit;

  TruncateMode(long bit) {
    this.bit = bit;
  }

  /**
   * Returns {@code flags} with this mode's bit switched on.
   *
   * @param flags existing flag bits
   * @return the combined flag bits
   */
  public long mask(long flags) {
    return this.bit | flags;
  }
}
1036
1037
/**
 * Push-down mode chosen when a request is constructed.
 *
 * <p>NOTE(review): presumably selects between streaming and regular coprocessor responses;
 * confirm with the code that consumes {@code getPushDownType()}.
 */
public enum PushDownType {
  STREAMING,
  NORMAL,
}
1042
/** Kind of scan a request performs, as reported by {@code getIndexScanType()}. */
public enum IndexScanType {
  /** An index is set and double read is on. */
  INDEX_SCAN,
  /** An index is set and double read is off. */
  COVERING_INDEX_SCAN,
  /** No index is set. */
  TABLE_SCAN,
}
1048
1049 public static class Builder {
1050 private final List<String> requiredCols = new ArrayList<>();
1051 private final List<Expression> filters = new ArrayList<>();
1052 private final List<ByItem> orderBys = new ArrayList<>();
1053 private final Map<Long, List<Coprocessor.KeyRange>> ranges = new HashMap<>();
1054 private TiTableInfo tableInfo;
1055 private long physicalId;
1056 private int limit;
1057 private TiTimestamp startTs;
1058
1059 public static Builder newBuilder() {
1060 return new Builder();
1061 }
1062
1063 public Builder setFullTableScan(TiTableInfo tableInfo) {
1064 requireNonNull(tableInfo);
1065 setTableInfo(tableInfo);
1066 if (!tableInfo.isPartitionEnabled()) {
1067 RowKey start = RowKey.createMin(tableInfo.getId());
1068 RowKey end = RowKey.createBeyondMax(tableInfo.getId());
1069 ranges.put(
1070 tableInfo.getId(),
1071 ImmutableList.of(
1072 KeyRangeUtils.makeCoprocRange(start.toByteString(), end.toByteString())));
1073 } else {
1074 for (TiPartitionDef pDef : tableInfo.getPartitionInfo().getDefs()) {
1075 RowKey start = RowKey.createMin(pDef.getId());
1076 RowKey end = RowKey.createBeyondMax(pDef.getId());
1077 ranges.put(
1078 pDef.getId(),
1079 ImmutableList.of(
1080 KeyRangeUtils.makeCoprocRange(start.toByteString(), end.toByteString())));
1081 }
1082 }
1083
1084 return this;
1085 }
1086
1087 public Builder setLimit(int limit) {
1088 this.limit = limit;
1089 return this;
1090 }
1091
1092 public Builder setTableInfo(TiTableInfo tableInfo) {
1093 this.tableInfo = tableInfo;
1094 setPhysicalId(tableInfo.getId());
1095 return this;
1096 }
1097
1098 public Builder setPhysicalId(long id) {
1099 this.physicalId = id;
1100 return this;
1101 }
1102
1103 public Builder addRequiredCols(List<String> cols) {
1104 this.requiredCols.addAll(cols);
1105 return this;
1106 }
1107
1108 public Builder addFilter(Expression filter) {
1109 this.filters.add(filter);
1110 return this;
1111 }
1112
1113 public Builder addOrderBy(ByItem item) {
1114 this.orderBys.add(item);
1115 return this;
1116 }
1117
1118 public Builder setStartTs(@Nonnull TiTimestamp ts) {
1119 this.startTs = ts;
1120 return this;
1121 }
1122
1123 public TiDAGRequest build(PushDownType pushDownType) {
1124 TiDAGRequest req = new TiDAGRequest(pushDownType);
1125 req.setTableInfo(tableInfo);
1126 req.setPhysicalId(physicalId);
1127 req.addRanges(ranges);
1128 req.addFilters(filters);
1129
1130 req.addPushDownFilters();
1131 if (!orderBys.isEmpty()) {
1132 orderBys.forEach(req::addOrderByItem);
1133 }
1134 if (limit != 0) {
1135 req.setLimit(limit);
1136 }
1137 requiredCols.forEach(c -> req.addRequiredColumn(ColumnRef.create(c, tableInfo.getColumn(c))));
1138 req.setStartTs(startTs);
1139
1140 return req;
1141 }
1142 }
1143 }