use std::sync::Arc;
use tipb::Expr;
use tipb::FieldType;
use tipb::Selection;
use crate::interface::*;
use tidb_query_common::storage::IntervalRange;
use tidb_query_common::Result;
use tidb_query_datatype::codec::data_type::*;
use tidb_query_datatype::expr::{EvalConfig, EvalContext};
use tidb_query_datatype::match_template_evaltype;
use tidb_query_expr::RpnStackNode;
use tidb_query_expr::{RpnExpression, RpnExpressionBuilder};
/// Executor that filters the rows produced by a child executor using a list of
/// RPN predicate expressions. Rows are filtered by rewriting each batch's
/// `logical_rows`; the physical columns are passed through untouched.
pub struct BatchSelectionExecutor<Src: BatchExecutor> {
    /// Evaluation context used for every condition evaluation. Warnings
    /// collected here are merged into the batch result after successful
    /// filtering.
    context: EvalContext,
    /// Child executor whose batches are filtered.
    src: Src,
    /// Predicates applied one after another; a row survives only if every
    /// condition evaluates to true for it.
    conditions: Vec<RpnExpression>,
}
impl BatchSelectionExecutor<Box<dyn BatchExecutor<StorageStats = ()>>> {
    /// Verifies that every condition in the `Selection` descriptor can be
    /// compiled into an RPN expression.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by
    /// `RpnExpressionBuilder::check_expr_tree_supported`.
    #[inline]
    pub fn check_supported(descriptor: &Selection) -> Result<()> {
        descriptor
            .get_conditions()
            .iter()
            .try_for_each(|expr| RpnExpressionBuilder::check_expr_tree_supported(expr))?;
        Ok(())
    }
}
impl<Src: BatchExecutor> BatchSelectionExecutor<Src> {
    /// Test-only constructor taking pre-built RPN conditions and a default
    /// evaluation context.
    #[cfg(test)]
    pub fn new_for_test(src: Src, conditions: Vec<RpnExpression>) -> Self {
        let context = EvalContext::default();
        Self {
            src,
            conditions,
            context,
        }
    }

    /// Creates a selection executor over `src`, compiling each protobuf
    /// condition in `conditions_def` into an RPN expression.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while building an RPN expression.
    pub fn new(config: Arc<EvalConfig>, src: Src, conditions_def: Vec<Expr>) -> Result<Self> {
        let mut context = EvalContext::new(config);
        // Column references in the conditions are validated against the
        // child's schema width.
        let columns_len = src.schema().len();
        let conditions = conditions_def
            .into_iter()
            .map(|def| RpnExpressionBuilder::build_from_expr_tree(def, &mut context, columns_len))
            .collect::<std::result::Result<Vec<_>, _>>()?;
        Ok(Self {
            context,
            src,
            conditions,
        })
    }

    /// Narrows `src_result.logical_rows` by applying each condition in turn.
    ///
    /// Stops early once no rows remain. Any evaluation error is returned
    /// immediately; the caller treats the whole batch as failed.
    fn handle_src_result(&mut self, src_result: &mut BatchExecuteResult) -> Result<()> {
        // The evaluation result borrows the logical rows it was given, while
        // `src_result.logical_rows` must be mutated afterwards. So each
        // condition is evaluated against a snapshot copy held in this
        // reusable buffer.
        let mut rows_snapshot: Vec<usize> = Vec::with_capacity(src_result.logical_rows.len());
        for condition in &self.conditions {
            if src_result.logical_rows.is_empty() {
                break;
            }
            rows_snapshot.clear();
            rows_snapshot.extend_from_slice(&src_result.logical_rows);

            let evaluated = condition.eval(
                &mut self.context,
                self.src.schema(),
                &mut src_result.physical_columns,
                &rows_snapshot,
                rows_snapshot.len(),
            )?;

            match evaluated {
                RpnStackNode::Scalar { value, .. } => {
                    // A constant predicate either keeps or drops every row.
                    update_logical_rows_by_scalar_value(
                        &mut src_result.logical_rows,
                        &mut self.context,
                        value,
                    )?;
                }
                RpnStackNode::Vector { value, .. } => {
                    let eval_result_logical_rows = value.logical_rows_struct();
                    match_template_evaltype! {
                        TT, match value.as_ref() {
                            VectorValue::TT(eval_result) => {
                                update_logical_rows_by_vector_value(
                                    &mut src_result.logical_rows,
                                    &mut self.context,
                                    eval_result,
                                    eval_result_logical_rows,
                                )?;
                            },
                        }
                    }
                }
            }
        }
        Ok(())
    }
}
/// Applies a scalar (constant) predicate result to a batch.
///
/// If `value` is not truthy under MySQL boolean semantics, every logical row
/// is removed; otherwise the rows are left untouched.
///
/// # Errors
///
/// Propagates any error from converting `value` to a MySQL boolean.
fn update_logical_rows_by_scalar_value(
    logical_rows: &mut Vec<usize>,
    ctx: &mut EvalContext,
    value: &ScalarValue,
) -> Result<()> {
    if !value.as_mysql_bool(ctx)? {
        logical_rows.clear();
    }
    Ok(())
}
fn update_logical_rows_by_vector_value<'a, TT: EvaluableRef<'a>, T: 'a + ChunkRef<'a, TT>>(
logical_rows: &mut Vec<usize>,
ctx: &mut EvalContext,
eval_result: T,
eval_result_logical_rows: LogicalRows,
) -> tidb_query_common::error::Result<()>
where
Option<TT>: AsMySQLBool,
{
let mut err_result = Ok(());
let mut logical_index = 0;
logical_rows.retain(|_| {
let eval_result_physical_index = eval_result_logical_rows.get_idx(logical_index);
logical_index += 1;
match eval_result
.get_option_ref(eval_result_physical_index)
.as_mysql_bool(ctx)
{
Err(e) => {
if err_result.is_ok() {
err_result = Err(e.into());
}
false
}
Ok(b) => b,
}
});
err_result
}
impl<Src: BatchExecutor> BatchExecutor for BatchSelectionExecutor<Src> {
    type StorageStats = Src::StorageStats;

