View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.meta;
19  
20  import static com.google.common.base.Preconditions.checkArgument;
21  import static com.google.common.base.Preconditions.checkNotNull;
22  import static java.util.Objects.requireNonNull;
23  import static org.tikv.common.predicates.PredicateUtils.mergeCNFExpressions;
24  
25  import com.google.common.annotations.VisibleForTesting;
26  import com.google.common.base.Joiner;
27  import com.google.common.collect.ImmutableList;
28  import com.google.common.collect.ImmutableMap;
29  import com.pingcap.tidb.tipb.*;
30  import java.io.*;
31  import java.util.*;
32  import java.util.stream.Collectors;
33  import javax.annotation.Nonnull;
34  import org.tikv.common.codec.KeyUtils;
35  import org.tikv.common.exception.DAGRequestException;
36  import org.tikv.common.exception.TiClientInternalException;
37  import org.tikv.common.expression.AggregateFunction;
38  import org.tikv.common.expression.ByItem;
39  import org.tikv.common.expression.ColumnRef;
40  import org.tikv.common.expression.Expression;
41  import org.tikv.common.expression.visitor.ProtoConverter;
42  import org.tikv.common.key.RowKey;
43  import org.tikv.common.predicates.PredicateUtils;
44  import org.tikv.common.region.TiStoreType;
45  import org.tikv.common.types.DataType;
46  import org.tikv.common.types.IntegerType;
47  import org.tikv.common.util.KeyRangeUtils;
48  import org.tikv.kvproto.Coprocessor;
49  
50  /**
51   * Type TiDAGRequest.
52   *
53   * <p>Used for constructing a new DAG request to TiKV
54   */
55  public class TiDAGRequest implements Serializable {
56    /** Predefined executor priority map. */
57    private static final Map<ExecType, Integer> EXEC_TYPE_PRIORITY_MAP =
58        ImmutableMap.<ExecType, Integer>builder()
59            .put(ExecType.TypeTableScan, 0)
60            .put(ExecType.TypeIndexScan, 0)
61            .put(ExecType.TypeSelection, 1)
62            .put(ExecType.TypeAggregation, 2)
63            .put(ExecType.TypeTopN, 3)
64            .put(ExecType.TypeLimit, 4)
65            .build();
66  
67    private static final ColumnInfo handleColumn =
68        ColumnInfo.newBuilder()
69            .setColumnId(-1)
70            .setPkHandle(true)
71            // We haven't changed the field name in protobuf file, but
72            // we need to set this to true in order to retrieve the handle,
73            // so the name 'setPkHandle' may sounds strange.
74            .setTp(8)
75            .setColumnLen(20)
76            .setFlag(2)
77            .build();
78    private final List<ColumnRef> fields = new ArrayList<>();
79    private final List<DataType> indexDataTypes = new ArrayList<>();
80    private final List<Expression> filters = new ArrayList<>();
81    private final List<ByItem> groupByItems = new ArrayList<>();
82    private final List<ByItem> orderByItems = new ArrayList<>();
83    // System like Spark has different type promotion rules
84    // we need a cast to target when given
85    private final List<AggregateFunction> aggregates = new ArrayList<>();
86    private final Map<Long, List<Coprocessor.KeyRange>> idToRanges = new HashMap<>();
87    // If index scanning of this request is not possible in some scenario, we downgrade it
88    // to a table scan and use downGradeRanges instead of index scan ranges stored in
89    // idToRanges along with downgradeFilters to perform a table scan.
90    private final List<Expression> downgradeFilters = new ArrayList<>();
91    private final List<Expression> pushDownFilters = new ArrayList<>();
92    private final List<AggregateFunction> pushDownAggregates = new ArrayList<>();
93    private final List<ByItem> pushDownGroupBys = new ArrayList<>();
94    private final List<ByItem> pushDownOrderBys = new ArrayList<>();
95    private final PushDownType pushDownType;
96    private TiTableInfo tableInfo;
97    private List<TiPartitionDef> prunedParts;
98    private TiStoreType storeType = TiStoreType.TiKV;
99    private TiIndexInfo indexInfo;
100   private List<Long> prunedPhysicalIds = new ArrayList<>();
101   private final Map<Long, String> prunedPartNames = new HashMap<>();
102   private long physicalId;
103   private int pushDownLimits;
104   private int limit;
105   private int timeZoneOffset;
106   private long flags;
107   private TiTimestamp startTs;
108   private Expression having;
109   private boolean distinct;
110   private boolean isDoubleRead;
111   private EncodeType encodeType;
112   private double estimatedCount = -1;
113 
/**
 * Creates a request with the given push-down type and {@code EncodeType.TypeDefault}.
 *
 * @param pushDownType push-down type of this request
 */
public TiDAGRequest(PushDownType pushDownType) {
  this(pushDownType, EncodeType.TypeDefault);
}
118 
/** Shared constructor that stores the push-down type and the encode type. */
private TiDAGRequest(PushDownType pushDownType, EncodeType encodeType) {
  this.encodeType = encodeType;
  this.pushDownType = pushDownType;
}
123 
/**
 * Creates a request with an explicit encode type and time zone offset.
 *
 * @param pushDownType push-down type of this request
 * @param encodeType encode type written into the built request
 * @param timeZoneOffset time zone offset written into the built request
 */
public TiDAGRequest(PushDownType pushDownType, EncodeType encodeType, int timeZoneOffset) {
  this(pushDownType, encodeType);
  this.timeZoneOffset = timeZoneOffset;
}
128 
/**
 * Creates a request with {@code EncodeType.TypeDefault} and the given time zone offset.
 *
 * @param pushDownType push-down type of this request
 * @param timeZoneOffset time zone offset written into the built request
 */
public TiDAGRequest(PushDownType pushDownType, int timeZoneOffset) {
  this(pushDownType, EncodeType.TypeDefault, timeZoneOffset);
}
133 
134   public List<TiPartitionDef> getPrunedParts() {
135     return prunedParts;
136   }
137 
138   private String getPrunedPartName(long id) {
139     return prunedPartNames.getOrDefault(id, "unknown");
140   }
141 
142   public void setPrunedParts(List<TiPartitionDef> prunedParts) {
143     this.prunedParts = prunedParts;
144     if (prunedParts != null) {
145       List<Long> ids = new ArrayList<>();
146       prunedPartNames.clear();
147       for (TiPartitionDef pDef : prunedParts) {
148         ids.add(pDef.getId());
149         prunedPartNames.put(pDef.getId(), pDef.getName());
150       }
151       this.prunedPhysicalIds = ids;
152     }
153   }
154 
155   public List<Long> getPrunedPhysicalIds() {
156     if (!this.tableInfo.isPartitionEnabled()) {
157       return prunedPhysicalIds = ImmutableList.of(this.tableInfo.getId());
158     } else {
159       return prunedPhysicalIds;
160     }
161   }
162 
163   public TiStoreType getStoreType() {
164     return storeType;
165   }
166 
167   public void setStoreType(TiStoreType storeType) {
168     this.storeType = storeType;
169   }
170 
171   public EncodeType getEncodeType() {
172     return encodeType;
173   }
174 
175   public void setEncodeType(EncodeType encodeType) {
176     this.encodeType = encodeType;
177   }
178 
179   public DAGRequest buildIndexScan() {
180     List<Integer> outputOffsets = new ArrayList<>();
181     DAGRequest.Builder builder = buildScan(true, outputOffsets);
182     return buildRequest(builder, outputOffsets);
183   }
184 
185   public DAGRequest buildTableScan() {
186     List<Integer> outputOffsets = new ArrayList<>();
187     boolean isCoveringIndex = isCoveringIndexScan();
188     DAGRequest.Builder builder = buildScan(isCoveringIndex, outputOffsets);
189     return buildRequest(builder, outputOffsets);
190   }
191 
192   private DAGRequest buildRequest(
193       DAGRequest.Builder dagRequestBuilder, List<Integer> outputOffsets) {
194     checkNotNull(startTs, "startTs is null");
195     checkArgument(startTs.getVersion() != 0, "timestamp is 0");
196     DAGRequest request =
197         dagRequestBuilder
198             .setTimeZoneOffset(timeZoneOffset)
199             .setFlags(flags)
200             .addAllOutputOffsets(outputOffsets)
201             .setEncodeType(this.encodeType)
202             // set start ts fallback is to solving compatible issue.
203             .setStartTsFallback(startTs.getVersion())
204             .build();
205 
206     validateRequest(request);
207     return request;
208   }
209 
210   /**
211    * Unify indexScan and tableScan building logic since they are very much alike. DAGRequest for
212    * IndexScan should also contain filters and aggregation, so we can reuse this part of logic.
213    *
214    * <p>DAGRequest is made up of a chain of executors with strict orders: TableScan/IndexScan >
215    * Selection > Aggregation > TopN/Limit a DAGRequest must contain one and only one TableScan or
216    * IndexScan.
217    *
218    * @param buildIndexScan whether the dagRequest to build should be an {@link IndexScan}
219    * @return final DAGRequest built
220    */
221   private DAGRequest.Builder buildScan(boolean buildIndexScan, List<Integer> outputOffsets) {
222     long id = getPhysicalId();
223     checkNotNull(startTs, "startTs is null");
224     checkArgument(startTs.getVersion() != 0, "timestamp is 0");
225     clearPushDownInfo();
226     DAGRequest.Builder dagRequestBuilder = DAGRequest.newBuilder();
227     Executor.Builder executorBuilder = Executor.newBuilder();
228     IndexScan.Builder indexScanBuilder = IndexScan.newBuilder();
229     TableScan.Builder tblScanBuilder = TableScan.newBuilder();
230     // find a column's offset in fields
231     Map<String, Integer> colOffsetInFieldMap = new HashMap<>();
232     // find a column's position in index
233     Map<String, Integer> colPosInIndexMap = new HashMap<>();
234 
235     if (buildIndexScan) {
236       // IndexScan
237       if (indexInfo == null) {
238         throw new TiClientInternalException("Index is empty for index scan");
239       }
240       List<TiColumnInfo> columnInfoList = tableInfo.getColumns();
241       boolean hasPk = false;
242       // We extract index column info
243       List<Integer> indexColOffsets =
244           indexInfo
245               .getIndexColumns()
246               .stream()
247               .map(TiIndexColumn::getOffset)
248               .collect(Collectors.toList());
249 
250       int idxPos = 0;
251       // for index scan builder, columns are added by its order in index
252       for (Integer idx : indexColOffsets) {
253         TiColumnInfo tiColumnInfo = columnInfoList.get(idx);
254         ColumnInfo columnInfo = tiColumnInfo.toProto(tableInfo);
255         colPosInIndexMap.put(tiColumnInfo.getName(), idxPos++);
256 
257         ColumnInfo.Builder colBuilder = ColumnInfo.newBuilder(columnInfo);
258         if (columnInfo.getColumnId() == -1) {
259           hasPk = true;
260           colBuilder.setPkHandle(true);
261         }
262         indexScanBuilder.addColumns(colBuilder);
263       }
264 
265       int colCount = indexScanBuilder.getColumnsCount();
266       if (isDoubleRead()) {
267         // double read case: need to retrieve handle
268         // =================== IMPORTANT ======================
269         // offset for dagRequest should be in accordance with fields
270         // The last pos will be the handle
271         // TODO: we may merge indexDoubleRead and coveringIndexRead logic
272         for (ColumnRef col : getFields()) {
273           Integer pos = colPosInIndexMap.get(col.getName());
274           if (pos != null) {
275             TiColumnInfo columnInfo = columnInfoList.get(indexColOffsets.get(pos));
276             if (col.matchName(columnInfo.getName())) {
277               colOffsetInFieldMap.put(col.getName(), pos);
278             }
279             // TODO: primary key may also be considered if pkIsHandle
280           }
281         }
282         // double read case
283         if (!hasPk) {
284           // add handle column
285           indexScanBuilder.addColumns(handleColumn);
286           ++colCount;
287           addRequiredIndexDataType();
288         }
289 
290         if (colCount == 0) {
291           throw new DAGRequestException("Incorrect index scan with zero column count");
292         }
293 
294         outputOffsets.add(colCount - 1);
295       } else {
296         boolean pkIsNeeded = false;
297         // =================== IMPORTANT ======================
298         // offset for dagRequest should be in accordance with fields
299         for (ColumnRef col : getFields()) {
300           Integer pos = colPosInIndexMap.get(col.getName());
301           if (pos != null) {
302             TiColumnInfo columnInfo = columnInfoList.get(indexColOffsets.get(pos));
303             if (col.matchName(columnInfo.getName())) {
304               outputOffsets.add(pos);
305               colOffsetInFieldMap.put(col.getName(), pos);
306             }
307           }
308           // if a column of field is not contained in index selected,
309           // logically it must be the pk column. Extra check here.
310           else if (tableInfo.getColumn(col.getName()).isPrimaryKey()) {
311             pkIsNeeded = true;
312             // offset should be processed for each primary key encountered
313             outputOffsets.add(colCount);
314             // for index scan, column offset must be in the order of index->handle
315             colOffsetInFieldMap.put(col.getName(), indexColOffsets.size());
316           } else {
317             throw new DAGRequestException(
318                 "columns other than primary key and index key exist in fields while index single read: "
319                     + col.getName());
320           }
321         }
322         // pk is not included in index but still needed
323         if (pkIsNeeded) {
324           indexScanBuilder.addColumns(handleColumn);
325         }
326       }
327       executorBuilder.setTp(ExecType.TypeIndexScan);
328 
329       indexScanBuilder.setTableId(id).setIndexId(indexInfo.getId());
330       dagRequestBuilder.addExecutors(executorBuilder.setIdxScan(indexScanBuilder).build());
331     } else {
332       // TableScan
333       executorBuilder.setTp(ExecType.TypeTableScan);
334       tblScanBuilder.setTableId(id);
335       // Step1. Add columns to first executor
336       int lastOffset = 0;
337       for (ColumnRef col : getFields()) {
338         // can't allow duplicated col added into executor.
339         if (!colOffsetInFieldMap.containsKey(col.getName())) {
340           tblScanBuilder.addColumns(tableInfo.getColumn(col.getName()).toProto(tableInfo));
341           colOffsetInFieldMap.put(col.getName(), lastOffset);
342           lastOffset++;
343         }
344         // column offset should be in accordance with fields
345         outputOffsets.add(colOffsetInFieldMap.get(col.getName()));
346       }
347 
348       dagRequestBuilder.addExecutors(executorBuilder.setTblScan(tblScanBuilder));
349     }
350 
351     boolean isIndexDoubleScan = buildIndexScan && isDoubleRead();
352 
353     // Should build these executors when performing CoveringIndexScan/TableScan
354 
355     // clear executorBuilder
356     executorBuilder.clear();
357 
358     // Step2. Add others
359     // DO NOT EDIT EXPRESSION CONSTRUCTION ORDER
360     // Or make sure the construction order is below:
361     // TableScan/IndexScan > Selection > Aggregation > TopN/Limit
362 
363     Expression whereExpr = mergeCNFExpressions(getFilters());
364     if (whereExpr != null) {
365       if (!isIndexDoubleScan || isExpressionCoveredByIndex(whereExpr)) {
366         executorBuilder.setTp(ExecType.TypeSelection);
367         dagRequestBuilder.addExecutors(
368             executorBuilder.setSelection(
369                 Selection.newBuilder()
370                     .addConditions(ProtoConverter.toProto(whereExpr, colOffsetInFieldMap))));
371         executorBuilder.clear();
372         addPushDownFilters();
373       } else {
374         return dagRequestBuilder;
375       }
376     }
377 
378     if (!getGroupByItems().isEmpty() || !getAggregates().isEmpty()) {
379       // only allow table scan or covering index scan push down groupby and agg
380       if (!isIndexDoubleScan || (isGroupByCoveredByIndex() && isAggregateCoveredByIndex())) {
381         pushDownAggAndGroupBy(
382             dagRequestBuilder, executorBuilder, outputOffsets, colOffsetInFieldMap);
383       } else {
384         return dagRequestBuilder;
385       }
386     }
387 
388     if (!getOrderByItems().isEmpty()) {
389       if (!isIndexDoubleScan || isOrderByCoveredByIndex()) {
390         // only allow table scan or covering index scan push down orderby
391         pushDownOrderBy(dagRequestBuilder, executorBuilder, colOffsetInFieldMap);
392       }
393     } else if (getLimit() != 0) {
394       if (!isIndexDoubleScan) {
395         pushDownLimit(dagRequestBuilder, executorBuilder);
396       }
397     }
398 
399     return dagRequestBuilder;
400   }
401 
402   private void pushDownLimit(
403       DAGRequest.Builder dagRequestBuilder, Executor.Builder executorBuilder) {
404     Limit.Builder limitBuilder = Limit.newBuilder();
405     limitBuilder.setLimit(getLimit());
406     executorBuilder.setTp(ExecType.TypeLimit);
407     dagRequestBuilder.addExecutors(executorBuilder.setLimit(limitBuilder));
408     executorBuilder.clear();
409     addPushDownLimits();
410   }
411 
412   private void pushDownOrderBy(
413       DAGRequest.Builder dagRequestBuilder,
414       Executor.Builder executorBuilder,
415       Map<String, Integer> colOffsetInFieldMap) {
416     TopN.Builder topNBuilder = TopN.newBuilder();
417     getOrderByItems()
418         .forEach(
419             tiByItem ->
420                 topNBuilder.addOrderBy(
421                     com.pingcap.tidb.tipb.ByItem.newBuilder()
422                         .setExpr(ProtoConverter.toProto(tiByItem.getExpr(), colOffsetInFieldMap))
423                         .setDesc(tiByItem.isDesc())));
424     executorBuilder.setTp(ExecType.TypeTopN);
425     topNBuilder.setLimit(getLimit());
426     dagRequestBuilder.addExecutors(executorBuilder.setTopN(topNBuilder));
427     executorBuilder.clear();
428     addPushDownOrderBys();
429   }
430 
431   private void pushDownAggAndGroupBy(
432       DAGRequest.Builder dagRequestBuilder,
433       Executor.Builder executorBuilder,
434       List<Integer> outputOffsets,
435       Map<String, Integer> colOffsetInFieldMap) {
436     Aggregation.Builder aggregationBuilder = Aggregation.newBuilder();
437     getAggregates()
438         .forEach(
439             tiExpr ->
440                 aggregationBuilder.addAggFunc(ProtoConverter.toProto(tiExpr, colOffsetInFieldMap)));
441     getGroupByItems()
442         .forEach(
443             tiByItem ->
444                 aggregationBuilder.addGroupBy(
445                     ProtoConverter.toProto(tiByItem.getExpr(), colOffsetInFieldMap)));
446     executorBuilder.setTp(ExecType.TypeAggregation);
447     dagRequestBuilder.addExecutors(executorBuilder.setAggregation(aggregationBuilder));
448     executorBuilder.clear();
449     addPushDownGroupBys();
450     addPushDownAggregates();
451 
452     // adding output offsets for aggs
453     outputOffsets.clear();
454     for (int i = 0; i < getAggregates().size(); i++) {
455       outputOffsets.add(i);
456     }
457 
458     // adding output offsets for group by
459     int currentMaxOutputOffset = outputOffsets.get(outputOffsets.size() - 1) + 1;
460     for (int i = 0; i < getGroupByItems().size(); i++) {
461       outputOffsets.add(currentMaxOutputOffset + i);
462     }
463   }
464 
465   private boolean isExpressionCoveredByIndex(Expression expr) {
466     Set<String> indexColumnRefSet =
467         indexInfo
468             .getIndexColumns()
469             .stream()
470             .filter(x -> !x.isPrefixIndex())
471             .map(TiIndexColumn::getName)
472             .collect(Collectors.toSet());
473     return !isDoubleRead()
474         && PredicateUtils.extractColumnRefFromExpression(expr)
475             .stream()
476             .map(ColumnRef::getName)
477             .allMatch(indexColumnRefSet::contains);
478   }
479 
480   private boolean isGroupByCoveredByIndex() {
481     return isByItemCoveredByIndex(getGroupByItems());
482   }
483 
484   private boolean isOrderByCoveredByIndex() {
485     return isByItemCoveredByIndex(getOrderByItems());
486   }
487 
488   private boolean isByItemCoveredByIndex(List<ByItem> byItems) {
489     if (byItems.isEmpty()) {
490       return false;
491     }
492     return byItems.stream().allMatch(x -> isExpressionCoveredByIndex(x.getExpr()));
493   }
494 
495   private boolean isAggregateCoveredByIndex() {
496     if (aggregates.isEmpty()) {
497       return false;
498     }
499     return aggregates.stream().allMatch(this::isExpressionCoveredByIndex);
500   }
501 
502   /**
503    * Check if a DAG request is valid.
504    *
505    * <p>Note: When constructing a DAG request, a executor with an ExecType of higher priority should
506    * always be placed before those lower ones.
507    *
508    * @param dagRequest Request DAG.
509    */
510   private void validateRequest(DAGRequest dagRequest) {
511     requireNonNull(dagRequest);
512     // check encode type
513     requireNonNull(dagRequest.getEncodeType());
514 
515     // A DAG request must has at least one executor.
516     if (dagRequest.getExecutorsCount() < 1) {
517       throw new DAGRequestException("Invalid executors count:" + dagRequest.getExecutorsCount());
518     }
519     // A DAG request must start with TableScan or IndexScan Executor
520     ExecType formerType = dagRequest.getExecutors(0).getTp();
521     if (formerType != ExecType.TypeTableScan && formerType != ExecType.TypeIndexScan) {
522       throw new DAGRequestException(
523           "Invalid first executor type:"
524               + formerType
525               + ", must one of TypeTableScan or TypeIndexScan");
526     }
527 
528     for (int i = 1; i < dagRequest.getExecutorsCount(); i++) {
529       ExecType currentType = dagRequest.getExecutors(i).getTp();
530       if (EXEC_TYPE_PRIORITY_MAP.get(currentType) < EXEC_TYPE_PRIORITY_MAP.get(formerType)) {
531         throw new DAGRequestException("Invalid executor priority.");
532       }
533       formerType = currentType;
534     }
535   }
536 
537   public TiTableInfo getTableInfo() {
538     return this.tableInfo;
539   }
540 
541   public TiDAGRequest setTableInfo(TiTableInfo tableInfo) {
542     this.tableInfo = requireNonNull(tableInfo, "tableInfo is null");
543     setPhysicalId(tableInfo.getId());
544     return this;
545   }
546 
547   public long getPhysicalId() {
548     return this.physicalId;
549   }
550 
551   public TiDAGRequest setPhysicalId(long id) {
552     this.physicalId = id;
553     return this;
554   }
555 
556   public TiIndexInfo getIndexInfo() {
557     return indexInfo;
558   }
559 
560   public TiDAGRequest setIndexInfo(TiIndexInfo indexInfo) {
561     this.indexInfo = requireNonNull(indexInfo, "indexInfo is null");
562     return this;
563   }
564 
565   public void clearIndexInfo() {
566     indexInfo = null;
567     clearPushDownInfo();
568   }
569 
570   public int getLimit() {
571     return limit;
572   }
573 
574   /**
575    * add limit clause to select query.
576    *
577    * @param limit is just a integer.
578    * @return a SelectBuilder
579    */
580   public TiDAGRequest setLimit(int limit) {
581     this.limit = limit;
582     return this;
583   }
584 
585   int getTimeZoneOffset() {
586     return timeZoneOffset;
587   }
588 
589   /**
590    * set truncate mode
591    *
592    * @param mode truncate mode
593    * @return a TiDAGRequest
594    */
595   TiDAGRequest setTruncateMode(TruncateMode mode) {
596     flags = requireNonNull(mode, "mode is null").mask(flags);
597     return this;
598   }
599 
600   @VisibleForTesting
601   long getFlags() {
602     return flags;
603   }
604 
605   @VisibleForTesting
606   public TiTimestamp getStartTs() {
607     return startTs;
608   }
609 
610   /**
611    * set start timestamp for the transaction
612    *
613    * @param startTs timestamp
614    * @return a TiDAGRequest
615    */
616   public TiDAGRequest setStartTs(@Nonnull TiTimestamp startTs) {
617     this.startTs = startTs;
618     return this;
619   }
620 
621   /**
622    * set having clause to select query
623    *
624    * @param having is a expression represents Having
625    * @return a TiDAGRequest
626    */
627   public TiDAGRequest setHaving(Expression having) {
628     this.having = requireNonNull(having, "having is null");
629     return this;
630   }
631 
632   public boolean isDistinct() {
633     return distinct;
634   }
635 
636   public TiDAGRequest setDistinct(boolean distinct) {
637     this.distinct = distinct;
638     return this;
639   }
640 
641   public TiDAGRequest addAggregate(AggregateFunction expr) {
642     requireNonNull(expr, "aggregation expr is null");
643     aggregates.add(expr);
644     return this;
645   }
646 
647   List<AggregateFunction> getAggregates() {
648     return aggregates;
649   }
650 
651   /**
652    * add a order by clause to select query.
653    *
654    * @param byItem is a TiByItem.
655    * @return a SelectBuilder
656    */
657   public TiDAGRequest addOrderByItem(ByItem byItem) {
658     orderByItems.add(requireNonNull(byItem, "byItem is null"));
659     return this;
660   }
661 
662   List<ByItem> getOrderByItems() {
663     return orderByItems;
664   }
665 
666   /**
667    * add a group by clause to select query
668    *
669    * @param byItem is a TiByItem
670    * @return a SelectBuilder
671    */
672   public TiDAGRequest addGroupByItem(ByItem byItem) {
673     groupByItems.add(requireNonNull(byItem, "byItem is null"));
674     return this;
675   }
676 
677   public List<ByItem> getGroupByItems() {
678     return groupByItems;
679   }
680 
681   /**
682    * Field is not support in TiDB yet, for here we simply allow TiColumnRef instead of TiExpr like
683    * in SelectRequest proto
684    *
685    * <p>
686    *
687    * <p>This interface allows duplicate columns and it's user's responsibility to do dedup since we
688    * need to ensure exact order and items preserved during decoding
689    *
690    * @param column is column referred during selectReq
691    */
692   public TiDAGRequest addRequiredColumn(ColumnRef column) {
693     if (!column.isResolved()) {
694       throw new UnsupportedOperationException(
695           String.format("cannot add unresolved column %s to dag request", column.getName()));
696     }
697     fields.add(requireNonNull(column, "columnRef is null"));
698     return this;
699   }
700 
701   public List<ColumnRef> getFields() {
702     return fields;
703   }
704 
705   /** Required index columns for double read */
706   private void addRequiredIndexDataType() {
707     indexDataTypes.add(requireNonNull(IntegerType.BIGINT, "dataType is null"));
708   }
709 
710   public List<DataType> getIndexDataTypes() {
711     return indexDataTypes;
712   }
713 
714   /**
715    * set key range of scan
716    *
717    * @param ranges key range of scan
718    */
719   public TiDAGRequest addRanges(Map<Long, List<Coprocessor.KeyRange>> ranges) {
720     idToRanges.putAll(requireNonNull(ranges, "KeyRange is null"));
721     return this;
722   }
723 
724   private void resetRanges() {
725     idToRanges.clear();
726   }
727 
728   public void resetFilters(List<Expression> filters) {
729     this.filters.clear();
730     this.filters.addAll(filters);
731   }
732 
733   public List<Coprocessor.KeyRange> getRangesByPhysicalId(long physicalId) {
734     return idToRanges.get(physicalId);
735   }
736 
737   public Map<Long, List<Coprocessor.KeyRange>> getRangesMaps() {
738     return idToRanges;
739   }
740 
741   public TiDAGRequest addFilters(List<Expression> filters) {
742     this.filters.addAll(requireNonNull(filters, "filters expr is null"));
743     return this;
744   }
745 
746   public List<Expression> getFilters() {
747     return filters;
748   }
749 
750   public void addDowngradeFilter(Expression filter) {
751     this.downgradeFilters.add(requireNonNull(filter, "downgrade filter is null"));
752   }
753 
754   public List<Expression> getDowngradeFilters() {
755     return downgradeFilters;
756   }
757 
758   private void addPushDownFilters() {
759     // all filters will be pushed down
760     // TODO: choose some filters to push down
761     this.pushDownFilters.addAll(filters);
762   }
763 
764   private List<Expression> getPushDownFilters() {
765     return pushDownFilters;
766   }
767 
768   private void addPushDownAggregates() {
769     this.pushDownAggregates.addAll(aggregates);
770   }
771 
772   public List<AggregateFunction> getPushDownAggregates() {
773     return pushDownAggregates;
774   }
775 
776   private void addPushDownGroupBys() {
777     this.pushDownGroupBys.addAll(getGroupByItems());
778   }
779 
780   public List<ByItem> getPushDownGroupBys() {
781     return pushDownGroupBys;
782   }
783 
784   private void addPushDownOrderBys() {
785     this.pushDownOrderBys.addAll(getOrderByItems());
786   }
787 
788   public List<ByItem> getPushDownOrderBys() {
789     return pushDownOrderBys;
790   }
791 
792   private void addPushDownLimits() {
793     this.pushDownLimits = limit;
794   }
795 
796   private int getPushDownLimits() {
797     return pushDownLimits;
798   }
799 
800   private void clearPushDownInfo() {
801     indexDataTypes.clear();
802     pushDownFilters.clear();
803     pushDownAggregates.clear();
804     pushDownGroupBys.clear();
805     pushDownOrderBys.clear();
806     pushDownLimits = 0;
807   }
808 
809   /**
810    * Check whether the DAG request has any aggregate expression.
811    *
812    * @return the boolean
813    */
814   public boolean hasPushDownAggregate() {
815     return !getPushDownAggregates().isEmpty();
816   }
817 
818   /**
819    * Check whether the DAG request has any group by expression.
820    *
821    * @return the boolean
822    */
823   public boolean hasPushDownGroupBy() {
824     return !getPushDownGroupBys().isEmpty();
825   }
826 
827   /**
828    * Returns whether needs to read handle from index first and find its corresponding row. i.e,
829    * "double read"
830    *
831    * @return boolean
832    */
833   public boolean isDoubleRead() {
834     return isDoubleRead;
835   }
836 
837   /**
838    * Sets isDoubleRead
839    *
840    * @param isDoubleRead if is double read
841    */
842   public void setIsDoubleRead(boolean isDoubleRead) {
843     this.isDoubleRead = isDoubleRead;
844   }
845 
846   /**
847    * Returns whether the request is CoveringIndex
848    *
849    * @return boolean
850    */
851   private boolean isCoveringIndexScan() {
852     return hasIndex() && !isDoubleRead();
853   }
854 
855   /**
856    * Returns whether this request is of indexScanType
857    *
858    * @return true iff indexInfo is provided, false otherwise
859    */
860   public boolean hasIndex() {
861     return indexInfo != null;
862   }
863 
864   /**
865    * Whether we use streaming processing to retrieve data
866    *
867    * @return push down type.
868    */
869   public PushDownType getPushDownType() {
870     return pushDownType;
871   }
872 
873   /** Get the estimated row count will be fetched from this request. */
874   public double getEstimatedCount() {
875     return estimatedCount;
876   }
877 
878   /** Set the estimated row count will be fetched from this request. */
879   public void setEstimatedCount(double estimatedCount) {
880     this.estimatedCount = estimatedCount;
881   }
882 
883   public void init(boolean readHandle) {
884     if (readHandle) {
885       buildIndexScan();
886     } else {
887       buildTableScan();
888     }
889   }
890 
891   private void init() {
892     init(hasIndex());
893   }
894 
895   public IndexScanType getIndexScanType() {
896     if (hasIndex()) {
897       if (isDoubleRead) {
898         return IndexScanType.INDEX_SCAN;
899       } else {
900         return IndexScanType.COVERING_INDEX_SCAN;
901       }
902     } else {
903       return IndexScanType.TABLE_SCAN;
904     }
905   }
906 
907   @Override
908   public String toString() {
909     init();
910     StringBuilder sb = new StringBuilder();
911     if (tableInfo != null) {
912       sb.append(String.format("[table: %s] ", tableInfo.getName()));
913     }
914 
915     boolean isIndexScan = false;
916     switch (getIndexScanType()) {
917       case INDEX_SCAN:
918         sb.append("IndexScan");
919         sb.append(String.format("[Index: %s] ", indexInfo.getName()));
920         isIndexScan = true;
921         break;
922       case COVERING_INDEX_SCAN:
923         sb.append("CoveringIndexScan");
924         sb.append(String.format("[Index: %s] ", indexInfo.getName()));
925         break;
926       case TABLE_SCAN:
927         sb.append("TableScan");
928     }
929 
930     if (!getFields().isEmpty()) {
931       sb.append(", Columns: ");
932       Joiner.on(", ").skipNulls().appendTo(sb, getFields());
933     }
934 
935     if (isIndexScan && !getDowngradeFilters().isEmpty()) {
936       sb.append(", Downgrade Filter: ");
937       Joiner.on(", ").skipNulls().appendTo(sb, getDowngradeFilters());
938     }
939 
940     if (!isIndexScan && !getFilters().isEmpty()) {
941       sb.append(", Residual Filter: ");
942       Joiner.on(", ").skipNulls().appendTo(sb, getFilters());
943     }
944 
945     if (!getPushDownFilters().isEmpty()) {
946       sb.append(", PushDown Filter: ");
947       Joiner.on(", ").skipNulls().appendTo(sb, getPushDownFilters());
948     }
949 
950     // Key ranges might be also useful
951     if (!getRangesMaps().isEmpty()) {
952       sb.append(", KeyRange: [");
953       if (tableInfo.isPartitionEnabled()) {
954         getRangesMaps()
955             .forEach(
956                 (key, value) -> {
957                   for (Coprocessor.KeyRange v : value) {
958                     sb.append(" partition: ").append(getPrunedPartName(key));
959                     sb.append(KeyUtils.formatBytesUTF8(v));
960                   }
961                 });
962       } else {
963         getRangesMaps()
964             .values()
965             .forEach(
966                 vList -> {
967                   for (Coprocessor.KeyRange range : vList) {
968                     sb.append(KeyUtils.formatBytesUTF8(range));
969                   }
970                 });
971       }
972       sb.append("]");
973     }
974 
975     if (!getPushDownFilters().isEmpty()) {
976       sb.append(", Aggregates: ");
977       Joiner.on(", ").skipNulls().appendTo(sb, getPushDownAggregates());
978     }
979 
980     if (!getGroupByItems().isEmpty()) {
981       sb.append(", Group By: ");
982       Joiner.on(", ").skipNulls().appendTo(sb, getGroupByItems());
983     }
984 
985     if (!getOrderByItems().isEmpty()) {
986       sb.append(", Order By: ");
987       Joiner.on(", ").skipNulls().appendTo(sb, getOrderByItems());
988     }
989 
990     if (getLimit() != 0) {
991       sb.append(", Limit: ");
992       sb.append("[").append(limit).append("]");
993     }
994     sb.append(", startTs: ").append(startTs.getVersion());
995     return sb.toString();
996   }
997 
998   public TiDAGRequest copy() {
999     try {
1000       ByteArrayOutputStream baos = new ByteArrayOutputStream();
1001       ObjectOutputStream oos = new ObjectOutputStream(baos);
1002       oos.writeObject(this);
1003       ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1004       ObjectInputStream ois = new ObjectInputStream(bais);
1005       return ((TiDAGRequest) ois.readObject());
1006     } catch (Exception e) {
1007       throw new RuntimeException(e);
1008     }
1009   }
1010 
1011   public TiDAGRequest copyReqWithPhysicalId(long id) {
1012     TiDAGRequest req = this.copy();
1013     req.setPhysicalId(id);
1014     List<Coprocessor.KeyRange> currentIdRange = req.getRangesByPhysicalId(id);
1015     req.resetRanges();
1016     Map<Long, List<Coprocessor.KeyRange>> rangeMap = new HashMap<>();
1017     rangeMap.put(id, currentIdRange);
1018     req.addRanges(rangeMap);
1019     return req;
1020   }
1021 
1022   public enum TruncateMode {
1023     IgnoreTruncation(0x1),
1024     TruncationAsWarning(0x2);
1025 
1026     private final long mask;
1027 
1028     TruncateMode(long mask) {
1029       this.mask = mask;
1030     }
1031 
1032     public long mask(long flags) {
1033       return flags | mask;
1034     }
1035   }
1036 
1037   /** Whether we use streaming to push down the request */
1038   public enum PushDownType {
1039     STREAMING,
1040     NORMAL
1041   }
1042 
1043   public enum IndexScanType {
1044     INDEX_SCAN,
1045     COVERING_INDEX_SCAN,
1046     TABLE_SCAN
1047   }
1048 
1049   public static class Builder {
1050     private final List<String> requiredCols = new ArrayList<>();
1051     private final List<Expression> filters = new ArrayList<>();
1052     private final List<ByItem> orderBys = new ArrayList<>();
1053     private final Map<Long, List<Coprocessor.KeyRange>> ranges = new HashMap<>();
1054     private TiTableInfo tableInfo;
1055     private long physicalId;
1056     private int limit;
1057     private TiTimestamp startTs;
1058 
1059     public static Builder newBuilder() {
1060       return new Builder();
1061     }
1062 
1063     public Builder setFullTableScan(TiTableInfo tableInfo) {
1064       requireNonNull(tableInfo);
1065       setTableInfo(tableInfo);
1066       if (!tableInfo.isPartitionEnabled()) {
1067         RowKey start = RowKey.createMin(tableInfo.getId());
1068         RowKey end = RowKey.createBeyondMax(tableInfo.getId());
1069         ranges.put(
1070             tableInfo.getId(),
1071             ImmutableList.of(
1072                 KeyRangeUtils.makeCoprocRange(start.toByteString(), end.toByteString())));
1073       } else {
1074         for (TiPartitionDef pDef : tableInfo.getPartitionInfo().getDefs()) {
1075           RowKey start = RowKey.createMin(pDef.getId());
1076           RowKey end = RowKey.createBeyondMax(pDef.getId());
1077           ranges.put(
1078               pDef.getId(),
1079               ImmutableList.of(
1080                   KeyRangeUtils.makeCoprocRange(start.toByteString(), end.toByteString())));
1081         }
1082       }
1083 
1084       return this;
1085     }
1086 
1087     public Builder setLimit(int limit) {
1088       this.limit = limit;
1089       return this;
1090     }
1091 
1092     public Builder setTableInfo(TiTableInfo tableInfo) {
1093       this.tableInfo = tableInfo;
1094       setPhysicalId(tableInfo.getId());
1095       return this;
1096     }
1097 
1098     public Builder setPhysicalId(long id) {
1099       this.physicalId = id;
1100       return this;
1101     }
1102 
1103     public Builder addRequiredCols(List<String> cols) {
1104       this.requiredCols.addAll(cols);
1105       return this;
1106     }
1107 
1108     public Builder addFilter(Expression filter) {
1109       this.filters.add(filter);
1110       return this;
1111     }
1112 
1113     public Builder addOrderBy(ByItem item) {
1114       this.orderBys.add(item);
1115       return this;
1116     }
1117 
1118     public Builder setStartTs(@Nonnull TiTimestamp ts) {
1119       this.startTs = ts;
1120       return this;
1121     }
1122 
1123     public TiDAGRequest build(PushDownType pushDownType) {
1124       TiDAGRequest req = new TiDAGRequest(pushDownType);
1125       req.setTableInfo(tableInfo);
1126       req.setPhysicalId(physicalId);
1127       req.addRanges(ranges);
1128       req.addFilters(filters);
1129       // this request will push down all filters
1130       req.addPushDownFilters();
1131       if (!orderBys.isEmpty()) {
1132         orderBys.forEach(req::addOrderByItem);
1133       }
1134       if (limit != 0) {
1135         req.setLimit(limit);
1136       }
1137       requiredCols.forEach(c -> req.addRequiredColumn(ColumnRef.create(c, tableInfo.getColumn(c))));
1138       req.setStartTs(startTs);
1139 
1140       return req;
1141     }
1142   }
1143 }