1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

// TODO: Maybe we can find a better place to put these interfaces, e.g. naming it as prelude?

//! Batch executor common structures.

pub use tidb_query_common::execute_stats::{
    ExecSummaryCollector, ExecuteStats, WithSummaryCollector,
};

use tipb::FieldType;

use tidb_query_common::execute_stats::ExecSummaryCollectorEnabled;
use tidb_query_common::storage::IntervalRange;
use tidb_query_common::Result;
use tidb_query_datatype::codec::batch::LazyBatchColumnVec;
use tidb_query_datatype::expr::EvalWarnings;

/// The interface for pull-based executors. It is similar to the Volcano Iterator model, but
/// pulls data in batch and stores data by column.
pub trait BatchExecutor: Send {
    type StorageStats;

    /// Gets the schema of the output.
    fn schema(&self) -> &[FieldType];

    /// Pulls next several rows of data (stored by column).
    ///
    /// This function might return zero rows, which doesn't mean that there is no more result.
    /// See `is_drained` in `BatchExecuteResult`.
    fn next_batch(&mut self, scan_rows: usize) -> BatchExecuteResult;

    /// Collects execution statistics (including but not limited to metrics and execution summaries)
    /// accumulated during execution and prepares for next collection.
    ///
    /// The executor implementation must invoke this function for each children executor. However
    /// the invocation order of children executors is not stipulated.
    ///
    /// This function may be invoked several times during execution. For each invocation, it should
    /// not contain accumulated meta data in last invocation. Normally the invocation frequency of
    /// this function is less than `next_batch()`.
    fn collect_exec_stats(&mut self, dest: &mut ExecuteStats);

    /// Collects underlying storage statistics accumulated during execution and prepares for
    /// next collection.
    ///
    /// Similar to `collect_exec_stats()`, the implementation must invoke this function for each
    /// children executor and this function may be invoked several times during execution.
    fn collect_storage_stats(&mut self, dest: &mut Self::StorageStats);

    fn take_scanned_range(&mut self) -> IntervalRange;

    fn can_be_cached(&self) -> bool;

    fn collect_summary(
        self,
        output_index: usize,
    ) -> WithSummaryCollector<ExecSummaryCollectorEnabled, Self>
    where
        Self: Sized,
    {
        WithSummaryCollector {
            summary_collector: ExecSummaryCollectorEnabled::new(output_index),
            inner: self,
        }
    }
}

impl<T: BatchExecutor + ?Sized> BatchExecutor for Box<T> {
    type StorageStats = T::StorageStats;

    fn schema(&self) -> &[FieldType] {
        (**self).schema()
    }

    fn next_batch(&mut self, scan_rows: usize) -> BatchExecuteResult {
        (**self).next_batch(scan_rows)
    }

    fn collect_exec_stats(&mut self, dest: &mut ExecuteStats) {
        (**self).collect_exec_stats(dest);
    }

    fn collect_storage_stats(&mut self, dest: &mut Self::StorageStats) {
        (**self).collect_storage_stats(dest);
    }

    fn take_scanned_range(&mut self) -> IntervalRange {
        (**self).take_scanned_range()
    }

    fn can_be_cached(&self) -> bool {
        (**self).can_be_cached()
    }
}

impl<C: ExecSummaryCollector + Send, T: BatchExecutor> BatchExecutor
    for WithSummaryCollector<C, T>
{
    type StorageStats = T::StorageStats;

    fn schema(&self) -> &[FieldType] {
        self.inner.schema()
    }

    fn next_batch(&mut self, scan_rows: usize) -> BatchExecuteResult {
        let timer = self.summary_collector.on_start_iterate();
        let result = self.inner.next_batch(scan_rows);
        self.summary_collector
            .on_finish_iterate(timer, result.logical_rows.len());
        result
    }

    fn collect_exec_stats(&mut self, dest: &mut ExecuteStats) {
        self.summary_collector
            .collect(&mut dest.summary_per_executor);
        self.inner.collect_exec_stats(dest);
    }

    fn collect_storage_stats(&mut self, dest: &mut Self::StorageStats) {
        self.inner.collect_storage_stats(dest);
    }

    fn take_scanned_range(&mut self) -> IntervalRange {
        self.inner.take_scanned_range()
    }

    fn can_be_cached(&self) -> bool {
        self.inner.can_be_cached()
    }
}

/// Data to be flowed between parent and child executors' single `next_batch()` invocation.
///
/// Note: there are other data flow between executors, like metrics and output statistics.
/// However they are flowed at once, just before response, instead of each step during execution.
/// Hence they are not covered by this structure. See `BatchExecuteMetaData`.
///
/// It is only `Send` but not `Sync` because executor returns its own data copy. However `Send`
/// enables executors to live in different threads.
///
/// It is designed to be used in new generation executors, i.e. executors support batch execution.
/// The old executors will not be refined to return this kind of result.
pub struct BatchExecuteResult {
    /// The *physical* columns data generated during this invocation.
    ///
    /// Note 1: Empty column data doesn't mean that there is no more data. See `is_drained`.
    ///
    /// Note 2: This is only a *physical* store of data. The data may not be in desired order and
    ///         there could be filtered out data stored inside. You should access the *logical*
    ///         data via the `logical_rows` field. For the same reason, `rows_len() > 0` doesn't
    ///         mean that there is logical data inside.
    pub physical_columns: LazyBatchColumnVec,

    /// Valid row offsets in `physical_columns`, placed in the logical order.
    pub logical_rows: Vec<usize>,

    /// The warnings generated during this invocation.
    // TODO: It can be more general, e.g. `ExecuteWarnings` instead of `EvalWarnings`.
    // TODO: Should be recorded by row.
    pub warnings: EvalWarnings,

    /// Whether or not there is no more data.
    ///
    /// This structure is a `Result`. When it is:
    /// - `Ok(false)`: The normal case, means that there could be more data. The caller should
    ///                continue calling `next_batch()` although for each call the returned data may
    ///                be empty.
    /// - `Ok(true)`:  Means that the executor is drained and no more data will be returned in
    ///                future. However there could be some (last) data in the `data` field this
    ///                time. The caller should NOT call `next_batch()` any more.
    /// - `Err(_)`:    Means that there is an error when trying to retrieve more data. In this case,
    ///                the error is returned and the executor is also drained. Similar to
    ///                `Ok(true)`, there could be some remaining data in the `data` field which is
    ///                valid data and should be processed. The caller should NOT call `next_batch()`
    ///                any more.
    pub is_drained: Result<bool>,
}