#![allow(incomplete_features)]
#![feature(proc_macro_hygiene)]
#![feature(specialization)]
#[macro_use(box_try)]
extern crate tikv_util;
#[macro_use(other_err)]
extern crate tidb_query_common;
mod impl_avg;
mod impl_bit_op;
mod impl_count;
mod impl_first;
mod impl_max_min;
mod impl_sum;
mod impl_variance;
mod parser;
mod summable;
mod util;
pub use self::parser::{AggrDefinitionParser, AllAggrDefinitionParser};
use tidb_query_common::Result;
use tidb_query_datatype::codec::data_type::*;
use tidb_query_datatype::expr::EvalContext;
/// An aggregate function: something that has a name and can create state objects.
///
/// Implementors must also be `Debug + Send + 'static`.
pub trait AggrFunction: std::fmt::Debug + Send + 'static {
/// Returns the static name of this function.
fn name(&self) -> &'static str;
/// Creates a boxed [`AggrFunctionState`] for this function.
///
/// NOTE(review): presumably every call yields a fresh, independent state; this is up
/// to the implementor and cannot be confirmed from this file.
fn create_state(&self) -> Box<dyn AggrFunctionState>;
}
/// Type-erased state of an aggregate function.
///
/// Besides `Debug + Send + 'static`, an implementor must provide
/// [`AggrFunctionStateUpdatePartial`] for every reference type listed in the supertrait
/// bounds below, so a `dyn AggrFunctionState` can be offered a value of any of them.
///
/// In this file the requirement is met by the blanket impls over
/// [`ConcreteAggrFunctionState`]: the update methods for the state's own `ParameterType`
/// forward to the concrete state, and the fallback methods for every other type panic with
/// "Unmatched parameter type".
pub trait AggrFunctionState:
std::fmt::Debug
+ Send
+ 'static
+ AggrFunctionStateUpdatePartial<&'static Int>
+ AggrFunctionStateUpdatePartial<&'static Real>
+ AggrFunctionStateUpdatePartial<&'static Decimal>
+ AggrFunctionStateUpdatePartial<BytesRef<'static>>
+ AggrFunctionStateUpdatePartial<&'static DateTime>
+ AggrFunctionStateUpdatePartial<&'static Duration>
+ AggrFunctionStateUpdatePartial<JsonRef<'static>>
+ AggrFunctionStateUpdatePartial<EnumRef<'static>>
+ AggrFunctionStateUpdatePartial<SetRef<'static>>
{
/// Pushes the result held by this state into `target`.
///
/// Which elements of `target` are written, and with which value type, is decided by the
/// implementor; the state itself is only borrowed immutably.
fn push_result(&self, ctx: &mut EvalContext, target: &mut [VectorValue]) -> Result<()>;
}
/// An aggregate function state that accepts exactly one parameter type.
///
/// Blanket impls in this file turn every `ConcreteAggrFunctionState` into an
/// [`AggrFunctionState`] and provide [`AggrFunctionStateUpdatePartial`] for it: calls with
/// `ParameterType` are forwarded to `update_concrete_unsafe`, calls with any other type
/// panic.
pub trait ConcreteAggrFunctionState: std::fmt::Debug + Send + 'static {
/// The single reference type this state can be updated with.
type ParameterType: EvaluableRef<'static>;
/// Updates the state with one optional value of `ParameterType`.
///
/// # Safety
///
/// NOTE(review): the safety contract is not written down in this file. The
/// `update_concrete!` macro calls this method with the result of `unsafe_into()`, which
/// presumably stretches a shorter lifetime to `'static`; if so, implementations must
/// not keep `value` beyond the call. Confirm against the definition of `unsafe_into`.
unsafe fn update_concrete_unsafe(
&mut self,
ctx: &mut EvalContext,
value: Option<Self::ParameterType>,
) -> Result<()>;
/// Pushes the result held by this state into `target`.
fn push_result(&self, ctx: &mut EvalContext, target: &mut [VectorValue]) -> Result<()>;
}
/// Calls `update_concrete_unsafe` on `$state` inside an `unsafe` block.
///
/// Expands to `unsafe { $state.update_concrete_unsafe($ctx, $value.unsafe_into()) }`, so
/// the whole `$value` expression is evaluated inside the `unsafe` block as well.
///
/// NOTE(review): `unsafe_into` is not defined in this file; it presumably converts the
/// value into the `'static` parameter type expected by the state. Confirm at its
/// definition.
#[macro_export]
macro_rules! update_concrete {
( $state:expr, $ctx:expr, $value:expr ) => {
unsafe { $state.update_concrete_unsafe($ctx, $value.unsafe_into()) }
};
}
/// Calls `update_vector_unsafe` on `$state` inside an `unsafe` block.
///
/// The arguments passed are `$ctx`, `$physical_values.phantom_data().unsafe_into()`,
/// `$physical_values.unsafe_into()` and `$logical_rows`.
///
/// Note that `$physical_values` appears twice in the expansion, so the expression is
/// evaluated twice; pass a cheap, side-effect-free expression.
///
/// NOTE(review): `phantom_data` and `unsafe_into` are not defined in this file; their
/// exact semantics should be confirmed at their definitions.
#[macro_export]
macro_rules! update_vector {
( $state:expr, $ctx:expr, $physical_values:expr, $logical_rows:expr ) => {
unsafe {
$state.update_vector_unsafe(
$ctx,
// First use of `$physical_values`: only to obtain the phantom value.
$physical_values.phantom_data().unsafe_into(),
// Second use of `$physical_values`: the values themselves.
$physical_values.unsafe_into(),
$logical_rows,
)
}
};
}
/// Calls `update_repeat_unsafe` on `$state` inside an `unsafe` block.
///
/// Expands to
/// `unsafe { $state.update_repeat_unsafe($ctx, $value.unsafe_into(), $repeat_times) }`.
///
/// NOTE(review): `unsafe_into` is not defined in this file; confirm its contract at its
/// definition.
#[macro_export]
macro_rules! update_repeat {
( $state:expr, $ctx:expr, $value:expr, $repeat_times:expr ) => {
unsafe { $state.update_repeat_unsafe($ctx, $value.unsafe_into(), $repeat_times) }
};
}
/// Calls `update_unsafe` on `$state` inside an `unsafe` block.
///
/// Expands to `unsafe { $state.update_unsafe($ctx, $value.unsafe_into()) }`. The tests in
/// this file use it with `$state` cast to `&mut dyn AggrFunctionStateUpdatePartial<_>`.
///
/// NOTE(review): `unsafe_into` is not defined in this file; confirm its contract at its
/// definition.
#[macro_export]
macro_rules! update {
( $state:expr, $ctx:expr, $value:expr ) => {
unsafe { $state.update_unsafe($ctx, $value.unsafe_into()) }
};
}
/// Generates the three `unsafe fn` update methods for parameter type `$ty` by forwarding
/// to safe methods on `self`.
///
/// The expansion consists of method items only, so the macro has to be invoked inside an
/// `impl` block. The generated methods forward as follows:
///
/// * `update_unsafe` calls `self.update(ctx, value)`
/// * `update_repeat_unsafe` calls `self.update_repeat(ctx, value, repeat_times)`
/// * `update_vector_unsafe` calls
///   `self.update_vector(ctx, phantom_data, physical_values, logical_rows)`
///
/// `$ty` is matched as a single token tree and is also used to form the path
/// `$ty::ChunkedType`. `EvalContext` and `Result` are written unqualified and therefore
/// must be in scope where the macro is invoked.
#[macro_export]
macro_rules! impl_state_update_partial {
( $ty:tt ) => {
#[inline]
unsafe fn update_unsafe(
&mut self,
ctx: &mut EvalContext,
value: Option<$ty>,
) -> Result<()> {
self.update(ctx, value)
}
#[inline]
unsafe fn update_repeat_unsafe(
&mut self,
ctx: &mut EvalContext,
value: Option<$ty>,
repeat_times: usize,
) -> Result<()> {
self.update_repeat(ctx, value, repeat_times)
}
#[inline]
unsafe fn update_vector_unsafe(
&mut self,
ctx: &mut EvalContext,
phantom_data: Option<$ty>,
physical_values: $ty::ChunkedType,
logical_rows: &[usize],
) -> Result<()> {
self.update_vector(ctx, phantom_data, physical_values, logical_rows)
}
};
}
/// Generates `update_concrete_unsafe` for parameter type `$ty` by forwarding to the safe
/// method `self.update_concrete(ctx, value)`.
///
/// The expansion is a single method item, so the macro has to be invoked inside an `impl`
/// block. `EvalContext` and `Result` are written unqualified and therefore must be in scope
/// where the macro is invoked.
#[macro_export]
macro_rules! impl_concrete_state {
( $ty:ty ) => {
#[inline]
unsafe fn update_concrete_unsafe(
&mut self,
ctx: &mut EvalContext,
value: Option<$ty>,
) -> Result<()> {
self.update_concrete(ctx, value)
}
};
}
/// Generates a fallback `AggrFunctionStateUpdatePartial<T1>` impl for `$ty` whose methods
/// all panic with "Unmatched parameter type".
///
/// Every generated method is marked `default`, so a more specific impl may override it
/// (this relies on the `specialization` feature).
///
/// The trait is named as `super::AggrFunctionStateUpdatePartial`, a path resolved where the
/// macro is invoked; the invoking module therefore needs the trait to be reachable through
/// its parent module. `EvaluableRef`, `VectorValue`, `VectorValueExt`, `EvalContext` and
/// `Result` are written unqualified and must be in scope at the call site.
///
/// NOTE(review): the impl declares a second type parameter `T` that does not appear in the
/// trait reference; presumably `$ty` is expected to mention `T` (for example a state type
/// generic over `T`), otherwise `T` would be unconstrained. Confirm against the callers.
#[macro_export]
macro_rules! impl_unmatched_function_state {
( $ty:ty ) => {
impl<T1, T> super::AggrFunctionStateUpdatePartial<T1> for $ty
where
T1: EvaluableRef<'static> + 'static,
T: EvaluableRef<'static> + 'static,
VectorValue: VectorValueExt<T::EvaluableType>,
{
#[inline]
default unsafe fn update_unsafe(
&mut self,
_ctx: &mut EvalContext,
_value: Option<T1>,
) -> Result<()> {
panic!("Unmatched parameter type")
}
#[inline]
default unsafe fn update_repeat_unsafe(
&mut self,
_ctx: &mut EvalContext,
_value: Option<T1>,
_repeat_times: usize,
) -> Result<()> {
panic!("Unmatched parameter type")
}
#[inline]
default unsafe fn update_vector_unsafe(
&mut self,
_ctx: &mut EvalContext,
_phantom_data: Option<T1>,
_physical_values: T1::ChunkedType,
_logical_rows: &[usize],
) -> Result<()> {
panic!("Unmatched parameter type")
}
}
};
}
/// Update operations of an aggregate function state for one parameter type `TT`.
///
/// [`AggrFunctionState`] requires this trait once for each reference type it lists as a
/// supertrait.
///
/// # Panics
///
/// The fallback blanket impl in this file panics with "Unmatched parameter type" in every
/// method when `TT` is not the state's own parameter type.
///
/// # Safety
///
/// All methods are `unsafe fn`. NOTE(review): the safety contract is not written down in
/// this file. The `update!`, `update_repeat!` and `update_vector!` macros wrap these calls
/// and pass values through `unsafe_into()`, so callers are presumably expected to go
/// through those macros. Confirm at the definition of `unsafe_into`.
pub trait AggrFunctionStateUpdatePartial<TT: EvaluableRef<'static>> {
/// Updates the state with a single optional value.
unsafe fn update_unsafe(&mut self, ctx: &mut EvalContext, value: Option<TT>) -> Result<()>;
/// Updates the state with `value`, `repeat_times` times over.
///
/// The blanket impl in this file does this by applying a clone of `value` in a loop that
/// runs `repeat_times` times and stops at the first error.
unsafe fn update_repeat_unsafe(
&mut self,
ctx: &mut EvalContext,
value: Option<TT>,
repeat_times: usize,
) -> Result<()>;
/// Updates the state with the entries of `physical_values` selected by `logical_rows`.
///
/// The blanket impl in this file walks `logical_rows` in order, feeds
/// `physical_values.get_option_ref(index)` to the state for each index, stops at the
/// first error, and ignores `phantom_data`.
unsafe fn update_vector_unsafe(
&mut self,
ctx: &mut EvalContext,
phantom_data: Option<TT>,
physical_values: TT::ChunkedType,
logical_rows: &[usize],
) -> Result<()>;
}
/// Fallback implementation covering every concrete state and every parameter type.
///
/// Each method is `default` and unconditionally panics with "Unmatched parameter type".
/// The more specific impl in this file, bounded by
/// `ConcreteAggrFunctionState<ParameterType = T>`, overrides these methods for the single
/// parameter type a state really accepts.
impl<P, S> AggrFunctionStateUpdatePartial<P> for S
where
    P: EvaluableRef<'static>,
    S: ConcreteAggrFunctionState,
{
    /// Always panics: `P` is not the parameter type of this state.
    #[inline]
    default unsafe fn update_unsafe(&mut self, _: &mut EvalContext, _: Option<P>) -> Result<()> {
        panic!("Unmatched parameter type")
    }

    /// Always panics: `P` is not the parameter type of this state.
    #[inline]
    default unsafe fn update_repeat_unsafe(
        &mut self,
        _: &mut EvalContext,
        _: Option<P>,
        _: usize,
    ) -> Result<()> {
        panic!("Unmatched parameter type")
    }

    /// Always panics: `P` is not the parameter type of this state.
    #[inline]
    default unsafe fn update_vector_unsafe(
        &mut self,
        _: &mut EvalContext,
        _: Option<P>,
        _: P::ChunkedType,
        _: &[usize],
    ) -> Result<()> {
        panic!("Unmatched parameter type")
    }
}
/// Implementation for the one parameter type a concrete state accepts.
///
/// Selected when `P` equals the state's `ParameterType`; every method is expressed in
/// terms of `update_concrete_unsafe`.
impl<P, S> AggrFunctionStateUpdatePartial<P> for S
where
    P: EvaluableRef<'static>,
    S: ConcreteAggrFunctionState<ParameterType = P>,
{
    /// Forwards the single value straight to the concrete state.
    #[inline]
    unsafe fn update_unsafe(&mut self, ctx: &mut EvalContext, value: Option<P>) -> Result<()> {
        self.update_concrete_unsafe(ctx, value)
    }

    /// Applies a clone of `value` to the concrete state `repeat_times` times, stopping at
    /// the first error.
    #[inline]
    unsafe fn update_repeat_unsafe(
        &mut self,
        ctx: &mut EvalContext,
        value: Option<P>,
        repeat_times: usize,
    ) -> Result<()> {
        (0..repeat_times).try_for_each(|_| self.update_concrete_unsafe(ctx, value.clone()))
    }

    /// Applies the entries of `physical_values` selected by `logical_rows`, in order,
    /// stopping at the first error. The phantom value is not used.
    #[inline]
    unsafe fn update_vector_unsafe(
        &mut self,
        ctx: &mut EvalContext,
        _phantom_data: Option<P>,
        physical_values: P::ChunkedType,
        logical_rows: &[usize],
    ) -> Result<()> {
        logical_rows.iter().copied().try_for_each(|row| {
            self.update_concrete_unsafe(ctx, physical_values.get_option_ref(row))
        })
    }
}
impl<F> AggrFunctionState for F
where
F: ConcreteAggrFunctionState,
{
fn push_result(&self, ctx: &mut EvalContext, target: &mut [VectorValue]) -> Result<()> {
<Self as ConcreteAggrFunctionState>::push_result(self, ctx, target)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tidb_query_datatype::EvalType;
// Checks that a concrete state accepts its own parameter type through the type-erased
// interfaces and panics for every other parameter type or unsuitable result target.
#[test]
fn test_type_match() {
// Minimal state: sums the `Int` values it is given and reports the sum as a `Real`.
#[derive(Clone, Debug)]
struct AggrFnStateFoo {
sum: i64,
}
impl AggrFnStateFoo {
fn new() -> Self {
Self { sum: 0 }
}
}
impl ConcreteAggrFunctionState for AggrFnStateFoo {
type ParameterType = &'static Int;
unsafe fn update_concrete_unsafe(
&mut self,
_ctx: &mut EvalContext,
value: Option<&'static Int>,
) -> Result<()> {
// `None` leaves the sum unchanged.
if let Some(v) = value {
self.sum += *v;
}
Ok(())
}
fn push_result(
&self,
_ctx: &mut EvalContext,
target: &mut [VectorValue],
) -> Result<()> {
// Indexes `target[0]` directly, so an empty slice panics here.
target[0].push_real(Real::new(self.sum as f64).ok());
Ok(())
}
}
let mut ctx = EvalContext::default();
let mut s = AggrFnStateFoo::new();
// Matching parameter type (`Int`): both updates succeed, sum becomes 1 + 3 = 4.
assert!(
update!(
&mut s as &mut dyn AggrFunctionStateUpdatePartial<_>,
&mut ctx,
Some(&1)
)
.is_ok()
);
assert!(
update!(
&mut s as &mut dyn AggrFunctionStateUpdatePartial<_>,
&mut ctx,
Some(&3)
)
.is_ok()
);
// A `Real` value does not match the state's parameter type, so the fallback impl is
// expected to panic. The update runs on a clone so `s` itself is not touched; the test
// expects `panic_hook::recover_safe` to report the panic as an `Err`.
let result = panic_hook::recover_safe(|| {
let mut s = s.clone();
let _ = update!(
&mut s as &mut dyn AggrFunctionStateUpdatePartial<_>,
&mut ctx,
Real::new(1.0).ok().as_ref()
);
});
assert!(result.is_err());
// Same expectation for a bytes value.
let result = panic_hook::recover_safe(|| {
let mut s = s.clone();
let _ = update!(
&mut s as &mut dyn AggrFunctionStateUpdatePartial<_>,
&mut ctx,
Some(&[1u8] as BytesRef)
);
});
assert!(result.is_err());
// Pushing into a `Real` vector through the type-erased trait yields the sum so far.
let mut target = vec![VectorValue::with_capacity(0, EvalType::Real)];
assert!(
(&mut s as &mut dyn AggrFunctionState)
.push_result(&mut ctx, &mut target)
.is_ok()
);
assert_eq!(target[0].to_real_vec(), &[Real::new(4.0).ok()]);
// One more update (sum becomes 5) and a second push appends to the same vector.
assert!(
update!(
&mut s as &mut dyn AggrFunctionStateUpdatePartial<_>,
&mut ctx,
Some(&1)
)
.is_ok()
);
assert!(
(&mut s as &mut dyn AggrFunctionState)
.push_result(&mut ctx, &mut target)
.is_ok()
);
assert_eq!(
target[0].to_real_vec(),
&[Real::new(4.0).ok(), Real::new(5.0).ok()]
);
// Empty target slice: `target[0]` inside `push_result` is out of bounds and panics.
let result = panic_hook::recover_safe(|| {
let mut s = s.clone();
let mut target: Vec<VectorValue> = Vec::new();
let _ = (&mut s as &mut dyn AggrFunctionState).push_result(&mut ctx, &mut target[..]);
});
assert!(result.is_err());
// Target vector of eval type `Int`: the test expects `push_real` on it to panic.
let result = panic_hook::recover_safe(|| {
let mut s = s.clone();
let mut target: Vec<VectorValue> = vec![VectorValue::with_capacity(0, EvalType::Int)];
let _ = (&mut s as &mut dyn AggrFunctionState).push_result(&mut ctx, &mut target[..]);
});
assert!(result.is_err());
}
}