use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ptr::NonNull;
use std::sync::Arc;
use tipb::{Expr, FieldType, TopN};
use crate::interface::*;
use crate::util::*;
use tidb_query_common::storage::IntervalRange;
use tidb_query_common::Result;
use tidb_query_datatype::codec::batch::{LazyBatchColumn, LazyBatchColumnVec};
use tidb_query_datatype::codec::data_type::*;
use tidb_query_datatype::expr::EvalWarnings;
use tidb_query_datatype::expr::{EvalConfig, EvalContext};
use tidb_query_expr::RpnStackNode;
use tidb_query_expr::{RpnExpression, RpnExpressionBuilder};
/// Batch executor that returns the first `n` rows of `src` ordered by `order_exprs`.
///
/// Rows from every input batch are offered to a bounded max-heap; the sorted contents of
/// the heap are emitted in a single batch once the source is drained.
pub struct BatchTopNExecutor<Src: BatchExecutor> {
/// Max-heap holding at most `n` rows; its root is the greatest row kept so far.
///
/// Heap items hold raw pointers into `eval_columns_buffer_unsafe`,
/// `order_exprs_field_type` and `order_is_desc`. This field is declared first because
/// struct fields are dropped in declaration order, so the heap is dropped before the data
/// it points into. Do not reorder.
heap: BinaryHeap<HeapItemUnsafe>,
/// Evaluated order-by columns of every processed batch, appended batch after batch.
///
/// The `'static` lifetime is fake: entries may borrow batch data that is kept alive only
/// by the `Arc`s held in heap items. Never read it directly; it is read only through
/// `HeapItemUnsafe`. It is boxed so that the `Vec` keeps a stable address (heap items
/// store a pointer to it) even if the executor itself is moved.
#[allow(clippy::box_vec)]
eval_columns_buffer_unsafe: Box<Vec<RpnStackNode<'static>>>,
/// Order-by expressions, one per sort key.
order_exprs: Box<[RpnExpression]>,
/// Result field type of each order-by expression, passed to the value comparison.
order_exprs_field_type: Box<[FieldType]>,
/// `true` for each sort key that is ordered descending.
order_is_desc: Box<[bool]>,
/// Maximum number of rows to return.
n: usize,
context: EvalContext,
src: Src,
/// Set once the final result (or an error) has been returned; `next_batch` asserts it is
/// not called again afterwards.
is_ended: bool,
}
// SAFETY: the executor is not automatically `Send` because of the `NonNull` pointers and
// fake-`'static` stack nodes it holds. Those only point into data owned by the executor
// itself (its boxed fields and the `Arc`-pinned batches referenced from `heap`), so the
// whole executor can move between threads as a unit.
// NOTE(review): there is no `Src: Send` bound here; presumably `BatchExecutor` already
// implies `Send` — confirm, otherwise this impl would also vouch for a non-`Send` `Src`.
unsafe impl<Src: BatchExecutor> Send for BatchTopNExecutor<Src> {}
impl BatchTopNExecutor<Box<dyn BatchExecutor<StorageStats = ()>>> {
    /// Checks whether a `TopN` descriptor can be handled by this executor.
    ///
    /// # Errors
    ///
    /// Fails when the descriptor has no order-by item at all, or when the RPN builder
    /// rejects any order-by expression tree.
    #[inline]
    pub fn check_supported(descriptor: &TopN) -> Result<()> {
        let order_by = descriptor.get_order_by();
        if order_by.is_empty() {
            return Err(other_err!("Missing Top N column"));
        }

        for expr in order_by.iter().map(|item| item.get_expr()) {
            RpnExpressionBuilder::check_expr_tree_supported(expr)?;
        }
        Ok(())
    }
}
impl<Src: BatchExecutor> BatchTopNExecutor<Src> {
    /// Creates an executor from already-built RPN expressions, for tests.
    ///
    /// Uses a default `EvalContext` and starts with an empty, unreserved heap and
    /// evaluation buffer.
    ///
    /// # Panics
    ///
    /// Panics if `order_exprs.len() != order_is_desc.len()`.
    #[cfg(test)]
    pub fn new_for_test(
        src: Src,
        order_exprs: Vec<RpnExpression>,
        order_is_desc: Vec<bool>,
        n: usize,
    ) -> Self {
        assert_eq!(order_exprs.len(), order_is_desc.len());

        let order_exprs_field_type: Vec<FieldType> = order_exprs
            .iter()
            .map(|expr| expr.ret_field_type(src.schema()).clone())
            .collect();

        Self {
            heap: BinaryHeap::new(),
            eval_columns_buffer_unsafe: Box::new(Vec::new()),
            order_exprs: order_exprs.into_boxed_slice(),
            order_exprs_field_type: order_exprs_field_type.into_boxed_slice(),
            order_is_desc: order_is_desc.into_boxed_slice(),
            n,
            context: EvalContext::default(),
            src,
            is_ended: false,
        }
    }