    /// Selection does not alter columns, so the child's schema is exposed.
    #[inline]
    fn schema(&self) -> &[FieldType] {
        self.src.schema()
    }

    /// Pulls one batch from the child and filters its logical rows.
    ///
    /// On an evaluation error, the error is folded into `is_drained` and all
    /// logical rows are dropped. On success, warnings gathered during
    /// evaluation are merged into the returned batch.
    #[inline]
    fn next_batch(&mut self, scan_rows: usize) -> BatchExecuteResult {
        let mut result = self.src.next_batch(scan_rows);
        match self.handle_src_result(&mut result) {
            Ok(()) => {
                result.warnings.merge(&mut self.context.warnings);
            }
            Err(e) => {
                result.is_drained = result.is_drained.and(Err(e));
                result.logical_rows.clear();
            }
        }
        result
    }

    /// Delegates execution statistics collection to the child.
    #[inline]
    fn collect_exec_stats(&mut self, dest: &mut ExecuteStats) {
        self.src.collect_exec_stats(dest);
    }

    /// Delegates storage statistics collection to the child.
    #[inline]
    fn collect_storage_stats(&mut self, dest: &mut Self::StorageStats) {
        self.src.collect_storage_stats(dest);
    }

    /// Delegates to the child; filtering does not change the scanned range.
    #[inline]
    fn take_scanned_range(&mut self) -> IntervalRange {
        self.src.take_scanned_range()
    }

