use tipb::FieldType;
use super::expr::{RpnExpression, RpnExpressionNode};
use super::RpnFnCallExtra;
use tidb_query_common::Result;
use tidb_query_datatype::codec::batch::LazyBatchColumnVec;
pub use tidb_query_datatype::codec::data_type::{
LogicalRows, BATCH_MAX_SIZE, IDENTICAL_LOGICAL_ROWS,
};
use tidb_query_datatype::codec::data_type::{ScalarValue, ScalarValueRef, VectorValue};
use tidb_query_datatype::expr::EvalContext;
#[derive(Debug)]
pub enum RpnStackNodeVectorValue<'a> {
Generated {
physical_value: VectorValue,
},
Ref {
physical_value: &'a VectorValue,
logical_rows: &'a [usize],
},
}
impl<'a> RpnStackNodeVectorValue<'a> {
pub fn as_ref(&self) -> &VectorValue {
match self {
RpnStackNodeVectorValue::Generated { physical_value, .. } => &physical_value,
RpnStackNodeVectorValue::Ref { physical_value, .. } => *physical_value,
}
}
pub fn logical_rows_struct(&self) -> LogicalRows {
match self {
RpnStackNodeVectorValue::Generated { physical_value } => LogicalRows::Ref {
logical_rows: &IDENTICAL_LOGICAL_ROWS[0..physical_value.len()],
},
RpnStackNodeVectorValue::Ref { logical_rows, .. } => LogicalRows::Ref { logical_rows },
}
}
pub fn logical_rows(&self) -> &[usize] {
self.logical_rows_struct().as_slice()
}
}
#[derive(Debug)]
pub enum RpnStackNode<'a> {
Scalar {
value: &'a ScalarValue,
field_type: &'a FieldType,
},
Vector {
value: RpnStackNodeVectorValue<'a>,
field_type: &'a FieldType,
},
}
impl<'a> RpnStackNode<'a> {
#[inline]
pub fn field_type(&self) -> &FieldType {
match self {
RpnStackNode::Scalar { field_type, .. } => field_type,
RpnStackNode::Vector { field_type, .. } => field_type,
}
}
#[inline]
pub fn scalar_value(&self) -> Option<&ScalarValue> {
match self {
RpnStackNode::Scalar { value, .. } => Some(*value),
RpnStackNode::Vector { .. } => None,
}
}
#[inline]
pub fn vector_value(&self) -> Option<&RpnStackNodeVectorValue<'_>> {
match self {
RpnStackNode::Scalar { .. } => None,
RpnStackNode::Vector { value, .. } => Some(&value),
}
}
#[inline]
pub fn is_scalar(&self) -> bool {
matches!(self, RpnStackNode::Scalar { .. })
}
#[inline]
pub fn is_vector(&self) -> bool {
matches!(self, RpnStackNode::Vector { .. })
}
#[inline]
pub fn get_logical_scalar_ref(&self, logical_index: usize) -> ScalarValueRef<'_> {
match self {
RpnStackNode::Vector { value, .. } => {
let physical_vector = value.as_ref();
let logical_rows = value.logical_rows_struct();
let idx = logical_rows.get_idx(logical_index);
physical_vector.get_scalar_ref(idx)
}
RpnStackNode::Scalar { value, .. } => value.as_scalar_value_ref(),
}
}
}
impl RpnExpression {
pub fn eval<'a>(
&'a self,
ctx: &mut EvalContext,
schema: &'a [FieldType],
input_physical_columns: &'a mut LazyBatchColumnVec,
input_logical_rows: &'a [usize],
output_rows: usize,
) -> Result<RpnStackNode<'a>> {
self.ensure_columns_decoded(ctx, schema, input_physical_columns, input_logical_rows)?;
self.eval_decoded(
ctx,
schema,
input_physical_columns,
input_logical_rows,
output_rows,
)
}
pub fn ensure_columns_decoded<'a>(
&'a self,
ctx: &mut EvalContext,
schema: &'a [FieldType],
input_physical_columns: &'a mut LazyBatchColumnVec,
input_logical_rows: &[usize],
) -> Result<()> {
for node in self.as_ref() {
if let RpnExpressionNode::ColumnRef { offset, .. } = node {
input_physical_columns[*offset].ensure_decoded(
ctx,
&schema[*offset],
LogicalRows::from_slice(input_logical_rows),
)?;
}
}
Ok(())
}
pub fn eval_decoded<'a>(
&'a self,
ctx: &mut EvalContext,
schema: &'a [FieldType],
input_physical_columns: &'a LazyBatchColumnVec,
input_logical_rows: &'a [usize],
output_rows: usize,
) -> Result<RpnStackNode<'a>> {
assert!(output_rows > 0);
assert!(output_rows <= BATCH_MAX_SIZE);
let mut stack = Vec::with_capacity(self.len());
for node in self.as_ref() {
match node {
RpnExpressionNode::Constant { value, field_type } => {
stack.push(RpnStackNode::Scalar { value, field_type });
}
RpnExpressionNode::ColumnRef { offset } => {
let field_type = &schema[*offset];
let decoded_physical_column = input_physical_columns[*offset].decoded();
assert_eq!(input_logical_rows.len(), output_rows);
stack.push(RpnStackNode::Vector {
value: RpnStackNodeVectorValue::Ref {
physical_value: &decoded_physical_column,
logical_rows: input_logical_rows,
},
field_type,
});
}
RpnExpressionNode::FnCall {
func_meta,
args_len,
field_type: ret_field_type,
metadata,
} => {
assert!(stack.len() >= *args_len);
let stack_slice_begin = stack.len() - *args_len;
let stack_slice = &stack[stack_slice_begin..];
let mut call_extra = RpnFnCallExtra { ret_field_type };
let ret = (func_meta.fn_ptr)(
ctx,
output_rows,
stack_slice,
&mut call_extra,
&**metadata,
)?;
stack.truncate(stack_slice_begin);
stack.push(RpnStackNode::Vector {
value: RpnStackNodeVectorValue::Generated {
physical_value: ret,
},
field_type: ret_field_type,
});
}
}
}
assert_eq!(stack.len(), 1);
Ok(stack.into_iter().next().unwrap())
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::float_cmp)]
use super::*;
use tidb_query_codegen::rpn_fn;
use tidb_query_datatype::{EvalType, FieldTypeAccessor, FieldTypeTp};
use tipb::FieldType;
use tipb_helper::ExprDefBuilder;
use crate::impl_arithmetic::*;
use crate::impl_compare::*;
use crate::{RpnExpressionBuilder, RpnFnMeta};
use test::{black_box, Bencher};
use tidb_query_common::Result;
use tidb_query_datatype::codec::batch::LazyBatchColumn;
use tidb_query_datatype::codec::data_type::*;
use tidb_query_datatype::codec::datum::{Datum, DatumEncoder};
use tidb_query_datatype::expr::EvalContext;
#[test]
fn test_eval_single_constant_node() {
let exp = RpnExpressionBuilder::new_for_test()
.push_constant_for_test(1.5f64)
.build_for_test();
let mut ctx = EvalContext::default();
let mut columns = LazyBatchColumnVec::empty();
let result = exp.eval(&mut ctx, &[], &mut columns, &[], 10);
let val = result.unwrap();
assert!(val.is_scalar());
assert_eq!(
val.scalar_value().unwrap().as_real(),
Real::new(1.5).ok().as_ref()
);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::Double);
}
fn new_single_column_node_fixture() -> (LazyBatchColumnVec, Vec<usize>, [FieldType; 2]) {
let physical_columns = LazyBatchColumnVec::from(vec![
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(5, EvalType::Real);
col.mut_decoded().push_real(Real::new(1.0).ok());
col.mut_decoded().push_real(None);
col.mut_decoded().push_real(Real::new(7.5).ok());
col.mut_decoded().push_real(None);
col.mut_decoded().push_real(None);
col
},
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(5, EvalType::Int);
col.mut_decoded().push_int(Some(1));
col.mut_decoded().push_int(Some(5));
col.mut_decoded().push_int(None);
col.mut_decoded().push_int(None);
col.mut_decoded().push_int(Some(42));
col
},
]);
let schema = [FieldTypeTp::Double.into(), FieldTypeTp::LongLong.into()];
let logical_rows = (0..5).collect();
(physical_columns, logical_rows, schema)
}
#[test]
fn test_eval_single_column_node_normal() {
let (columns, logical_rows, schema) = new_single_column_node_fixture();
let mut c = columns.clone();
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(1)
.build_for_test();
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, &schema, &mut c, &logical_rows, 5);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_int_vec(),
[Some(1), Some(5), None, None, Some(42)]
);
assert_eq!(
val.vector_value().unwrap().logical_rows(),
logical_rows.as_slice()
);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::LongLong);
let mut c = columns.clone();
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(1)
.build_for_test();
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, &schema, &mut c, &[2, 0, 1], 3);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_int_vec(),
[Some(1), Some(5), None, None, Some(42)]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[2, 0, 1]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::LongLong);
let mut c = columns;
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.build_for_test();
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, &schema, &mut c, &logical_rows, 5);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_real_vec(),
[Real::new(1.0).ok(), None, Real::new(7.5).ok(), None, None]
);
assert_eq!(
val.vector_value().unwrap().logical_rows(),
logical_rows.as_slice()
);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::Double);
}
#[test]
fn test_eval_single_column_node_mismatch_rows() {
let (columns, logical_rows, schema) = new_single_column_node_fixture();
let mut c = columns.clone();
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(1)
.build_for_test();
let mut ctx = EvalContext::default();
let hooked_eval = panic_hook::recover_safe(|| {
let _ = exp.eval(&mut ctx, &schema, &mut c, &logical_rows, 4);
});
assert!(hooked_eval.is_err());
let mut c = columns;
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(1)
.build_for_test();
let mut ctx = EvalContext::default();
let hooked_eval = panic_hook::recover_safe(|| {
let _ = exp.eval(&mut ctx, &schema, &mut c, &logical_rows, 6);
});
assert!(hooked_eval.is_err());
}
#[test]
fn test_eval_single_fn_call_node() {
#[rpn_fn(nullable)]
fn foo() -> Result<Option<i64>> {
Ok(Some(42))
}
let exp = RpnExpressionBuilder::new_for_test()
.push_fn_call_for_test(foo_fn_meta(), 0, FieldTypeTp::LongLong)
.build_for_test();
let mut ctx = EvalContext::default();
let mut columns = LazyBatchColumnVec::empty();
let result = exp.eval(&mut ctx, &[], &mut columns, &[], 4);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_int_vec(),
[Some(42), Some(42), Some(42), Some(42)]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0, 1, 2, 3]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::LongLong);
}
#[test]
fn test_eval_unary_function_scalar() {
#[rpn_fn(nullable)]
fn foo(v: Option<&Real>) -> Result<Option<Real>> {
Ok(v.map(|v| *v * 2.0))
}
let exp = RpnExpressionBuilder::new_for_test()
.push_constant_for_test(1.5f64)
.push_fn_call_for_test(foo_fn_meta(), 1, FieldTypeTp::Double)
.build_for_test();
let mut ctx = EvalContext::default();
let mut columns = LazyBatchColumnVec::empty();
let result = exp.eval(&mut ctx, &[], &mut columns, &[], 3);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_real_vec(),
[
Real::new(3.0).ok(),
Real::new(3.0).ok(),
Real::new(3.0).ok()
]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0, 1, 2]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::Double);
}
#[test]
fn test_eval_unary_function_vector() {
#[rpn_fn(nullable)]
fn foo(v: Option<&i64>) -> Result<Option<i64>> {
Ok(v.map(|v| v + 5))
}
let mut columns = LazyBatchColumnVec::from(vec![{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(3, EvalType::Int);
col.mut_decoded().push_int(Some(1));
col.mut_decoded().push_int(Some(5));
col.mut_decoded().push_int(None);
col
}]);
let schema = &[FieldTypeTp::LongLong.into()];
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.push_fn_call_for_test(foo_fn_meta(), 1, FieldTypeTp::LongLong)
.build_for_test();
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, schema, &mut columns, &[2, 0], 2);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_int_vec(),
[None, Some(6)]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0, 1]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::LongLong);
}
#[test]
fn test_eval_unary_function_raw_column() {
#[rpn_fn(nullable)]
fn foo(v: Option<&i64>) -> Result<Option<i64>> {
Ok(Some(v.unwrap() + 5))
}
let mut ctx = EvalContext::default();
let mut columns = LazyBatchColumnVec::from(vec![{
let mut col = LazyBatchColumn::raw_with_capacity(3);
let mut datum_raw = Vec::new();
datum_raw
.write_datum(&mut ctx, &[Datum::I64(-5)], false)
.unwrap();
col.mut_raw().push(&datum_raw);
let mut datum_raw = Vec::new();
datum_raw
.write_datum(&mut ctx, &[Datum::I64(-7)], false)
.unwrap();
col.mut_raw().push(&datum_raw);
let mut datum_raw = Vec::new();
datum_raw
.write_datum(&mut ctx, &[Datum::I64(3)], false)
.unwrap();
col.mut_raw().push(&datum_raw);
col
}]);
let schema = &[FieldTypeTp::LongLong.into()];
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.push_fn_call_for_test(foo_fn_meta(), 1, FieldTypeTp::LongLong)
.build_for_test();
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, schema, &mut columns, &[2, 0, 1], 3);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_int_vec(),
[Some(8), Some(0), Some(-2)]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0, 1, 2]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::LongLong);
}
#[test]
fn test_eval_binary_function_scalar_scalar() {
#[rpn_fn(nullable)]
fn foo(v1: Option<&Real>, v2: Option<&i64>) -> Result<Option<Real>> {
Ok(Some(*v1.unwrap() + *v2.unwrap() as f64 - 1.0))
}
let exp = RpnExpressionBuilder::new_for_test()
.push_constant_for_test(1.5f64)
.push_constant_for_test(3i64)
.push_fn_call_for_test(foo_fn_meta(), 2, FieldTypeTp::Double)
.build_for_test();
let mut ctx = EvalContext::default();
let mut columns = LazyBatchColumnVec::empty();
let result = exp.eval(&mut ctx, &[], &mut columns, &[], 3);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_real_vec(),
[
Real::new(3.5).ok(),
Real::new(3.5).ok(),
Real::new(3.5).ok()
]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0, 1, 2]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::Double);
}
#[test]
fn test_eval_binary_function_vector_scalar() {
#[rpn_fn(nullable)]
fn foo(v1: Option<&Real>, v2: Option<&Real>) -> Result<Option<Real>> {
Ok(Some(*v1.unwrap() - *v2.unwrap()))
}
let mut columns = LazyBatchColumnVec::from(vec![{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(3, EvalType::Real);
col.mut_decoded().push_real(Real::new(1.0).ok());
col.mut_decoded().push_real(Real::new(5.5).ok());
col.mut_decoded().push_real(Real::new(-4.3).ok());
col
}]);
let schema = &[FieldTypeTp::Double.into()];
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.push_constant_for_test(1.5f64)
.push_fn_call_for_test(foo_fn_meta(), 2, FieldTypeTp::Double)
.build_for_test();
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, schema, &mut columns, &[2, 0], 2);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_real_vec(),
[
Real::new(-5.8).ok(),
Real::new(-0.5).ok(),
]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0, 1]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::Double);
}
#[test]
fn test_eval_binary_function_scalar_vector() {
#[rpn_fn(nullable)]
fn foo(v1: Option<&Real>, v2: Option<&i64>) -> Result<Option<Real>> {
Ok(Some(*v1.unwrap() - *v2.unwrap() as f64))
}
let mut columns = LazyBatchColumnVec::from(vec![{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(3, EvalType::Int);
col.mut_decoded().push_int(Some(1));
col.mut_decoded().push_int(Some(5));
col.mut_decoded().push_int(Some(-4));
col
}]);
let schema = &[FieldTypeTp::LongLong.into()];
let exp = RpnExpressionBuilder::new_for_test()
.push_constant_for_test(1.5f64)
.push_column_ref_for_test(0)
.push_fn_call_for_test(foo_fn_meta(), 2, FieldTypeTp::Double)
.build_for_test();
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, schema, &mut columns, &[1, 2], 2);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_real_vec(),
[
Real::new(-3.5).ok(),
Real::new(5.5).ok(),
]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0, 1]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::Double);
}
#[test]
fn test_eval_binary_function_vector_vector() {
#[rpn_fn(nullable)]
fn foo(v1: Option<&Real>, v2: Option<&i64>) -> Result<Option<i64>> {
Ok(Some(
(v1.unwrap().into_inner() * 2.5 - (*v2.unwrap() as f64) * 3.5) as i64,
))
}
let mut columns = LazyBatchColumnVec::from(vec![
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(3, EvalType::Int);
col.mut_decoded().push_int(Some(1));
col.mut_decoded().push_int(Some(5));
col.mut_decoded().push_int(Some(-4));
col
},
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(3, EvalType::Real);
col.mut_decoded().push_real(Real::new(0.5).ok());
col.mut_decoded().push_real(Real::new(-0.1).ok());
col.mut_decoded().push_real(Real::new(3.5).ok());
col
},
]);
let schema = &[FieldTypeTp::LongLong.into(), FieldTypeTp::Double.into()];
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(1)
.push_column_ref_for_test(0)
.push_fn_call_for_test(foo_fn_meta(), 2, FieldTypeTp::LongLong)
.build_for_test();
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, schema, &mut columns, &[0, 2, 1], 3);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_int_vec(),
[
Some(-2),
Some(22),
Some(-17),
]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0, 1, 2]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::LongLong);
}
#[test]
fn test_eval_binary_function_raw_column() {
#[rpn_fn(nullable)]
fn foo(v1: Option<&i64>, v2: Option<&i64>) -> Result<Option<i64>> {
Ok(Some(v1.unwrap() * v2.unwrap()))
}
let mut ctx = EvalContext::default();
let mut columns = LazyBatchColumnVec::from(vec![{
let mut col = LazyBatchColumn::raw_with_capacity(3);
let mut datum_raw = Vec::new();
datum_raw
.write_datum(&mut ctx, &[Datum::I64(-5)], false)
.unwrap();
col.mut_raw().push(&datum_raw);
let mut datum_raw = Vec::new();
datum_raw
.write_datum(&mut ctx, &[Datum::I64(-7)], false)
.unwrap();
col.mut_raw().push(&datum_raw);
let mut datum_raw = Vec::new();
datum_raw
.write_datum(&mut ctx, &[Datum::I64(3)], false)
.unwrap();
col.mut_raw().push(&datum_raw);
col
}]);
let schema = &[FieldTypeTp::LongLong.into(), FieldTypeTp::LongLong.into()];
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.push_column_ref_for_test(0)
.push_fn_call_for_test(foo_fn_meta(), 2, FieldTypeTp::LongLong)
.build_for_test();
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, schema, &mut columns, &[1], 1);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_int_vec(),
[Some(49)]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::LongLong);
}
#[test]
fn test_eval_ternary_function() {
#[rpn_fn(nullable)]
fn foo(v1: Option<&i64>, v2: Option<&i64>, v3: Option<&i64>) -> Result<Option<i64>> {
Ok(Some(v1.unwrap() - v2.unwrap() * v3.unwrap()))
}
let mut columns = LazyBatchColumnVec::from(vec![{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(3, EvalType::Int);
col.mut_decoded().push_int(Some(1));
col.mut_decoded().push_int(Some(5));
col.mut_decoded().push_int(Some(-4));
col
}]);
let schema = &[FieldTypeTp::LongLong.into()];
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.push_constant_for_test(3i64)
.push_column_ref_for_test(0)
.push_fn_call_for_test(foo_fn_meta(), 3, FieldTypeTp::LongLong)
.build_for_test();
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, schema, &mut columns, &[1, 0, 2], 3);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_int_vec(),
[Some(-10), Some(-2), Some(8)]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0, 1, 2]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::LongLong);
}
#[test]
fn test_eval_comprehensive() {
#[rpn_fn(nullable)]
fn fn_a(v1: Option<&Real>, v2: Option<&Real>, v3: Option<&Real>) -> Result<Option<Real>> {
Ok(Some(*v1.unwrap() * *v2.unwrap() - *v3.unwrap()))
}
#[rpn_fn(nullable)]
fn fn_b() -> Result<Option<Real>> {
Ok(Real::new(42.0).ok())
}
#[rpn_fn(nullable)]
fn fn_c(v1: Option<&i64>, v2: Option<&i64>) -> Result<Option<Real>> {
Ok(Real::new((v2.unwrap() - v1.unwrap()) as f64).ok())
}
#[rpn_fn(nullable)]
fn fn_d(v1: Option<&i64>, v2: Option<&i64>) -> Result<Option<i64>> {
Ok(Some(v1.unwrap() + v2.unwrap() * 2))
}
let mut columns = LazyBatchColumnVec::from(vec![
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(3, EvalType::Real);
col.mut_decoded().push_real(Real::new(0.5).ok());
col.mut_decoded().push_real(Real::new(-0.1).ok());
col.mut_decoded().push_real(Real::new(3.5).ok());
col
},
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(3, EvalType::Int);
col.mut_decoded().push_int(Some(1));
col.mut_decoded().push_int(Some(5));
col.mut_decoded().push_int(Some(-4));
col
},
]);
let schema = &[FieldTypeTp::Double.into(), FieldTypeTp::LongLong.into()];
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.push_fn_call_for_test(fn_b_fn_meta(), 0, FieldTypeTp::Double)
.push_column_ref_for_test(1)
.push_constant_for_test(7i64)
.push_fn_call_for_test(fn_d_fn_meta(), 2, FieldTypeTp::LongLong)
.push_constant_for_test(11i64)
.push_fn_call_for_test(fn_c_fn_meta(), 2, FieldTypeTp::Double)
.push_fn_call_for_test(fn_a_fn_meta(), 3, FieldTypeTp::Double)
.build_for_test();
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, schema, &mut columns, &[2, 0], 2);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_real_vec(),
[Real::new(146.0).ok(), Real::new(25.0).ok(),]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0, 1]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::Double);
}
#[test]
fn test_eval_fail_1() {
#[rpn_fn(nullable)]
fn foo(_v: Option<&i64>) -> Result<Option<i64>> {
unreachable!()
}
let exp = RpnExpressionBuilder::new_for_test()
.push_fn_call_for_test(foo_fn_meta(), 1, FieldTypeTp::LongLong)
.build_for_test();
let mut ctx = EvalContext::default();
let mut columns = LazyBatchColumnVec::empty();
let hooked_eval = panic_hook::recover_safe(|| {
let _ = exp.eval(&mut ctx, &[], &mut columns, &[], 3);
});
assert!(hooked_eval.is_err());
}
#[test]
fn test_eval_fail_2() {
#[rpn_fn(nullable)]
fn foo(v: Option<&Real>) -> Result<Option<Real>> {
Ok(v.map(|v| *v * 2.0))
}
let exp = RpnExpressionBuilder::new_for_test()
.push_constant_for_test(3.0f64)
.push_constant_for_test(1.5f64)
.push_fn_call_for_test(foo_fn_meta(), 1, FieldTypeTp::Double)
.build_for_test();
let mut ctx = EvalContext::default();
let mut columns = LazyBatchColumnVec::empty();
let hooked_eval = panic_hook::recover_safe(|| {
let _ = exp.eval(&mut ctx, &[], &mut columns, &[], 3);
});
assert!(hooked_eval.is_err());
}
#[test]
fn test_eval_fail_3() {
#[rpn_fn(nullable)]
fn foo(v: Option<&Real>) -> Result<Option<Real>> {
Ok(v.map(|v| *v * 2.5))
}
let exp = RpnExpressionBuilder::new_for_test()
.push_constant_for_test(7i64)
.push_fn_call_for_test(foo_fn_meta(), 1, FieldTypeTp::Double)
.build_for_test();
let mut ctx = EvalContext::default();
let mut columns = LazyBatchColumnVec::empty();
let hooked_eval = panic_hook::recover_safe(|| {
let _ = exp.eval(&mut ctx, &[], &mut columns, &[], 3);
});
assert!(hooked_eval.is_err());
}
#[test]
fn test_parse_and_eval() {
use tipb::{Expr, ScalarFuncSig};
#[rpn_fn(nullable)]
fn fn_a(a: Option<&i64>, b: Option<&Real>, c: Option<&i64>) -> Result<Option<Real>> {
Ok(Real::new(*a.unwrap() as f64 - b.unwrap().into_inner() * *c.unwrap() as f64).ok())
}
#[rpn_fn(nullable)]
fn fn_b(a: Option<&Real>, b: Option<&i64>) -> Result<Option<Real>> {
Ok(Real::new(a.unwrap().into_inner() * (*b.unwrap() as f64 - 1.5)).ok())
}
#[rpn_fn(nullable)]
fn fn_c() -> Result<Option<i64>> {
Ok(Some(42))
}
#[rpn_fn(nullable)]
fn fn_d(a: Option<&Real>) -> Result<Option<i64>> {
Ok(Some(a.unwrap().into_inner() as i64))
}
fn fn_mapper(expr: &Expr) -> Result<RpnFnMeta> {
Ok(match expr.get_sig() {
ScalarFuncSig::CastIntAsInt => fn_a_fn_meta(),
ScalarFuncSig::CastIntAsReal => fn_b_fn_meta(),
ScalarFuncSig::CastIntAsString => fn_c_fn_meta(),
ScalarFuncSig::CastIntAsDecimal => fn_d_fn_meta(),
_ => unreachable!(),
})
}
let node =
ExprDefBuilder::scalar_func(ScalarFuncSig::CastIntAsDecimal, FieldTypeTp::LongLong)
.push_child(
ExprDefBuilder::scalar_func(ScalarFuncSig::CastIntAsInt, FieldTypeTp::Double)
.push_child(ExprDefBuilder::constant_int(7))
.push_child(
ExprDefBuilder::scalar_func(
ScalarFuncSig::CastIntAsReal,
FieldTypeTp::Double,
)
.push_child(ExprDefBuilder::column_ref(1, FieldTypeTp::Double))
.push_child(ExprDefBuilder::scalar_func(
ScalarFuncSig::CastIntAsString,
FieldTypeTp::LongLong,
)),
)
.push_child(ExprDefBuilder::column_ref(0, FieldTypeTp::LongLong)),
)
.build();
let exp =
RpnExpressionBuilder::build_from_expr_tree_with_fn_mapper(node, fn_mapper, 2).unwrap();
let mut columns = LazyBatchColumnVec::from(vec![
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(3, EvalType::Int);
col.mut_decoded().push_int(Some(1));
col.mut_decoded().push_int(Some(5));
col.mut_decoded().push_int(Some(-4));
col
},
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(3, EvalType::Real);
col.mut_decoded().push_real(Real::new(0.5).ok());
col.mut_decoded().push_real(Real::new(-0.1).ok());
col.mut_decoded().push_real(Real::new(3.5).ok());
col
},
]);
let schema = &[FieldTypeTp::LongLong.into(), FieldTypeTp::Double.into()];
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, schema, &mut columns, &[2, 0], 2);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_int_vec(),
[Some(574), Some(-13)]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0, 1]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::LongLong);
}
#[test]
fn test_rpn_fn_data() {
use tidb_query_datatype::codec::data_type::Evaluable;
use tipb::{Expr, ScalarFuncSig};
#[allow(clippy::trivially_copy_pass_by_ref)]
#[rpn_fn(capture = [metadata], metadata_mapper = prepare_a::<T>)]
fn fn_a_nonnull<T: Evaluable + EvaluableRet>(
metadata: &i64,
v: &Int,
) -> Result<Option<Int>> {
assert_eq!(*metadata, 42);
Ok(Some(v + *metadata))
}
fn prepare_a<T: Evaluable>(_expr: &mut Expr) -> Result<i64> {
Ok(42)
}
#[allow(clippy::trivially_copy_pass_by_ref, clippy::ptr_arg)]
#[rpn_fn(nullable, varg, capture = [metadata], metadata_mapper = prepare_b::<T>)]
fn fn_b<T: Evaluable + EvaluableRet>(
metadata: &String,
v: &[Option<&T>],
) -> Result<Option<T>> {
assert_eq!(metadata, &format!("{}", std::mem::size_of::<T>()));
Ok(v[0].cloned())
}
fn prepare_b<T: Evaluable>(_expr: &mut Expr) -> Result<String> {
Ok(format!("{}", std::mem::size_of::<T>()))
}
#[allow(clippy::trivially_copy_pass_by_ref)]
#[rpn_fn(nullable, raw_varg, capture = [metadata], metadata_mapper = prepare_c::<T>)]
fn fn_c<T: Evaluable>(
_data: &std::marker::PhantomData<T>,
args: &[ScalarValueRef<'_>],
) -> Result<Option<Int>> {
Ok(Some(args.len() as i64))
}
fn prepare_c<T: Evaluable>(_expr: &mut Expr) -> Result<std::marker::PhantomData<T>> {
Ok(std::marker::PhantomData)
}
fn fn_mapper(expr: &Expr) -> Result<RpnFnMeta> {
Ok(match expr.get_sig() {
ScalarFuncSig::CastIntAsInt => fn_a_nonnull_fn_meta::<Real>(),
ScalarFuncSig::CastIntAsReal => fn_b_fn_meta::<Real>(),
ScalarFuncSig::CastIntAsString => fn_c_fn_meta::<Int>(),
_ => unreachable!(),
})
}
let node =
ExprDefBuilder::scalar_func(ScalarFuncSig::CastIntAsString, FieldTypeTp::LongLong)
.push_child(
ExprDefBuilder::scalar_func(ScalarFuncSig::CastIntAsReal, FieldTypeTp::Double)
.push_child(ExprDefBuilder::constant_real(0.5)),
)
.push_child(
ExprDefBuilder::scalar_func(ScalarFuncSig::CastIntAsInt, FieldTypeTp::LongLong)
.push_child(ExprDefBuilder::column_ref(0, FieldTypeTp::LongLong)),
)
.build();
let exp =
RpnExpressionBuilder::build_from_expr_tree_with_fn_mapper(node, fn_mapper, 1).unwrap();
let mut columns = LazyBatchColumnVec::from(vec![{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(2, EvalType::Int);
col.mut_decoded().push_int(Some(1));
col.mut_decoded().push_int(None);
col
}]);
let schema = &[FieldTypeTp::LongLong.into(), FieldTypeTp::Double.into()];
let mut ctx = EvalContext::default();
let result = exp.eval(&mut ctx, schema, &mut columns, &[1, 0], 2);
let val = result.unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_int_vec(),
[Some(2), Some(2)]
);
assert_eq!(val.vector_value().unwrap().logical_rows(), &[0, 1]);
assert_eq!(val.field_type().as_accessor().tp(), FieldTypeTp::LongLong);
}
#[test]
fn test_merge_nulls_constant_null() {
#[rpn_fn]
fn foo(v: &Real) -> Result<Option<Real>> {
Ok(Some(*v * 2.5))
}
let exp = RpnExpressionBuilder::new_for_test()
.push_constant_for_test(ScalarValue::Real(None))
.push_fn_call_for_test(foo_fn_meta(), 1, FieldTypeTp::Double)
.build_for_test();
let mut ctx = EvalContext::default();
let mut columns = LazyBatchColumnVec::empty();
let val = exp.eval(&mut ctx, &[], &mut columns, &[], 10).unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_real_vec(),
(0..10).map(|_| None).collect::<Vec<Option<Real>>>()
);
}
#[test]
fn test_merge_nulls_constant() {
#[rpn_fn]
fn foo(v: &Real) -> Result<Option<Real>> {
Ok(Some(*v * 2.5))
}
let exp = RpnExpressionBuilder::new_for_test()
.push_constant_for_test(ScalarValue::Real(Real::new(10.0).ok()))
.push_fn_call_for_test(foo_fn_meta(), 1, FieldTypeTp::Double)
.build_for_test();
let mut ctx = EvalContext::default();
let mut columns = LazyBatchColumnVec::empty();
let val = exp.eval(&mut ctx, &[], &mut columns, &[], 10).unwrap();
assert!(val.is_vector());
assert_eq!(
val.vector_value().unwrap().as_ref().to_real_vec(),
(0..10)
.map(|_| Real::new(25.0).ok())
.collect::<Vec<Option<Real>>>()
);
}
#[bench]
fn bench_eval_plus_1024_rows(b: &mut Bencher) {
let mut columns = LazyBatchColumnVec::from(vec![{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(1024, EvalType::Int);
for i in 0..1024 {
col.mut_decoded().push_int(Some(i));
}
col
}]);
let schema = &[FieldTypeTp::LongLong.into()];
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.push_column_ref_for_test(0)
.push_fn_call_for_test(arithmetic_fn_meta::<IntIntPlus>(), 2, FieldTypeTp::LongLong)
.build_for_test();
let mut ctx = EvalContext::default();
let logical_rows: Vec<_> = (0..1024).collect();
profiler::start("./bench_eval_plus_1024_rows.profile");
b.iter(|| {
let result = black_box(&exp).eval(
black_box(&mut ctx),
black_box(schema),
black_box(&mut columns),
black_box(&logical_rows),
black_box(1024),
);
assert!(result.is_ok());
});
profiler::stop();
}
#[bench]
fn bench_eval_compare_1024_rows(b: &mut Bencher) {
let mut columns = LazyBatchColumnVec::from(vec![{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(1024, EvalType::Int);
for i in 0..1024 {
col.mut_decoded().push_int(Some(i));
}
col
}]);
let schema = &[FieldTypeTp::LongLong.into()];
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.push_column_ref_for_test(0)
.push_fn_call_for_test(
compare_fn_meta::<BasicComparer<Int, CmpOpLE>>(),
2,
FieldTypeTp::LongLong,
)
.build_for_test();
let mut ctx = EvalContext::default();
let logical_rows: Vec<_> = (0..1024).collect();
profiler::start("./eval_compare_1024_rows.profile");
b.iter(|| {
let result = black_box(&exp).eval(
black_box(&mut ctx),
black_box(schema),
black_box(&mut columns),
black_box(&logical_rows),
black_box(1024),
);
assert!(result.is_ok());
});
profiler::stop();
}
#[bench]
fn bench_eval_compare_5_rows(b: &mut Bencher) {
let mut columns = LazyBatchColumnVec::from(vec![{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(5, EvalType::Int);
for i in 0..5 {
col.mut_decoded().push_int(Some(i));
}
col
}]);
let schema = &[FieldTypeTp::LongLong.into()];
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.push_column_ref_for_test(0)
.push_fn_call_for_test(
compare_fn_meta::<BasicComparer<Int, CmpOpLE>>(),
2,
FieldTypeTp::LongLong,
)
.build_for_test();
let mut ctx = EvalContext::default();
let logical_rows: Vec<_> = (0..5).collect();
profiler::start("./bench_eval_compare_5_rows.profile");
b.iter(|| {
let result = black_box(&exp).eval(
black_box(&mut ctx),
black_box(schema),
black_box(&mut columns),
black_box(&logical_rows),
black_box(5),
);
assert!(result.is_ok());
});
profiler::stop();
}
}
#[cfg(test)]
mod benches {
use super::*;
use crate::RpnExpressionBuilder;
use tidb_query_codegen::rpn_fn;
use tidb_query_common::Result;
use tidb_query_datatype::codec::batch::LazyBatchColumn;
use tidb_query_datatype::codec::data_type::*;
use tidb_query_datatype::expr::EvalContext;
use tidb_query_datatype::{EvalType, FieldTypeTp};
#[bench]
fn bench_int_eval(b: &mut test::Bencher) {
#[rpn_fn]
fn foo(u: &Real, v: &Real, w: &Real) -> Result<Option<Real>> {
Ok(Some(*u * 2.5 + *v * 2.5 + *w * 2.5))
}
let mut columns = LazyBatchColumnVec::from(vec![
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(1024, EvalType::Real);
for _i in 0..256 {
col.mut_decoded().push_real(Real::new(233.0).ok());
col.mut_decoded().push_real(Real::new(233.0).ok());
col.mut_decoded().push_real(Real::new(233.0).ok());
col.mut_decoded().push_real(Real::new(233.0).ok());
}
col
},
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(1024, EvalType::Real);
for _i in 0..256 {
col.mut_decoded().push_real(Real::new(233.0).ok());
col.mut_decoded().push_real(Real::new(233.0).ok());
col.mut_decoded().push_real(Real::new(233.0).ok());
col.mut_decoded().push_real(None);
}
col
},
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(1024, EvalType::Real);
for _i in 0..256 {
col.mut_decoded().push_real(Real::new(233.0).ok());
col.mut_decoded().push_real(None);
col.mut_decoded().push_real(Real::new(233.0).ok());
col.mut_decoded().push_real(None);
}
col
},
]);
let input_logical_rows: Vec<usize> = (0..1024).collect();
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.push_column_ref_for_test(1)
.push_column_ref_for_test(2)
.push_fn_call_for_test(foo_fn_meta(), 3, FieldTypeTp::Double)
.build_for_test();
let schema = &[
FieldTypeTp::Double.into(),
FieldTypeTp::Double.into(),
FieldTypeTp::Double.into(),
];
b.iter(|| {
let mut ctx = EvalContext::default();
exp.eval(
&mut ctx,
schema,
&mut columns,
input_logical_rows.as_slice(),
input_logical_rows.len(),
)
.unwrap();
});
}
#[bench]
fn bench_bytes_eval(b: &mut test::Bencher) {
#[rpn_fn(writer)]
fn foo(u: BytesRef, v: BytesRef, w: BytesRef, writer: BytesWriter) -> Result<BytesGuard> {
let mut partial = writer.begin();
partial.partial_write(u);
partial.partial_write(v);
partial.partial_write(w);
Ok(partial.finish())
}
let mut bytes_vec: Vec<u8> = vec![];
for _i in 0..10 {
bytes_vec.append(&mut b"2333333333".to_vec());
}
let mut columns = LazyBatchColumnVec::from(vec![
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(1024, EvalType::Bytes);
for _i in 0..256 {
col.mut_decoded().push_bytes(Some(bytes_vec.clone()));
col.mut_decoded().push_bytes(Some(bytes_vec.clone()));
col.mut_decoded().push_bytes(Some(bytes_vec.clone()));
col.mut_decoded().push_bytes(Some(bytes_vec.clone()));
}
col
},
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(1024, EvalType::Bytes);
for _i in 0..256 {
col.mut_decoded().push_bytes(Some(bytes_vec.clone()));
col.mut_decoded().push_bytes(None);
col.mut_decoded().push_bytes(Some(bytes_vec.clone()));
col.mut_decoded().push_bytes(Some(bytes_vec.clone()));
}
col
},
{
let mut col = LazyBatchColumn::decoded_with_capacity_and_tp(1024, EvalType::Bytes);
for _i in 0..256 {
col.mut_decoded().push_bytes(Some(bytes_vec.clone()));
col.mut_decoded().push_bytes(None);
col.mut_decoded().push_bytes(Some(bytes_vec.clone()));
col.mut_decoded().push_bytes(None);
}
col
},
]);
let input_logical_rows: Vec<usize> = (0..1024).collect();
let exp = RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.push_column_ref_for_test(1)
.push_column_ref_for_test(2)
.push_fn_call_for_test(foo_fn_meta(), 3, FieldTypeTp::String)
.build_for_test();
let schema = &[
FieldTypeTp::String.into(),
FieldTypeTp::String.into(),
FieldTypeTp::String.into(),
];
b.iter(|| {
let mut ctx = EvalContext::default();
exp.eval(
&mut ctx,
schema,
&mut columns,
input_logical_rows.as_slice(),
input_logical_rows.len(),
)
.unwrap();
});
}
}