    /// Creates an executor that returns the `n` smallest rows of `src`.
    ///
    /// Rows are compared key by key using `order_exprs_def` in order. A key's ordering is
    /// reversed when the matching `order_is_desc` entry is `true`.
    ///
    /// # Errors
    ///
    /// Returns an error if any of `order_exprs_def` cannot be built into an RPN expression.
    ///
    /// # Panics
    ///
    /// Panics if `order_exprs_def.len() != order_is_desc.len()`.
    pub fn new(
        config: std::sync::Arc<EvalConfig>,
        src: Src,
        order_exprs_def: Vec<Expr>,
        order_is_desc: Vec<bool>,
        n: usize,
    ) -> Result<Self> {
        assert_eq!(order_exprs_def.len(), order_is_desc.len());

        let mut order_exprs: Vec<RpnExpression> = Vec::with_capacity(order_exprs_def.len());
        // NOTE: this build-time context is separate from `self.context` below, so anything
        // recorded in it while building the expressions is not carried into the executor.
        let mut ctx = EvalContext::new(config.clone());
        for def in order_exprs_def {
            order_exprs.push(RpnExpressionBuilder::build_from_expr_tree(
                def,
                &mut ctx,
                src.schema().len(),
            )?);
        }

        let order_exprs_field_type: Vec<FieldType> = order_exprs
            .iter()
            .map(|expr| expr.ret_field_type(src.schema()).clone())
            .collect();

        Ok(Self {
            // Cap the up-front reservation so that a huge `n` does not allocate eagerly.
            heap: BinaryHeap::with_capacity(n.min(1024)),
            eval_columns_buffer_unsafe: Box::new(Vec::with_capacity(512)),
            order_exprs: order_exprs.into_boxed_slice(),
            order_exprs_field_type: order_exprs_field_type.into_boxed_slice(),
            order_is_desc: order_is_desc.into_boxed_slice(),
            n,
            context: EvalContext::new(config),
            src,
            is_ended: false,
        })
    }

    /// Pulls one batch from the source and offers its rows to the heap.
    ///
    /// Returns `Ok(Some(columns))` holding the final sorted rows once the source reports
    /// that it is drained, and `Ok(None)` while more input remains.
    ///
    /// # Errors
    ///
    /// Returns the source's error, or any error raised while decoding, evaluating or
    /// comparing sort keys.
    #[inline]
    fn handle_next_batch(&mut self) -> Result<Option<LazyBatchColumnVec>> {
        // Always request the maximum batch size: Top N must consume the whole input before
        // it can emit anything, so smaller batches would only add overhead.
        let src_result = self.src.next_batch(crate::runner::BATCH_MAX_SIZE);

        // Stored before `?` so the source's warnings survive even if it reported an error.
        self.context.warnings = src_result.warnings;

        let src_is_drained = src_result.is_drained?;

        if !src_result.logical_rows.is_empty() {
            self.process_batch_input(src_result.physical_columns, src_result.logical_rows)?;
        }

        if src_is_drained {
            Ok(Some(self.heap_take_all()))
        } else {
            Ok(None)
        }
    }

    /// Evaluates the sort keys of one source batch and offers each logical row to the heap.
    ///
    /// The batch is moved behind an `Arc`, and every heap item built from it holds a clone.
    /// The batch therefore stays alive for as long as any of its rows remains in the heap.
    ///
    /// # Errors
    ///
    /// Returns any error raised while decoding columns, evaluating the order-by
    /// expressions, or comparing rows.
    fn process_batch_input(
        &mut self,
        mut physical_columns: LazyBatchColumnVec,
        logical_rows: Vec<usize>,
    ) -> Result<()> {
        // Decode the columns the order-by expressions read before evaluating them.
        ensure_columns_decoded(
            &mut self.context,
            &self.order_exprs,
            self.src.schema(),
            &mut physical_columns,
            &logical_rows,
        )?;

        let pinned_source_data = Arc::new(HeapItemSourceData {
            physical_columns,
            logical_rows,
        });

        // This batch's evaluated sort-key columns start at the current end of the buffer.
        let eval_offset = self.eval_columns_buffer_unsafe.len();
        // SAFETY: the stack nodes appended here may borrow from `pinned_source_data` even
        // though they are stored as `'static`. They are read only through `HeapItemUnsafe`
        // (via `eval_columns_offset`), and every such item holds a clone of
        // `pinned_source_data`, which keeps the borrowed data alive while it can be read.
        unsafe {
            eval_exprs_decoded_no_lifetime(
                &mut self.context,
                &self.order_exprs,
                self.src.schema(),
                &pinned_source_data.physical_columns,
                &pinned_source_data.logical_rows,
                &mut self.eval_columns_buffer_unsafe,
            )?;
        }

        for logical_row_index in 0..pinned_source_data.logical_rows.len() {
            // The pointers target boxed data owned by `self`, so they stay valid if the
            // executor is moved. `eval_columns_buffer_ptr` points at the `Vec` itself rather
            // than its elements, so later pushes that reallocate the buffer do not dangle it.
            let row = HeapItemUnsafe {
                order_is_desc_ptr: (&*self.order_is_desc).into(),
                order_exprs_field_type_ptr: (&*self.order_exprs_field_type).into(),
                source_data: pinned_source_data.clone(),
                eval_columns_buffer_ptr: (&*self.eval_columns_buffer_unsafe).into(),
                eval_columns_offset: eval_offset,
                logical_row_index,
            };
            self.heap_add_row(row)?;
        }

        Ok(())
    }