    /// Cacheability is inherited from the child executor.
    #[inline]
    fn can_be_cached(&self) -> bool {
        self.src.can_be_cached()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tidb_query_codegen::rpn_fn;
    use tidb_query_datatype::FieldTypeTp;
    use crate::util::mock_executor::MockExecutor;
    use tidb_query_datatype::codec::batch::LazyBatchColumnVec;
    use tidb_query_datatype::expr::EvalWarnings;

    /// Batches with no logical rows must pass through without the predicate
    /// ever being evaluated (`foo` panics if called).
    #[test]
    fn test_empty_rows() {
        #[rpn_fn]
        fn foo() -> Result<Option<i64>> {
            // Reaching this means the executor evaluated a predicate on an
            // empty batch.
            unreachable!()
        }
        let src_exec = MockExecutor::new(
            vec![FieldTypeTp::LongLong.into(), FieldTypeTp::Double.into()],
            vec![
                BatchExecuteResult {
                    physical_columns: LazyBatchColumnVec::empty(),
                    logical_rows: Vec::new(),
                    warnings: EvalWarnings::default(),
                    is_drained: Ok(false),
                },
                // Physical data present but no logical rows selected.
                BatchExecuteResult {
                    physical_columns: LazyBatchColumnVec::from(vec![
                        VectorValue::Int(vec![None].into()),
                        VectorValue::Real(vec![None].into()),
                    ]),
                    logical_rows: Vec::new(),
                    warnings: EvalWarnings::default(),
                    is_drained: Ok(false),
                },
                BatchExecuteResult {
                    physical_columns: LazyBatchColumnVec::empty(),
                    logical_rows: Vec::new(),
                    warnings: EvalWarnings::default(),
                    is_drained: Ok(true),
                },
            ],
        );
        let mut exec = BatchSelectionExecutor::new_for_test(
            src_exec,
            vec![
                RpnExpressionBuilder::new_for_test()
                    .push_fn_call_for_test(foo_fn_meta(), 0, FieldTypeTp::LongLong)
                    .build_for_test(),
            ],
        );
        let r = exec.next_batch(1);
        assert!(r.logical_rows.is_empty());
        assert!(!r.is_drained.unwrap());
        let r = exec.next_batch(1);
        assert!(r.logical_rows.is_empty());
        assert!(!r.is_drained.unwrap());
        let r = exec.next_batch(1);
        assert!(r.logical_rows.is_empty());
        assert!(r.is_drained.unwrap());
    }

    /// Fixture: three batches over (Int, Real) columns.
    /// Batch 1 selects physical rows [2, 0]; batch 2 selects nothing;
    /// batch 3 selects [1] and signals drained.
    fn make_src_executor_using_fixture_1() -> MockExecutor {
        MockExecutor::new(
            vec![FieldTypeTp::LongLong.into(), FieldTypeTp::Double.into()],
            vec![
                BatchExecuteResult {
                    physical_columns: LazyBatchColumnVec::from(vec![
                        VectorValue::Int(vec![None, None, Some(1), None, Some(5)].into()),
                        VectorValue::Real(
                            vec![Real::new(7.0).ok(), Real::new(-5.0).ok(), None, None, None]
                                .into(),
                        ),
                    ]),
                    logical_rows: vec![2, 0],
                    warnings: EvalWarnings::default(),
                    is_drained: Ok(false),
                },
                BatchExecuteResult {
                    physical_columns: LazyBatchColumnVec::from(vec![
                        VectorValue::Int(vec![None].into()),
                        VectorValue::Real(vec![None].into()),
                    ]),
                    logical_rows: Vec::new(),
                    warnings: EvalWarnings::default(),
                    is_drained: Ok(false),
                },
                BatchExecuteResult {
                    physical_columns: LazyBatchColumnVec::from(vec![
                        VectorValue::Int(vec![Some(1), None].into()),
                        VectorValue::Real(vec![None, None].into()),
                    ]),
                    logical_rows: vec![1],
                    warnings: EvalWarnings::default(),
                    is_drained: Ok(true),
                },
            ],
        )
    }

    /// With no predicates, or with a constant-true predicate, every batch's
    /// logical rows must come through unchanged.
    #[test]
    fn test_no_predicate_or_predicate_always_true() {
        let exec_no_predicate =
            |src_exec: MockExecutor| BatchSelectionExecutor::new_for_test(src_exec, vec![]);
        let exec_predicate_true = |src_exec: MockExecutor| {
            let predicate = RpnExpressionBuilder::new_for_test()
                .push_constant_for_test(1i64)
                .build_for_test();
            BatchSelectionExecutor::new_for_test(src_exec, vec![predicate])
        };
        let executor_builders: Vec<Box<dyn FnOnce(MockExecutor) -> _>> =
            vec![Box::new(exec_no_predicate), Box::new(exec_predicate_true)];
        for exec_builder in executor_builders {
            let src_exec = make_src_executor_using_fixture_1();
            let mut exec = exec_builder(src_exec);
            let r = exec.next_batch(1);
            assert_eq!(&r.logical_rows, &[2, 0]);
            assert!(!r.is_drained.unwrap());
            let r = exec.next_batch(1);
            assert!(r.logical_rows.is_empty());
            assert!(!r.is_drained.unwrap());
            let r = exec.next_batch(1);
            assert_eq!(&r.logical_rows, &[1]);
            assert!(r.is_drained.unwrap());
        }
    }

    /// A constant-false (0) predicate clears every batch's logical rows while
    /// drain status is still forwarded.
    #[test]
    fn test_predicate_always_false() {
        let src_exec = make_src_executor_using_fixture_1();
        let predicate = RpnExpressionBuilder::new_for_test()
            .push_constant_for_test(0i64)
            .build_for_test();
        let mut exec = BatchSelectionExecutor::new_for_test(src_exec, vec![predicate]);
        let r = exec.next_batch(1);
        assert!(r.logical_rows.is_empty());
        assert!(!r.is_drained.unwrap());
        let r = exec.next_batch(1);
        assert!(r.logical_rows.is_empty());
        assert!(!r.is_drained.unwrap());
        let r = exec.next_batch(1);
        assert!(r.logical_rows.is_empty());
        assert!(r.is_drained.unwrap());
    }

    /// Test predicate: 1 for even values, 0 for odd values and for NULL.
    #[rpn_fn(nullable)]
    fn is_even(v: Option<&i64>) -> Result<Option<i64>> {
        let r = match v.cloned() {
            None => Some(0),
            Some(v) => {
                if v % 2 == 0 {
                    Some(1)
                } else {
                    Some(0)
                }
            }
        };
        Ok(r)
    }

    /// Fixture: three batches over three Int columns.
    /// Batch 1 selects physical rows [3, 4, 0, 2]:
    ///   col0 values 4, NULL, 2, NULL; col1 values NULL, NULL, 4, 2;
    ///   col2 values 1, 2, 3, 4.
    /// Batch 2 is empty. Batch 3 selects row [0] (col0 NULL, col1 NULL,
    /// col2 2) and signals drained.
    fn make_src_executor_using_fixture_2() -> MockExecutor {
        MockExecutor::new(
            vec![
                FieldTypeTp::LongLong.into(),
                FieldTypeTp::LongLong.into(),
                FieldTypeTp::LongLong.into(),
            ],
            vec![
                BatchExecuteResult {
                    physical_columns: LazyBatchColumnVec::from(vec![
                        VectorValue::Int(vec![Some(2), Some(1), None, Some(4), None].into()),
                        VectorValue::Int(vec![Some(4), None, Some(2), None, None].into()),
                        VectorValue::Int(vec![Some(3), Some(-1), Some(4), Some(1), Some(2)].into()),
                    ]),
                    logical_rows: vec![3, 4, 0, 2],
                    warnings: EvalWarnings::default(),
                    is_drained: Ok(false),
                },
                BatchExecuteResult {
                    physical_columns: LazyBatchColumnVec::empty(),
                    logical_rows: Vec::new(),
                    warnings: EvalWarnings::default(),
                    is_drained: Ok(false),
                },
                BatchExecuteResult {
                    physical_columns: LazyBatchColumnVec::from(vec![
                        VectorValue::Int(vec![None, Some(1)].into()),
                        VectorValue::Int(vec![None, Some(-1)].into()),
                        VectorValue::Int(vec![Some(2), Some(42)].into()),
                    ]),
                    logical_rows: vec![0],
                    warnings: EvalWarnings::default(),
                    is_drained: Ok(true),
                },
            ],
        )
    }

    /// `is_even(col0)`: in batch 1 only rows 3 (value 4) and 0 (value 2)
    /// survive, in their original logical order.
    #[test]
    fn test_predicate_1() {
        let src_exec = make_src_executor_using_fixture_2();
        let predicate = RpnExpressionBuilder::new_for_test()
            .push_column_ref_for_test(0)
            .push_fn_call_for_test(is_even_fn_meta(), 1, FieldTypeTp::LongLong)
            .build_for_test();
        let mut exec = BatchSelectionExecutor::new_for_test(src_exec, vec![predicate]);
        let r = exec.next_batch(1);
        assert_eq!(&r.logical_rows, &[3, 0]);
        assert!(!r.is_drained.unwrap());
        let r = exec.next_batch(1);
        assert!(r.logical_rows.is_empty());
        assert!(!r.is_drained.unwrap());
        let r = exec.next_batch(1);
        assert!(r.logical_rows.is_empty());
        assert!(r.is_drained.unwrap());
    }

    /// `is_even(col1)`: in batch 1 only rows 0 (value 4) and 2 (value 2)
    /// survive.
    #[test]
    fn test_predicate_2() {
        let src_exec = make_src_executor_using_fixture_2();
        let predicate = RpnExpressionBuilder::new_for_test()
            .push_column_ref_for_test(1)
            .push_fn_call_for_test(is_even_fn_meta(), 1, FieldTypeTp::LongLong)
            .build_for_test();
        let mut exec = BatchSelectionExecutor::new_for_test(src_exec, vec![predicate]);
        let r = exec.next_batch(1);
        assert_eq!(&r.logical_rows, &[0, 2]);
        assert!(!r.is_drained.unwrap());
        let r = exec.next_batch(1);
        assert!(r.logical_rows.is_empty());
        assert!(!r.is_drained.unwrap());
        let r = exec.next_batch(1);
        assert!(r.logical_rows.is_empty());
        assert!(r.is_drained.unwrap());
    }

    /// Conjunction of `is_even(col0)` and `is_even(col1)`: only row 0 passes
    /// both, regardless of predicate order.
    #[test]
    fn test_multiple_predicate_1() {
        let predicate: Vec<_> = (0..=1)
            .map(|offset| {
                move || {
                    RpnExpressionBuilder::new_for_test()
                        .push_column_ref_for_test(offset)
                        .push_fn_call_for_test(is_even_fn_meta(), 1, FieldTypeTp::LongLong)
                        .build_for_test()
                }
            })
            .collect();
        for predicates in vec![
            vec![predicate[0](), predicate[1]()],
            vec![predicate[1](), predicate[0]()],
        ] {
            let src_exec = make_src_executor_using_fixture_2();
            let mut exec = BatchSelectionExecutor::new_for_test(src_exec, predicates);
            let r = exec.next_batch(1);
            assert_eq!(&r.logical_rows, &[0]);
            assert!(!r.is_drained.unwrap());
            let r = exec.next_batch(1);
            assert!(r.logical_rows.is_empty());
            assert!(!r.is_drained.unwrap());
            let r = exec.next_batch(1);
            assert!(r.logical_rows.is_empty());
            assert!(r.is_drained.unwrap());
        }
    }

    /// Adding `is_even(col2)` eliminates row 0 (col2 = 3), so nothing
    /// survives in any order of predicates.
    #[test]
    fn test_multiple_predicate_2() {
        let predicate: Vec<_> = (0..=2)
            .map(|offset| {
                move || {
                    RpnExpressionBuilder::new_for_test()
                        .push_column_ref_for_test(offset)
                        .push_fn_call_for_test(is_even_fn_meta(), 1, FieldTypeTp::LongLong)
                        .build_for_test()
                }
            })
            .collect();
        for predicates in vec![
            vec![predicate[0](), predicate[1](), predicate[2]()],
            vec![predicate[1](), predicate[2](), predicate[0]()],
        ] {
            let src_exec = make_src_executor_using_fixture_2();
            let mut exec = BatchSelectionExecutor::new_for_test(src_exec, predicates);
            let r = exec.next_batch(1);
            assert!(r.logical_rows.is_empty());
            assert!(!r.is_drained.unwrap());
            let r = exec.next_batch(1);
            assert!(r.logical_rows.is_empty());
            assert!(!r.is_drained.unwrap());
            let r = exec.next_batch(1);
            assert!(r.logical_rows.is_empty());
            assert!(r.is_drained.unwrap());
        }
    }

    /// An evaluation error turns the batch into an error result with no
    /// logical rows.
    #[test]
    fn test_predicate_error() {
        /// Identity on non-NULL input; errors on NULL.
        #[rpn_fn(nullable)]
        fn foo(v: Option<&i64>) -> Result<Option<i64>> {
            match v.cloned() {
                None => Err(other_err!("foo")),
                Some(v) => Ok(Some(v)),
            }
        }
        // In batch 1, col0 at logical rows [1, 3, 4, 0] is all non-NULL, so
        // `foo(col0)` succeeds. col1 at physical row 4 is NULL, so `foo(col1)`
        // fails.
        let src_exec = MockExecutor::new(
            vec![FieldTypeTp::LongLong.into(), FieldTypeTp::LongLong.into()],
            vec![
                BatchExecuteResult {
                    physical_columns: LazyBatchColumnVec::from(vec![
                        VectorValue::Int(vec![Some(1), Some(4), None, Some(1), Some(2)].into()),
                        VectorValue::Int(vec![None, Some(4), None, Some(2), None].into()),
                    ]),
                    logical_rows: vec![1, 3, 4, 0],
                    warnings: EvalWarnings::default(),
                    is_drained: Ok(false),
                },
                BatchExecuteResult {
                    physical_columns: LazyBatchColumnVec::from(vec![
                        VectorValue::Int(vec![Some(-5)].into()),
                        VectorValue::Int(vec![Some(5)].into()),
                    ]),
                    logical_rows: Vec::new(),
                    warnings: EvalWarnings::default(),
                    is_drained: Ok(true),
                },
            ],
        );
        let predicates = (0..=1)
            .map(|offset| {
                RpnExpressionBuilder::new_for_test()
                    .push_column_ref_for_test(offset)
                    .push_fn_call_for_test(foo_fn_meta(), 1, FieldTypeTp::LongLong)
                    .build_for_test()
            })
            .collect();
        let mut exec = BatchSelectionExecutor::new_for_test(src_exec, predicates);
        let r = exec.next_batch(1);
        assert!(r.logical_rows.is_empty());
        assert!(r.is_drained.is_err());
    }
}