    /// Offers `row` to the heap, which keeps the `n` smallest rows seen so far.
    ///
    /// While the heap holds fewer than `n` rows, the row is always inserted. Once the heap
    /// is full, the row replaces the current greatest row only if it compares strictly less.
    ///
    /// Every row is fully validated with `cmp_sort_key` before it enters the heap.
    /// `BinaryHeap` compares items through `Ord::cmp`, which unwraps comparison errors, so
    /// an unvalidated row could panic during a sift instead of producing an `Err`.
    ///
    /// Requires `self.n > 0`, so that the heap is non-empty whenever it is full.
    /// `next_batch` returns early for `n == 0`.
    ///
    /// # Errors
    ///
    /// Returns an error if the row's sort keys cannot be compared. The heap is left
    /// unchanged in that case.
    fn heap_add_row(&mut self, row: HeapItemUnsafe) -> Result<()> {
        if self.heap.len() < self.n {
            // Comparing the row with itself visits every sort key (assuming each key compares
            // equal to itself), so a comparison error surfaces here rather than as a panic
            // inside `BinaryHeap::push`.
            row.cmp_sort_key(&row)?;
            self.heap.push(row);
        } else {
            let mut greatest_row = self
                .heap
                .peek_mut()
                .expect("heap is full and n > 0, so it is non-empty");
            if row.cmp_sort_key(&greatest_row)? == Ordering::Less {
                // The comparison above stops at the first differing key, so the remaining
                // keys of `row` have not been checked yet. Validate them all before the
                // sift-down triggered by `PeekMut` compares `row` with other heap items
                // through `Ord::cmp`. If this fails, `greatest_row` was never mutably
                // accessed, so dropping it leaves the heap untouched.
                row.cmp_sort_key(&row)?;
                *greatest_row = row;
            }
        }

        Ok(())
    }

    /// Drains the heap and copies its rows, in ascending sort order, into new columns.
    ///
    /// Empty result columns are created from the first sorted row's source batch, and each
    /// item's row is then copied from its own pinned batch. Returns empty columns when the
    /// heap is empty.
    // NOTE(review): this assumes every pinned batch has the same raw/decoded layout per
    // column, because `raw()` / `decoded()` are called on each item's column according to
    // the layout of the first item's batch — confirm this holds for all sources.
    #[allow(clippy::clone_on_copy)]
    fn heap_take_all(&mut self) -> LazyBatchColumnVec {
        let heap = std::mem::take(&mut self.heap);
        // Ascending order, which is the final Top-N output order.
        let sorted_items = heap.into_sorted_vec();
        if sorted_items.is_empty() {
            return LazyBatchColumnVec::empty();
        }

        let mut result = sorted_items[0]
            .source_data
            .physical_columns
            .clone_empty(sorted_items.len());

        for (column_index, result_column) in result.as_mut_slice().iter_mut().enumerate() {
            match result_column {
                LazyBatchColumn::Raw(dest_column) => {
                    for item in &sorted_items {
                        let src = item.source_data.physical_columns[column_index].raw();
                        // Map the item's logical row index to a physical row of its batch.
                        dest_column
                            .push(&src[item.source_data.logical_rows[item.logical_row_index]]);
                    }
                }
                LazyBatchColumn::Decoded(dest_vector_value) => {
                    match_template::match_template! {
                        TT = [
                            Int,
                            Real,
                            Duration,
                            Decimal,
                            DateTime,
                            Bytes => BytesRef,
                            Json => JsonRef,
                            Enum => EnumRef,
                            Set => SetRef,
                        ],
                        match dest_vector_value {
                            VectorValue::TT(dest_column) => {
                                for item in &sorted_items {
                                    let src: &VectorValue = item.source_data.physical_columns[column_index].decoded();
                                    let src_ref = TT::borrow_vector_value(src);
                                    dest_column.push(src_ref.get_option_ref(item.source_data.logical_rows[item.logical_row_index]).map(|x| x.into_owned_value()));
                                }
                            },
                        }
                    }
                }
            }
        }

        result.assert_columns_equal_length();
        result
    }
}
impl<Src: BatchExecutor> BatchExecutor for BatchTopNExecutor<Src> {
    type StorageStats = Src::StorageStats;

    /// Top N does not change the row shape, so the output schema is the source schema.
    #[inline]
    fn schema(&self) -> &[FieldType] {
        self.src.schema()
    }

    /// Consumes one source batch per call.
    ///
    /// While input remains, each call returns an empty, not-yet-drained batch. The call
    /// that sees the source drain returns all kept rows in sorted order and marks the
    /// executor as ended. An error also ends the executor.
    ///
    /// # Panics
    ///
    /// Panics if called again after the executor has ended.
    #[inline]
    fn next_batch(&mut self, _scan_rows: usize) -> BatchExecuteResult {
        assert!(!self.is_ended);

        // "TOP 0" can never produce a row, so finish without touching the source.
        if self.n == 0 {
            self.is_ended = true;
            return BatchExecuteResult {
                physical_columns: LazyBatchColumnVec::empty(),
                logical_rows: Vec::new(),
                warnings: EvalWarnings::default(),
                is_drained: Ok(true),
            };
        }

        let (physical_columns, logical_rows, is_drained) = match self.handle_next_batch() {
            Ok(None) => (LazyBatchColumnVec::empty(), Vec::new(), Ok(false)),
            Ok(Some(sorted_columns)) => {
                self.is_ended = true;
                // The output batch is already in final order, so every row is logical, in order.
                let rows = (0..sorted_columns.rows_len()).collect();
                (sorted_columns, rows, Ok(true))
            }
            Err(err) => {
                self.is_ended = true;
                (LazyBatchColumnVec::empty(), Vec::new(), Err(err))
            }
        };

        BatchExecuteResult {
            physical_columns,
            logical_rows,
            warnings: self.context.take_warnings(),
            is_drained,
        }
    }

    #[inline]
    fn collect_exec_stats(&mut self, dest: &mut ExecuteStats) {
        self.src.collect_exec_stats(dest);
    }

    #[inline]
    fn collect_storage_stats(&mut self, dest: &mut Self::StorageStats) {
        self.src.collect_storage_stats(dest);
    }

    #[inline]
    fn take_scanned_range(&mut self) -> IntervalRange {
        self.src.take_scanned_range()
    }

    #[inline]
    fn can_be_cached(&self) -> bool {
        self.src.can_be_cached()
    }
}
/// One source batch, pinned behind an `Arc` shared by every heap item built from it, so
/// the batch outlives all of its rows that are still in the heap.
struct HeapItemSourceData {
/// The batch's physical columns (order-by inputs decoded by `ensure_columns_decoded`).
physical_columns: LazyBatchColumnVec,
/// Maps each logical row index to a physical row index in `physical_columns`.
logical_rows: Vec<usize>,
}
/// A single candidate row in the Top-N heap.
///
/// "Unsafe" because the pointer fields refer to data owned by the `BatchTopNExecutor`
/// that created the item. They are only valid while that executor is alive, which is
/// why the executor drops its heap first.
struct HeapItemUnsafe {
/// Points at the executor's `order_is_desc`.
order_is_desc_ptr: NonNull<[bool]>,
/// Points at the executor's `order_exprs_field_type`.
order_exprs_field_type_ptr: NonNull<[FieldType]>,
/// Keeps this row's source batch (and any evaluated nodes borrowing it) alive.
source_data: Arc<HeapItemSourceData>,
/// Points at the executor's evaluated-columns buffer (the `Vec`, not its elements).
eval_columns_buffer_ptr: NonNull<Vec<RpnStackNode<'static>>>,
/// Index in that buffer of this batch's first evaluated order-by column.
eval_columns_offset: usize,
/// Logical row index of this row within `source_data`.
logical_row_index: usize,
}
impl HeapItemUnsafe {
    /// Per-key "descending" flags of the executor that created this item.
    fn get_order_is_desc(&self) -> &[bool] {
        // SAFETY: points at the executor's boxed `order_is_desc`, which is dropped only
        // after the executor's heap (and therefore this item).
        unsafe { self.order_is_desc_ptr.as_ref() }
    }

    /// Per-key result field types of the executor that created this item.
    fn get_order_exprs_field_type(&self) -> &[FieldType] {
        // SAFETY: same reasoning as `get_order_is_desc`, for `order_exprs_field_type`.
        unsafe { self.order_exprs_field_type_ptr.as_ref() }
    }

    /// Returns the `len` evaluated order-by columns of this item's batch.
    ///
    /// # Panics
    ///
    /// Panics if the requested range lies outside the evaluation buffer.
    fn get_eval_columns(&self, len: usize) -> &[RpnStackNode<'_>] {
        // SAFETY: points at the executor's boxed evaluation buffer, which outlives the heap.
        // The nodes in this item's range may borrow `self.source_data`, which `self` keeps
        // alive for the returned borrow.
        let buffer = unsafe { self.eval_columns_buffer_ptr.as_ref() };
        let start = self.eval_columns_offset;
        &buffer[start..start + len]
    }

    /// Compares two rows key by key.
    ///
    /// The first non-equal key decides the result, reversed for descending keys. Rows that
    /// compare equal on every key return `Ordering::Equal`.
    ///
    /// # Errors
    ///
    /// Returns an error if a pair of key values cannot be compared under the key's field
    /// type.
    fn cmp_sort_key(&self, other: &Self) -> Result<Ordering> {
        debug_assert_eq!(self.get_order_is_desc(), other.get_order_is_desc());

        let desc_flags = self.get_order_is_desc();
        let field_types = self.get_order_exprs_field_type();
        let key_count = desc_flags.len();
        let lhs_columns = self.get_eval_columns(key_count);
        let rhs_columns = other.get_eval_columns(key_count);

        for (idx, &is_desc) in desc_flags.iter().enumerate() {
            let lhs = lhs_columns[idx].get_logical_scalar_ref(self.logical_row_index);
            let rhs = rhs_columns[idx].get_logical_scalar_ref(other.logical_row_index);
            match lhs.cmp_sort_key(&rhs, &field_types[idx])? {
                // Tie on this key: fall through to the next one.
                Ordering::Equal => {}
                ord if is_desc => return Ok(ord.reverse()),
                ord => return Ok(ord),
            }
        }

        Ok(Ordering::Equal)
    }
}
/// Total order used by `BinaryHeap`, delegating to `cmp_sort_key`.
///
/// # Panics
///
/// Panics if the sort keys cannot be compared. `heap_add_row` checks rows with
/// `cmp_sort_key` before handing them to the heap, to keep this path from panicking.
impl Ord for HeapItemUnsafe {
    fn cmp(&self, other: &Self) -> Ordering {
        let outcome = self.cmp_sort_key(other);
        outcome.unwrap()
    }
}

/// Always `Some`: the order is total (or panics, see `Ord`).
impl PartialOrd for HeapItemUnsafe {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

/// Two items are equal when all of their sort keys compare equal.
impl PartialEq for HeapItemUnsafe {
    fn eq(&self, other: &Self) -> bool {
        matches!(Ord::cmp(self, other), Ordering::Equal)
    }
}

impl Eq for HeapItemUnsafe {}
/// Unit tests for `BatchTopNExecutor`, driven by `MockExecutor` sources.
#[cfg(test)]
mod tests {
use super::*;
use tidb_query_datatype::builder::FieldTypeBuilder;
use tidb_query_datatype::{Collation, FieldTypeFlag, FieldTypeTp};
use crate::util::mock_executor::MockExecutor;
use tidb_query_datatype::expr::EvalWarnings;
use tidb_query_expr::RpnExpressionBuilder;
/// With `n == 0`, the first call finishes immediately: no rows, drained.
#[test]
fn test_top_0() {
let src_exec = MockExecutor::new(
vec![FieldTypeTp::Double.into()],
vec![BatchExecuteResult {
physical_columns: LazyBatchColumnVec::from(vec![VectorValue::Real(
vec![None, Real::new(7.0).ok(), None, None].into(),
)]),
logical_rows: (0..1).collect(),
warnings: EvalWarnings::default(),
is_drained: Ok(true),
}],
);
let mut exec = BatchTopNExecutor::new_for_test(
src_exec,
vec![
RpnExpressionBuilder::new_for_test()
.push_constant_for_test(1)
.build_for_test(),
],
vec![false],
0,
);
let r = exec.next_batch(1);
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(r.is_drained.unwrap());
}
/// Source batches without logical rows yield empty results. The executor reports
/// drained only on the call where the source does.
#[test]
fn test_no_row() {
let src_exec = MockExecutor::new(
vec![FieldTypeTp::LongLong.into()],
vec![
BatchExecuteResult {
physical_columns: LazyBatchColumnVec::from(vec![VectorValue::Int(
vec![Some(5)].into(),
)]),
logical_rows: Vec::new(),
warnings: EvalWarnings::default(),
is_drained: Ok(false),
},
BatchExecuteResult {
physical_columns: LazyBatchColumnVec::empty(),
logical_rows: Vec::new(),
warnings: EvalWarnings::default(),
is_drained: Ok(true),
},
],
);
let mut exec = BatchTopNExecutor::new_for_test(
src_exec,
vec![
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.build_for_test(),
],
vec![false],
10,
);
let r = exec.next_batch(1);
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(r.is_drained.unwrap());
}
/// Three-batch source with columns (BIGINT, BIGINT, DOUBLE). Batch 1 has 3 logical rows.
/// Batch 2 has physical data but no logical rows. Batch 3 has 4 logical rows. That gives
/// 7 rows in total. `logical_rows` selects and reorders physical rows.
fn make_src_executor() -> MockExecutor {
MockExecutor::new(
vec![
FieldTypeTp::LongLong.into(),
FieldTypeTp::LongLong.into(),
FieldTypeTp::Double.into(),
],
vec![
BatchExecuteResult {
physical_columns: LazyBatchColumnVec::from(vec![
VectorValue::Int(vec![None, None, Some(5), None].into()),
VectorValue::Int(vec![None, Some(1), None, Some(-1)].into()),
VectorValue::Real(
vec![
Real::new(2.0).ok(),
Real::new(4.0).ok(),
None,
Real::new(-1.0).ok(),
]
.into(),
),
]),
logical_rows: vec![3, 0, 1],
warnings: EvalWarnings::default(),
is_drained: Ok(false),
},
BatchExecuteResult {
physical_columns: LazyBatchColumnVec::from(vec![
VectorValue::Int(vec![Some(0)].into()),
VectorValue::Int(vec![Some(10)].into()),
VectorValue::Real(vec![Real::new(10.0).ok()].into()),
]),
logical_rows: Vec::new(),
warnings: EvalWarnings::default(),
is_drained: Ok(false),
},
BatchExecuteResult {
physical_columns: LazyBatchColumnVec::from(vec![
VectorValue::Int(
vec![Some(-10), Some(-1), Some(-10), None, Some(-10), None].into(),
),
VectorValue::Int(
vec![None, None, Some(10), Some(-9), Some(-10), None].into(),
),
VectorValue::Real(
vec![
Real::new(-5.0).ok(),
None,
Real::new(3.0).ok(),
None,
Real::new(0.0).ok(),
Real::new(9.9).ok(),
]
.into(),
),
]),
logical_rows: vec![1, 2, 0, 4],
warnings: EvalWarnings::default(),
is_drained: Ok(true),
},
],
)
}
/// One ascending key on the DOUBLE column, with `n` larger than the input. All 7 rows
/// come back in order, NULL first, and only the final call returns rows.
#[test]
fn test_integration_1() {
let src_exec = make_src_executor();
let mut exec = BatchTopNExecutor::new_for_test(
src_exec,
vec![
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(2)
.build_for_test(),
],
vec![false],
100,
);
let r = exec.next_batch(1);
assert!(r.logical_rows.is_empty());
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert!(r.logical_rows.is_empty());
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert_eq!(&r.logical_rows, &[0, 1, 2, 3, 4, 5, 6]);
assert_eq!(r.physical_columns.rows_len(), 7);
assert_eq!(r.physical_columns.columns_len(), 3);
assert_eq!(
r.physical_columns[0].decoded().to_int_vec(),
&[Some(-1), Some(-10), None, Some(-10), None, Some(-10), None]
);
assert_eq!(
r.physical_columns[1].decoded().to_int_vec(),
&[None, None, Some(-1), Some(-10), None, Some(10), Some(1)]
);
assert_eq!(
r.physical_columns[2].decoded().to_real_vec(),
&[
None,
Real::new(-5.0).ok(),
Real::new(-1.0).ok(),
Real::new(0.0).ok(),
Real::new(2.0).ok(),
Real::new(3.0).ok(),
Real::new(4.0).ok()
]
);
assert!(r.is_drained.unwrap());
}
/// Two keys: column 0 descending, then column 1 ascending, with `n` equal to the row
/// count. NULLs of the descending key come last, and ties fall back to the second key.
#[test]
fn test_integration_2() {
let src_exec = make_src_executor();
let mut exec = BatchTopNExecutor::new_for_test(
src_exec,
vec![
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.build_for_test(),
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(1)
.build_for_test(),
],
vec![true, false],
7,
);
let r = exec.next_batch(1);
assert!(r.logical_rows.is_empty());
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert!(r.logical_rows.is_empty());
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert_eq!(&r.logical_rows, &[0, 1, 2, 3, 4, 5, 6]);
assert_eq!(r.physical_columns.rows_len(), 7);
assert_eq!(r.physical_columns.columns_len(), 3);
assert_eq!(
r.physical_columns[0].decoded().to_int_vec(),
&[Some(-1), Some(-10), Some(-10), Some(-10), None, None, None]
);
assert_eq!(
r.physical_columns[1].decoded().to_int_vec(),
&[None, None, Some(-10), Some(10), None, Some(-1), Some(1)]
);
assert_eq!(
r.physical_columns[2].decoded().to_real_vec(),
&[
None,
Real::new(-5.0).ok(),
Real::new(0.0).ok(),
Real::new(3.0).ok(),
Real::new(2.0).ok(),
Real::new(-1.0).ok(),
Real::new(4.0).ok()
]
);
assert!(r.is_drained.unwrap());
}
/// Sort keys can be computed expressions: `isnull(col0)` ascending, then `col0`
/// ascending, then `col1 + 1` descending. Only the top 5 of the 7 rows are kept.
#[test]
fn test_integration_3() {
use tidb_query_expr::impl_arithmetic::{arithmetic_fn_meta, IntIntPlus};
use tidb_query_expr::impl_op::is_null_fn_meta;
let src_exec = make_src_executor();
let mut exec = BatchTopNExecutor::new_for_test(
src_exec,
vec![
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.push_fn_call_for_test(is_null_fn_meta::<Int>(), 1, FieldTypeTp::LongLong)
.build_for_test(),
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.build_for_test(),
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(1)
.push_constant_for_test(1)
.push_fn_call_for_test(
arithmetic_fn_meta::<IntIntPlus>(),
2,
FieldTypeTp::LongLong,
)
.build_for_test(),
],
vec![false, false, true],
5,
);
let r = exec.next_batch(1);
assert!(r.logical_rows.is_empty());
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert!(r.logical_rows.is_empty());
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert_eq!(&r.logical_rows, &[0, 1, 2, 3, 4]);
assert_eq!(r.physical_columns.rows_len(), 5);
assert_eq!(r.physical_columns.columns_len(), 3);
assert_eq!(
r.physical_columns[0].decoded().to_int_vec(),
&[Some(-10), Some(-10), Some(-10), Some(-1), None]
);
assert_eq!(
r.physical_columns[1].decoded().to_int_vec(),
&[Some(10), Some(-10), None, None, Some(1)]
);
assert_eq!(
r.physical_columns[2].decoded().to_real_vec(),
&[
Real::new(3.0).ok(),
Real::new(0.0).ok(),
Real::new(-5.0).ok(),
None,
Real::new(4.0).ok()
]
);
assert!(r.is_drained.unwrap());
}
/// Three-batch source of VARCHAR columns using the `utf8mb4_general_ci`, `utf8mb4_bin`
/// and `binary` collations respectively. The middle batch is empty.
fn make_bytes_src_executor() -> MockExecutor {
MockExecutor::new(
vec![
FieldTypeBuilder::new()
.tp(FieldTypeTp::VarChar)
.collation(Collation::Utf8Mb4GeneralCi)
.into(),
FieldTypeBuilder::new()
.tp(FieldTypeTp::VarChar)
.collation(Collation::Utf8Mb4Bin)
.into(),
FieldTypeBuilder::new()
.tp(FieldTypeTp::VarChar)
.collation(Collation::Binary)
.into(),
],
vec![
BatchExecuteResult {
physical_columns: LazyBatchColumnVec::from(vec![
VectorValue::Bytes(
vec![Some(b"aa".to_vec()), None, Some(b"aa".to_vec())].into(),
),
VectorValue::Bytes(
vec![Some(b"aa".to_vec()), None, Some(b"aaa".to_vec())].into(),
),
VectorValue::Bytes(
vec![None, Some(b"Aa".to_vec()), Some("áaA".as_bytes().to_vec())]
.into(),
),
]),
logical_rows: vec![2, 1, 0],
warnings: EvalWarnings::default(),
is_drained: Ok(false),
},
BatchExecuteResult {
physical_columns: LazyBatchColumnVec::empty(),
logical_rows: Vec::new(),
warnings: EvalWarnings::default(),
is_drained: Ok(false),
},
BatchExecuteResult {
physical_columns: LazyBatchColumnVec::from(vec![
VectorValue::Bytes(
vec![
Some("áaA".as_bytes().to_vec()),
Some("áa".as_bytes().to_vec()),
Some(b"Aa".to_vec()),
Some(b"aaa".to_vec()),
]
.into(),
),
VectorValue::Bytes(
vec![
Some("áa".as_bytes().to_vec()),
Some("áaA".as_bytes().to_vec()),
None,
Some(b"Aa".to_vec()),
]
.into(),
),
VectorValue::Bytes(
vec![
None,
Some(b"aa".to_vec()),
Some(b"aaa".to_vec()),
Some("áa".as_bytes().to_vec()),
]
.into(),
),
]),
logical_rows: vec![0, 1, 2, 3],
warnings: EvalWarnings::default(),
is_drained: Ok(true),
},
],
)
}
/// Mixed-direction keys (col 0 desc, col 2 desc, col 1 asc) over collation-specific
/// columns. The expected order follows each column's collation.
#[test]
fn test_bytes_1() {
let src_exec = make_bytes_src_executor();
let mut exec = BatchTopNExecutor::new_for_test(
src_exec,
vec![
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.build_for_test(),
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(2)
.build_for_test(),
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(1)
.build_for_test(),
],
vec![true, true, false],
5,
);
let r = exec.next_batch(1);
assert!(r.logical_rows.is_empty());
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert!(r.logical_rows.is_empty());
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert_eq!(&r.logical_rows, &[0, 1, 2, 3, 4]);
assert_eq!(r.physical_columns.rows_len(), 5);
assert_eq!(r.physical_columns.columns_len(), 3);
assert_eq!(
r.physical_columns[0].decoded().to_bytes_vec(),
&[
Some(b"aaa".to_vec()),
Some("áaA".as_bytes().to_vec()),
Some(b"aa".to_vec()),
Some(b"Aa".to_vec()),
Some("áa".as_bytes().to_vec()),
]
);
assert_eq!(
r.physical_columns[1].decoded().to_bytes_vec(),
&[
Some(b"Aa".to_vec()),
Some("áa".as_bytes().to_vec()),
Some(b"aaa".to_vec()),
None,
Some("áaA".as_bytes().to_vec()),
]
);
assert_eq!(
r.physical_columns[2].decoded().to_bytes_vec(),
&[
Some("áa".as_bytes().to_vec()),
None,
Some("áaA".as_bytes().to_vec()),
Some(b"aaa".to_vec()),
Some(b"aa".to_vec()),
]
);
assert!(r.is_drained.unwrap());
}
/// All-ascending keys (col 0, col 1, col 2) over the same collation-specific columns.
#[test]
fn test_bytes_2() {
let src_exec = make_bytes_src_executor();
let mut exec = BatchTopNExecutor::new_for_test(
src_exec,
vec![
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(0)
.build_for_test(),
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(1)
.build_for_test(),
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(2)
.build_for_test(),
],
vec![false, false, false],
5,
);
let r = exec.next_batch(1);
assert!(r.logical_rows.is_empty());
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert!(r.logical_rows.is_empty());
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert_eq!(&r.logical_rows, &[0, 1, 2, 3, 4]);
assert_eq!(r.physical_columns.rows_len(), 5);
assert_eq!(r.physical_columns.columns_len(), 3);
assert_eq!(
r.physical_columns[0].decoded().to_bytes_vec(),
&[
None,
Some(b"Aa".to_vec()),
Some(b"aa".to_vec()),
Some(b"aa".to_vec()),
Some("áa".as_bytes().to_vec()),
]
);
assert_eq!(
r.physical_columns[1].decoded().to_bytes_vec(),
&[
None,
None,
Some(b"aa".to_vec()),
Some(b"aaa".to_vec()),
Some("áaA".as_bytes().to_vec()),
]
);
assert_eq!(
r.physical_columns[2].decoded().to_bytes_vec(),
&[
Some(b"Aa".to_vec()),
Some(b"aaa".to_vec()),
None,
Some("áaA".as_bytes().to_vec()),
Some(b"aa".to_vec()),
]
);
assert!(r.is_drained.unwrap());
}
/// Three-batch source with columns UNSIGNED BIGINT, signed BIGINT and UNSIGNED INT.
/// Unsigned values are stored as the `i64` produced by an `as` cast.
fn make_src_executor_unsigned() -> MockExecutor {
MockExecutor::new(
vec![
FieldTypeBuilder::new()
.tp(FieldTypeTp::LongLong)
.flag(FieldTypeFlag::UNSIGNED)
.into(),
FieldTypeTp::LongLong.into(),
FieldTypeBuilder::new()
.tp(FieldTypeTp::Long)
.flag(FieldTypeFlag::UNSIGNED)
.into(),
],
vec![
BatchExecuteResult {
physical_columns: LazyBatchColumnVec::from(vec![
VectorValue::Int(
vec![
Some(18_446_744_073_709_551_613_u64 as i64),
None,
Some(18_446_744_073_709_551_615_u64 as i64),
]
.into(),
),
VectorValue::Int(vec![Some(-1), None, Some(-3)].into()),
VectorValue::Int(
vec![
Some(4_294_967_295_u32 as i64),
None,
Some(4_294_967_295_u32 as i64),
]
.into(),
),
]),
logical_rows: vec![2, 1, 0],
warnings: EvalWarnings::default(),
is_drained: Ok(false),
},
BatchExecuteResult {
physical_columns: LazyBatchColumnVec::empty(),
logical_rows: Vec::new(),
warnings: EvalWarnings::default(),
is_drained: Ok(false),
},
BatchExecuteResult {
physical_columns: LazyBatchColumnVec::from(vec![
VectorValue::Int(
vec![
Some(300_u64 as i64),
Some(9_223_372_036_854_775_807_u64 as i64),
Some(2000_u64 as i64),
Some(9_223_372_036_854_775_808_u64 as i64),
]
.into(),
),
VectorValue::Int(
vec![
Some(300),
Some(9_223_372_036_854_775_807),
Some(2000),
Some(-9_223_372_036_854_775_808),
]
.into(),
),
VectorValue::Int(
vec![
Some(300_u32 as i64),
Some(2_147_483_647_u32 as i64),
Some(2000_u32 as i64),
Some(2_147_483_648_u32 as i64),
]
.into(),
),
]),
logical_rows: vec![2, 1, 0, 3],
warnings: EvalWarnings::default(),
is_drained: Ok(true),
},
],
)
}
/// Top 5 in both directions on each column. Unsigned columns must compare by unsigned
/// value; for example, `9_223_372_036_854_775_808` (negative as `i64`) sorts after
/// `9_223_372_036_854_775_807` in ascending order.
#[test]
fn test_top_unsigned() {
// Runs a top-5 query ordered by `col_index` and checks that column of the result.
let test_top5 = |col_index: usize, is_desc: bool, expected: &[Option<i64>]| {
let src_exec = make_src_executor_unsigned();
let mut exec = BatchTopNExecutor::new_for_test(
src_exec,
vec![
RpnExpressionBuilder::new_for_test()
.push_column_ref_for_test(col_index)
.build_for_test(),
],
vec![is_desc],
5,
);
let r = exec.next_batch(1);
assert!(r.logical_rows.is_empty());
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert!(r.logical_rows.is_empty());
assert_eq!(r.physical_columns.rows_len(), 0);
assert!(!r.is_drained.unwrap());
let r = exec.next_batch(1);
assert_eq!(&r.logical_rows, &[0, 1, 2, 3, 4]);
assert_eq!(r.physical_columns.rows_len(), 5);
assert_eq!(r.physical_columns.columns_len(), 3);
assert_eq!(
r.physical_columns[col_index].decoded().to_int_vec(),
expected
);
assert!(r.is_drained.unwrap());
};
test_top5(
0,
false,
&[
None,
Some(300_u64 as i64),
Some(2000_u64 as i64),
Some(9_223_372_036_854_775_807_u64 as i64),
Some(9_223_372_036_854_775_808_u64 as i64),
],
);
test_top5(
0,
true,
&[
Some(18_446_744_073_709_551_615_u64 as i64),
Some(18_446_744_073_709_551_613_u64 as i64),
Some(9_223_372_036_854_775_808_u64 as i64),
Some(9_223_372_036_854_775_807_u64 as i64),
Some(2000_u64 as i64),
],
);
test_top5(
1,
false,
&[
None,
Some(-9_223_372_036_854_775_808),
Some(-3),
Some(-1),
Some(300),
],
);
test_top5(
1,
true,
&[
Some(9_223_372_036_854_775_807),
Some(2000),
Some(300),
Some(-1),
Some(-3),
],
);
test_top5(
2,
false,
&[
None,
Some(300_u32 as i64),
Some(2000_u32 as i64),
Some(2_147_483_647_u32 as i64),
Some(2_147_483_648_u32 as i64),
],
);
test_top5(
2,
true,
&[
Some(4_294_967_295_u32 as i64),
Some(4_294_967_295_u32 as i64),
Some(2_147_483_648_u32 as i64),
Some(2_147_483_647_u32 as i64),
Some(2000_u32 as i64),
],
);
}
}