use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use kvproto::coprocessor::{KeyRange, Response};
use protobuf::Message;
use rand::rngs::StdRng;
use rand::Rng;
use rand::SeedableRng;
use tidb_query_common::storage::scanner::{RangesScanner, RangesScannerOptions};
use tidb_query_common::storage::Range;
use tidb_query_datatype::codec::datum::{encode_value, split_datum, Datum, NIL_FLAG};
use tidb_query_datatype::codec::table;
use tidb_query_datatype::def::Collation;
use tidb_query_datatype::expr::{EvalConfig, EvalContext};
use tidb_query_datatype::FieldTypeAccessor;
use tidb_query_executors::{
    interface::BatchExecutor, runner::MAX_TIME_SLICE, BatchTableScanExecutor,
};
use tidb_query_expr::BATCH_MAX_SIZE;
use tipb::{self, AnalyzeColumnsReq, AnalyzeIndexReq, AnalyzeReq, AnalyzeType};
use yatp::task::future::reschedule;
use super::cmsketch::CmSketch;
use super::fmsketch::FmSketch;
use super::histogram::Histogram;
use crate::coprocessor::dag::TiKVStorage;
use crate::coprocessor::*;
use crate::storage::{Snapshot, SnapshotStore, Statistics};
const ANALYZE_VERSION_V1: i32 = 1;
const ANALYZE_VERSION_V2: i32 = 2;
/// Coprocessor request handler for `ANALYZE` requests: builds table/index
/// statistics (histograms, CM/FM sketches, samples) over a storage snapshot.
pub struct AnalyzeContext<S: Snapshot> {
    req: AnalyzeReq,
    // `Option` so the storage can be moved out with `take()` into the
    // scanner/executor when the request is handled (it is consumed once).
    storage: Option<TiKVStorage<SnapshotStore<S>>>,
    ranges: Vec<KeyRange>,
    // Scan statistics accumulated while handling; drained by
    // `collect_scan_statistics`.
    storage_stats: Statistics,
}
impl<S: Snapshot> AnalyzeContext<S> {
pub fn new(
req: AnalyzeReq,
ranges: Vec<KeyRange>,
start_ts: u64,
snap: S,
req_ctx: &ReqContext,
) -> Result<Self> {
let store = SnapshotStore::new(
snap,
start_ts.into(),
req_ctx.context.get_isolation_level(),
!req_ctx.context.get_not_fill_cache(),
req_ctx.bypass_locks.clone(),
false,
);
Ok(Self {
req,
storage: Some(TiKVStorage::new(store, false)),
ranges,
storage_stats: Statistics::default(),
})
}
/// Runs column-only analyze and returns the serialized
/// `tipb::AnalyzeColumnsResp` bytes (the index half of the tuple is ignored).
async fn handle_column(builder: &mut SampleBuilder<S>) -> Result<Vec<u8>> {
    let (col_res, _) = builder.collect_columns_stats().await?;
    let proto = col_res.into_proto();
    Ok(box_try!(proto.write_to_bytes()))
}
/// Runs mixed analyze (columns + common-handle index) and returns the
/// serialized `tipb::AnalyzeMixedResp` bytes. Fails if the builder produced
/// no index result.
async fn handle_mixed(builder: &mut SampleBuilder<S>) -> Result<Vec<u8>> {
    let (col_res, idx_res) = builder.collect_columns_stats().await?;
    let idx_res = idx_res.ok_or_else(|| {
        Error::Other("Mixed analyze type should have index response.".into())
    })?;
    let proto = AnalyzeMixedResult::new(col_res, idx_res).into_proto();
    Ok(box_try!(proto.write_to_bytes()))
}
/// Runs full-sampling analyze and returns the serialized
/// `tipb::AnalyzeColumnsResp` bytes.
async fn handle_full_sampling(builder: &mut RowSampleBuilder<S>) -> Result<Vec<u8>> {
    let collected = builder.collect_column_stats().await?;
    let proto = collected.into_proto();
    Ok(box_try!(proto.write_to_bytes()))
}
/// Handles one analyze-index request: scans index (or common-handle record)
/// keys in range and builds per-index statistics.
///
/// Artifacts collected:
/// - a `Histogram` with `bucket_size` buckets over the encoded column prefix,
/// - an optional `CmSketch` (`CmSketch::new` may return `None`),
/// - an `FmSketch` for distinct-value estimation,
/// - for stats version 2, a top-n of the most frequent values which is
///   subtracted out of the CM sketch so the counts are not duplicated.
///
/// Returns the serialized `tipb::AnalyzeIndexResp` bytes.
async fn handle_index(
    req: AnalyzeIndexReq,
    scanner: &mut RangesScanner<TiKVStorage<SnapshotStore<S>>>,
    is_common_handle: bool,
) -> Result<Vec<u8>> {
    let mut hist = Histogram::new(req.get_bucket_size() as usize);
    let mut cms = CmSketch::new(
        req.get_cmsketch_depth() as usize,
        req.get_cmsketch_width() as usize,
    );
    let mut fms = FmSketch::new(req.get_sketch_size() as usize);
    let mut row_count = 0;
    let mut time_slice_start = Instant::now();
    let mut topn_heap = BinaryHeap::new();
    // (repetition count, encoded value) of the run currently being scanned;
    // index keys arrive in sorted order, so equal values are adjacent.
    let mut cur_val: (u32, Vec<u8>) = (0, vec![]);
    let top_n_size = req.get_top_n_size() as usize;
    let stats_version = if req.has_version() {
        req.get_version()
    } else {
        ANALYZE_VERSION_V1
    };
    while let Some((key, _)) = scanner.next()? {
        row_count += 1;
        // Yield to the scheduler periodically so a long scan cannot
        // monopolize the worker thread.
        if row_count >= BATCH_MAX_SIZE {
            if time_slice_start.elapsed() > MAX_TIME_SLICE {
                reschedule().await;
                time_slice_start = Instant::now();
            }
            row_count = 0;
        }
        // Strip the table/index prefix, keeping only the encoded datums.
        let mut key = &key[..];
        if is_common_handle {
            table::check_record_key(key)?;
            key = &key[table::PREFIX_LEN..];
        } else {
            table::check_index_key(key)?;
            key = &key[table::PREFIX_LEN + table::ID_LEN..];
        }
        let mut datums = key;
        let mut data = Vec::with_capacity(key.len());
        // Split off `num_columns` datums; every growing prefix is fed into
        // the CM sketch so multi-column prefixes are counted as well.
        for i in 0..req.get_num_columns() as usize {
            if datums.is_empty() {
                return Err(box_err!(
                    "{}th column is missing in datum buffer: {}",
                    i,
                    log_wrappers::Value::key(key)
                ));
            }
            let (column, remaining) = split_datum(datums, false)?;
            datums = remaining;
            data.extend_from_slice(column);
            if let Some(cms) = cms.as_mut() {
                cms.insert(&data);
            }
        }
        fms.insert(&data);
        if stats_version == ANALYZE_VERSION_V2 {
            hist.append(&data, true);
            if cur_val.1 == data {
                cur_val.0 += 1;
            } else {
                // Value changed: flush the finished run into the top-n heap
                // (a min-heap via `Reverse`, capped at `top_n_size`).
                if cur_val.0 > 0 {
                    topn_heap.push(Reverse(cur_val));
                }
                if topn_heap.len() > top_n_size {
                    topn_heap.pop();
                }
                cur_val = (1, data);
            }
        } else {
            hist.append(&data, false);
        }
    }
    if stats_version == ANALYZE_VERSION_V2 {
        // Flush the trailing run the loop never closed.
        if cur_val.0 > 0 {
            topn_heap.push(Reverse(cur_val));
            if topn_heap.len() > top_n_size {
                topn_heap.pop();
            }
        }
        // Move top-n counts out of the CM sketch into its dedicated top-n
        // store so they are not double counted.
        if let Some(c) = cms.as_mut() {
            for heap_item in topn_heap {
                c.sub(&(heap_item.0).1, (heap_item.0).0);
                c.push_to_top_n((heap_item.0).1, (heap_item.0).0 as u64);
            }
        }
    }
    let res = AnalyzeIndexResult::new(hist, cms, Some(fms)).into_proto();
    let dt = box_try!(res.write_to_bytes());
    Ok(dt)
}
}
#[async_trait]
impl<S: Snapshot> RequestHandler for AnalyzeContext<S> {
/// Dispatches the analyze request by type, runs the matching handler, and
/// wraps the serialized statistics into a coprocessor `Response`.
///
/// `Error::Other` is reported inside the response (`other_error` field)
/// instead of failing the RPC; all other errors propagate to the caller.
async fn handle_request(&mut self) -> Result<Response> {
    let ret = match self.req.get_tp() {
        // Index / common-handle analyze: key-only range scan feeding
        // `handle_index`.
        AnalyzeType::TypeIndex | AnalyzeType::TypeCommonHandle => {
            let req = self.req.take_idx_req();
            let ranges = std::mem::take(&mut self.ranges);
            table::check_table_ranges(&ranges)?;
            let mut scanner = RangesScanner::new(RangesScannerOptions {
                storage: self.storage.take().unwrap(),
                ranges: ranges
                    .into_iter()
                    .map(|r| Range::from_pb_range(r, false))
                    .collect(),
                scan_backward_in_range: false,
                // Index statistics only need the keys, not the values.
                is_key_only: true,
                is_scanned_range_aware: false,
            });
            let res = AnalyzeContext::handle_index(
                req,
                &mut scanner,
                self.req.get_tp() == AnalyzeType::TypeCommonHandle,
            )
            .await;
            scanner.collect_storage_stats(&mut self.storage_stats);
            res
        }
        AnalyzeType::TypeColumn => {
            let col_req = self.req.take_col_req();
            let storage = self.storage.take().unwrap();
            let ranges = std::mem::take(&mut self.ranges);
            let mut builder = SampleBuilder::new(col_req, None, storage, ranges)?;
            let res = AnalyzeContext::handle_column(&mut builder).await;
            builder.data.collect_storage_stats(&mut self.storage_stats);
            res
        }
        // Mixed: column stats plus common-handle index stats in one scan.
        AnalyzeType::TypeMixed => {
            let col_req = self.req.take_col_req();
            let idx_req = self.req.take_idx_req();
            let storage = self.storage.take().unwrap();
            let ranges = std::mem::take(&mut self.ranges);
            let mut builder = SampleBuilder::new(col_req, Some(idx_req), storage, ranges)?;
            let res = AnalyzeContext::handle_mixed(&mut builder).await;
            builder.data.collect_storage_stats(&mut self.storage_stats);
            res
        }
        AnalyzeType::TypeFullSampling => {
            let col_req = self.req.take_col_req();
            let storage = self.storage.take().unwrap();
            let ranges = std::mem::take(&mut self.ranges);
            let mut builder = RowSampleBuilder::new(col_req, storage, ranges)?;
            let res = AnalyzeContext::handle_full_sampling(&mut builder).await;
            builder.data.collect_storage_stats(&mut self.storage_stats);
            res
        }
        AnalyzeType::TypeSampleIndex => Err(Error::Other(
            "Analyze of this kind not implemented".to_string(),
        )),
    };
    match ret {
        Ok(data) => {
            let mut resp = Response::default();
            resp.set_data(data);
            Ok(resp)
        }
        // Surface `Other` errors to the client in the response body rather
        // than as a transport-level failure.
        Err(Error::Other(e)) => {
            let mut resp = Response::default();
            resp.set_other_error(e);
            Ok(resp)
        }
        Err(e) => Err(e),
    }
}
fn collect_scan_statistics(&mut self, dest: &mut Statistics) {
dest.add(&self.storage_stats);
self.storage_stats = Statistics::default();
}
}
/// Builder for the full-sampling analyze path (`AnalyzeType::TypeFullSampling`):
/// scans whole rows via a table-scan executor and feeds them into a
/// `RowSampleCollector`.
struct RowSampleBuilder<S: Snapshot> {
    data: BatchTableScanExecutor<TiKVStorage<SnapshotStore<S>>>,
    // Reservoir capacity for sampled rows.
    max_sample_size: usize,
    max_fm_sketch_size: usize,
    columns_info: Vec<tipb::ColumnInfo>,
    // Column groups get their own aggregate slots after the single columns.
    column_groups: Vec<tipb::AnalyzeColumnGroup>,
}
impl<S: Snapshot> RowSampleBuilder<S> {
/// Builds a `RowSampleBuilder` from a column analyze request.
///
/// Fails when the request carries no column info.
fn new(
    mut req: AnalyzeColumnsReq,
    storage: TiKVStorage<SnapshotStore<S>>,
    ranges: Vec<KeyRange>,
) -> Result<Self> {
    let columns_info: Vec<_> = req.take_columns_info().into();
    if columns_info.is_empty() {
        return Err(box_err!("empty columns_info"));
    }
    let primary_column_ids = req.take_primary_column_ids();
    let primary_prefix_column_ids = req.take_primary_prefix_column_ids();
    let scanner = BatchTableScanExecutor::new(
        storage,
        Arc::new(EvalConfig::default()),
        columns_info.clone(),
        ranges,
        primary_column_ids,
        false,
        false,
        primary_prefix_column_ids,
    )?;
    Ok(Self {
        data: scanner,
        max_sample_size: req.get_sample_size() as usize,
        max_fm_sketch_size: req.get_sketch_size() as usize,
        column_groups: req.take_column_groups().into(),
        columns_info,
    })
}
/// Scans all rows batch by batch and feeds every logical row into a
/// `RowSampleCollector`, yielding periodically so a long scan does not
/// starve other tasks.
///
/// String-like columns are re-encoded using their collation's sort key so
/// the statistics compare values consistently; NULL values are kept as-is.
async fn collect_column_stats(&mut self) -> Result<AnalyzeSamplingResult> {
    use tidb_query_datatype::{codec::collation::Collator, match_template_collator};
    let mut is_drained = false;
    let mut time_slice_start = Instant::now();
    let mut collector = RowSampleCollector::new(
        self.max_sample_size,
        self.max_fm_sketch_size,
        // One aggregate slot per column plus one per column group.
        self.columns_info.len() + self.column_groups.len(),
    );
    while !is_drained {
        let time_slice_elapsed = time_slice_start.elapsed();
        if time_slice_elapsed > MAX_TIME_SLICE {
            reschedule().await;
            time_slice_start = Instant::now();
        }
        let result = self.data.next_batch(BATCH_MAX_SIZE);
        is_drained = result.is_drained?;
        let columns_slice = result.physical_columns.as_slice();
        for logical_row in &result.logical_rows {
            // Encoded datum bytes of this row, one entry per column.
            let mut column_vals: Vec<Vec<u8>> = Vec::new();
            for i in 0..self.columns_info.len() {
                let mut val = vec![];
                columns_slice[i].encode(
                    *logical_row,
                    &self.columns_info[i],
                    &mut EvalContext::default(),
                    &mut val,
                )?;
                if self.columns_info[i].as_accessor().is_string_like() {
                    // Replace the raw bytes with the collation sort key,
                    // dispatched over all collations by the macro.
                    let sorted_val = match_template_collator! {
                        TT, match self.columns_info[i].as_accessor().collation()? {
                            Collation::TT => {
                                let mut mut_val = &val[..];
                                let decoded_val = table::decode_col_value(&mut mut_val, &mut EvalContext::default(), &self.columns_info[i])?;
                                if decoded_val == Datum::Null {
                                    val
                                } else {
                                    let decoded_sorted_val = TT::sort_key(&decoded_val.as_string()?.unwrap().into_owned())?;
                                    encode_value(&mut EvalContext::default(), &[Datum::Bytes(decoded_sorted_val)])?
                                }
                            }
                        }
                    };
                    column_vals.push(sorted_val);
                    continue;
                }
                column_vals.push(val);
            }
            collector.count += 1;
            collector.collect_column_group(&column_vals, &self.column_groups);
            collector.collect_column(column_vals);
        }
    }
    Ok(AnalyzeSamplingResult::new(collector))
}
}
/// Weighted-reservoir row sampler plus per-column / per-column-group
/// aggregates (null counts, total sizes, FM sketches).
///
/// Aggregate vectors are laid out as single columns followed by column
/// groups: index `i` is column `i`; index `columns_len + g` is group `g`.
#[derive(Clone)]
struct RowSampleCollector {
    // Min-heap (via `Reverse`) of (random weight, encoded row), keeping the
    // `max_sample_size` rows with the largest weights.
    samples: BinaryHeap<Reverse<(i64, Vec<Vec<u8>>)>>,
    null_count: Vec<i64>,
    // Total number of rows seen (incremented by the caller).
    count: u64,
    max_sample_size: usize,
    fm_sketches: Vec<FmSketch>,
    rng: StdRng,
    total_sizes: Vec<i64>,
    // Scratch buffer reused when concatenating group member values.
    row_buf: Vec<u8>,
}
impl Default for RowSampleCollector {
fn default() -> Self {
RowSampleCollector {
samples: BinaryHeap::new(),
null_count: vec![],
count: 0,
max_sample_size: 0,
fm_sketches: vec![],
rng: StdRng::from_entropy(),
total_sizes: vec![],
row_buf: Vec::new(),
}
}
}
impl RowSampleCollector {
/// Creates a collector sized for `col_and_group_len` aggregate slots
/// (single columns first, then column groups).
fn new(
    max_sample_size: usize,
    max_fm_sketch_size: usize,
    col_and_group_len: usize,
) -> RowSampleCollector {
    RowSampleCollector {
        max_sample_size,
        rng: StdRng::from_entropy(),
        samples: BinaryHeap::new(),
        fm_sketches: vec![FmSketch::new(max_fm_sketch_size); col_and_group_len],
        null_count: vec![0; col_and_group_len],
        total_sizes: vec![0; col_and_group_len],
        row_buf: Vec::new(),
        count: 0,
    }
}
/// Accumulates per-column-group statistics (null count, total size, FM
/// sketch) for one row. Group aggregates live after the single-column
/// ones, at indices `columns_val.len() + group_idx`.
///
/// Bug fix: a group's member columns must be looked up through its
/// `column_offsets`; the previous code indexed `columns_val` with the loop
/// counters (`columns_val[j]` / `columns_val[i]`), so whenever the offsets
/// were not `0..offsets.len()` the null detection, total sizes and the
/// concatenated FM-sketch row were computed over the wrong columns.
pub fn collect_column_group(
    &mut self,
    columns_val: &[Vec<u8>],
    column_groups: &[tipb::AnalyzeColumnGroup],
) {
    let col_len = columns_val.len();
    for (i, group) in column_groups.iter().enumerate() {
        self.row_buf.clear();
        let offsets = group.get_column_offsets();
        let mut has_null = true;
        for &off in offsets {
            let col_val = &columns_val[off as usize];
            if col_val[0] != NIL_FLAG {
                has_null = false;
            }
            self.total_sizes[col_len + i] += col_val.len() as i64;
        }
        // The null count is only maintained for single-column groups.
        if has_null && offsets.len() == 1 {
            self.null_count[col_len + i] += 1;
            continue;
        }
        // Concatenate the member columns' encoded values and feed the
        // joined row into the group's FM sketch.
        for &off in offsets {
            self.row_buf.extend_from_slice(&columns_val[off as usize]);
        }
        self.fm_sketches[col_len + i].insert(&self.row_buf);
    }
}
/// Updates the per-column aggregates for one row, then offers the whole row
/// to the reservoir sampler. NULL columns only bump the null counter.
pub fn collect_column(&mut self, columns_val: Vec<Vec<u8>>) {
    for (i, val) in columns_val.iter().enumerate() {
        if val[0] == NIL_FLAG {
            self.null_count[i] += 1;
        } else {
            self.fm_sketches[i].insert(val);
            self.total_sizes[i] += val.len() as i64;
        }
    }
    self.sampling(columns_val);
}
/// Weighted reservoir sampling: each row draws a random weight and the
/// `max_sample_size` rows with the largest weights are kept (the heap is a
/// min-heap over weights via `Reverse`, so the smallest is evicted first).
pub fn sampling(&mut self, data: Vec<Vec<u8>>) {
    let weight = self.rng.gen_range(0, i64::MAX);
    if self.samples.len() < self.max_sample_size {
        self.samples.push(Reverse((weight, data)));
    } else if self.samples.len() == self.max_sample_size {
        let min_weight = self.samples.peek().unwrap().0.0;
        if min_weight < weight {
            self.samples.pop();
            self.samples.push(Reverse((weight, data)));
        }
    }
}
pub fn into_proto(self) -> tipb::RowSampleCollector {
let mut s = tipb::RowSampleCollector::default();
let samples = self
.samples
.into_iter()
.map(|r_tuple| {
let mut pb_sample = tipb::RowSample::default();
pb_sample.set_row(r_tuple.0.1.into());
pb_sample.set_weight(r_tuple.0.0);
pb_sample
})
.collect();
s.set_samples(samples);
s.set_null_counts(self.null_count);
s.set_count(self.count as i64);
let pb_fm_sketches = self
.fm_sketches
.into_iter()
.map(|fm_sketch| fm_sketch.into_proto())
.collect();
s.set_fm_sketch(pb_fm_sketches);
s.set_total_size(self.total_sizes);
s
}
}
/// Builder for the sample-based analyze paths (`TypeColumn` / `TypeMixed`):
/// scans rows, feeds per-column `SampleCollector`s, and optionally builds
/// common-handle index statistics for mixed requests.
struct SampleBuilder<S: Snapshot> {
    data: BatchTableScanExecutor<TiKVStorage<SnapshotStore<S>>>,
    max_bucket_size: usize,
    max_sample_size: usize,
    max_fm_sketch_size: usize,
    cm_sketch_depth: usize,
    cm_sketch_width: usize,
    // ANALYZE_VERSION_V1 unless the common-handle request says otherwise.
    stats_version: i32,
    // Top-n size for common-handle stats (v2 only); 0 when not mixed.
    top_n_size: usize,
    columns_info: Vec<tipb::ColumnInfo>,
    // True when a common-handle index request accompanies the column one.
    analyze_common_handle: bool,
    common_handle_col_ids: Vec<i64>,
}
impl<S: Snapshot> SampleBuilder<S> {
/// Creates a `SampleBuilder` for a column analyze request.
///
/// `common_handle_req` is `Some` for mixed analyze (columns + common-handle
/// index); it carries the stats version and top-n size for the
/// common-handle statistics.
///
/// Idiom fixes: `common_handle_req != None` replaced with `.is_some()`
/// (clippy `partialeq_to_none`), and `map_or_else` with trivial closures
/// replaced with `map_or` (clippy `unnecessary_lazy_evaluations`).
fn new(
    mut req: AnalyzeColumnsReq,
    common_handle_req: Option<tipb::AnalyzeIndexReq>,
    storage: TiKVStorage<SnapshotStore<S>>,
    ranges: Vec<KeyRange>,
) -> Result<Self> {
    let columns_info: Vec<_> = req.take_columns_info().into();
    if columns_info.is_empty() {
        return Err(box_err!("empty columns_info"));
    }
    let common_handle_ids = req.take_primary_column_ids();
    let table_scanner = BatchTableScanExecutor::new(
        storage,
        Arc::new(EvalConfig::default()),
        columns_info.clone(),
        ranges,
        common_handle_ids.clone(),
        false,
        false,
        req.take_primary_prefix_column_ids(),
    )?;
    // Default to stats version 1 when there is no common-handle request or
    // it carries no explicit version.
    let stats_version = common_handle_req.as_ref().map_or(ANALYZE_VERSION_V1, |r| {
        if r.has_version() {
            r.get_version()
        } else {
            ANALYZE_VERSION_V1
        }
    });
    let top_n_size = common_handle_req
        .as_ref()
        .map_or(0, |r| r.get_top_n_size() as usize);
    Ok(Self {
        data: table_scanner,
        max_bucket_size: req.get_bucket_size() as usize,
        max_fm_sketch_size: req.get_sketch_size() as usize,
        max_sample_size: req.get_sample_size() as usize,
        cm_sketch_depth: req.get_cmsketch_depth() as usize,
        cm_sketch_width: req.get_cmsketch_width() as usize,
        stats_version,
        top_n_size,
        common_handle_col_ids: common_handle_ids,
        columns_info,
        analyze_common_handle: common_handle_req.is_some(),
    })
}
/// Scans all rows and builds per-column sample collectors plus, for mixed
/// requests, common-handle index statistics.
///
/// Returns the column result and an optional index result (`Some` only
/// when `analyze_common_handle` is set).
async fn collect_columns_stats(
    &mut self,
) -> Result<(AnalyzeColumnsResult, Option<AnalyzeIndexResult>)> {
    use tidb_query_datatype::codec::collation::Collator;
    use tidb_query_datatype::match_template_collator;
    // When the first column is the integer PK handle, it is tracked in a
    // dedicated histogram instead of a sample collector.
    let columns_without_handle_len =
        self.columns_info.len() - self.columns_info[0].get_pk_handle() as usize;
    let mut pk_builder = Histogram::new(self.max_bucket_size);
    let mut collectors = vec![
        SampleCollector::new(
            self.max_sample_size,
            self.max_fm_sketch_size,
            self.cm_sketch_depth,
            self.cm_sketch_width,
        );
        columns_without_handle_len
    ];
    let mut is_drained = false;
    let mut time_slice_start = Instant::now();
    let mut common_handle_hist = Histogram::new(self.max_bucket_size);
    let mut common_handle_cms = CmSketch::new(self.cm_sketch_depth, self.cm_sketch_width);
    let mut common_handle_fms = FmSketch::new(self.max_fm_sketch_size);
    while !is_drained {
        // Yield periodically so other coprocessor tasks can run.
        let time_slice_elapsed = time_slice_start.elapsed();
        if time_slice_elapsed > MAX_TIME_SLICE {
            reschedule().await;
            time_slice_start = Instant::now();
        }
        let result = self.data.next_batch(BATCH_MAX_SIZE);
        is_drained = result.is_drained?;
        let mut columns_slice = result.physical_columns.as_slice();
        let mut columns_info = &self.columns_info[..];
        if columns_without_handle_len + 1 == columns_slice.len() {
            // First column is the PK handle: feed it into the PK histogram
            // and drop it from the per-column processing below.
            for logical_row in &result.logical_rows {
                let mut data = vec![];
                columns_slice[0].encode(
                    *logical_row,
                    &columns_info[0],
                    &mut EvalContext::default(),
                    &mut data,
                )?;
                pk_builder.append(&data, false);
            }
            columns_slice = &columns_slice[1..];
            columns_info = &columns_info[1..];
        }
        if self.analyze_common_handle {
            // Build index-style statistics over the concatenated
            // common-handle columns. NOTE(review): `cur_val`/`topn_heap`
            // are reset per batch here, unlike `handle_index` where they
            // span the whole scan — confirm this is intended.
            let mut cur_val: (u32, Vec<u8>) = (0, vec![]);
            let mut topn_heap = BinaryHeap::new();
            for logical_row in &result.logical_rows {
                let mut data = vec![];
                for i in 0..self.common_handle_col_ids.len() {
                    let mut handle_col_val = vec![];
                    columns_slice[i].encode(
                        *logical_row,
                        &columns_info[i],
                        &mut EvalContext::default(),
                        &mut handle_col_val,
                    )?;
                    data.extend_from_slice(&handle_col_val);
                    // Every growing prefix of the handle goes into the CM
                    // sketch.
                    if let Some(common_handle_cms) = common_handle_cms.as_mut() {
                        common_handle_cms.insert(&data);
                    }
                }
                common_handle_fms.insert(&data);
                if self.stats_version == ANALYZE_VERSION_V2 {
                    common_handle_hist.append(&data, true);
                    if cur_val.1 == data {
                        cur_val.0 += 1;
                    } else {
                        // Value changed: flush the finished run into the
                        // capped top-n min-heap.
                        if cur_val.0 > 0 {
                            topn_heap.push(Reverse(cur_val));
                        }
                        if topn_heap.len() > self.top_n_size {
                            topn_heap.pop();
                        }
                        cur_val = (1, data);
                    }
                } else {
                    common_handle_hist.append(&data, false)
                }
            }
            if self.stats_version == ANALYZE_VERSION_V2 {
                // Flush the trailing run, then move top-n counts out of the
                // CM sketch so they are not double counted.
                if cur_val.0 > 0 {
                    topn_heap.push(Reverse(cur_val));
                    if topn_heap.len() > self.top_n_size {
                        topn_heap.pop();
                    }
                }
                if let Some(c) = common_handle_cms.as_mut() {
                    for heap_item in topn_heap {
                        c.sub(&(heap_item.0).1, (heap_item.0).0);
                        c.push_to_top_n((heap_item.0).1, (heap_item.0).0 as u64);
                    }
                }
            }
        }
        for (i, collector) in collectors.iter_mut().enumerate() {
            for logical_row in &result.logical_rows {
                let mut val = vec![];
                columns_slice[i].encode(
                    *logical_row,
                    &columns_info[i],
                    &mut EvalContext::default(),
                    &mut val,
                )?;
                if columns_info[i].as_accessor().is_string_like() {
                    // Use the collation sort key for string-like columns;
                    // NULL values keep their raw encoding.
                    let sorted_val = match_template_collator! {
                        TT, match columns_info[i].as_accessor().collation()? {
                            Collation::TT => {
                                let mut mut_val = &val[..];
                                let decoded_val = table::decode_col_value(&mut mut_val, &mut EvalContext::default(), &columns_info[i])?;
                                if decoded_val == Datum::Null {
                                    val
                                } else {
                                    let decoded_sorted_val = TT::sort_key(&decoded_val.as_string()?.unwrap().into_owned())?;
                                    encode_value(&mut EvalContext::default(), &[Datum::Bytes(decoded_sorted_val)])?
                                }
                            }
                        }
                    };
                    collector.collect(sorted_val);
                    continue;
                }
                collector.collect(val);
            }
        }
    }
    let idx_res = if self.analyze_common_handle {
        Some(AnalyzeIndexResult::new(
            common_handle_hist,
            common_handle_cms,
            Some(common_handle_fms),
        ))
    } else {
        None
    };
    Ok((AnalyzeColumnsResult::new(collectors, pk_builder), idx_res))
}
}
/// Per-column statistics collector: reservoir sample of encoded values plus
/// null count, FM sketch, optional CM sketch and total byte size.
#[derive(Clone)]
struct SampleCollector {
    // Uniform reservoir of at most `max_sample_size` encoded values.
    samples: Vec<Vec<u8>>,
    null_count: u64,
    // Number of non-null values collected.
    count: u64,
    max_sample_size: usize,
    fm_sketch: FmSketch,
    // `None` when the requested depth/width produce no sketch.
    cm_sketch: Option<CmSketch>,
    rng: StdRng,
    total_size: u64,
}
impl SampleCollector {
/// Creates an empty collector with the given capacity limits.
fn new(
    max_sample_size: usize,
    max_fm_sketch_size: usize,
    cm_sketch_depth: usize,
    cm_sketch_width: usize,
) -> SampleCollector {
    SampleCollector {
        max_sample_size,
        fm_sketch: FmSketch::new(max_fm_sketch_size),
        cm_sketch: CmSketch::new(cm_sketch_depth, cm_sketch_width),
        rng: StdRng::from_entropy(),
        samples: Default::default(),
        null_count: 0,
        count: 0,
        total_size: 0,
    }
}
/// Converts the collector into its protobuf form, consuming it.
fn into_proto(self) -> tipb::SampleCollector {
    let mut proto = tipb::SampleCollector::default();
    proto.set_null_count(self.null_count as i64);
    proto.set_count(self.count as i64);
    proto.set_total_size(self.total_size as i64);
    proto.set_fm_sketch(self.fm_sketch.into_proto());
    proto.set_samples(self.samples.into());
    if let Some(cm) = self.cm_sketch {
        proto.set_cm_sketch(cm.into_proto())
    }
    proto
}
/// Adds one encoded datum. NULLs only bump the null counter; non-null
/// values update the sketches and the uniform reservoir sample.
pub fn collect(&mut self, data: Vec<u8>) {
    if data[0] == NIL_FLAG {
        self.null_count += 1;
        return;
    }
    self.count += 1;
    self.fm_sketch.insert(&data);
    if let Some(cm) = self.cm_sketch.as_mut() {
        cm.insert(&data);
    }
    self.total_size += data.len() as u64;
    if self.samples.len() < self.max_sample_size {
        self.samples.push(data);
    } else if self.rng.gen_range(0, self.count) < self.max_sample_size as u64 {
        // Reservoir full: keep the new value with probability
        // max_sample_size / count, evicting a random existing sample.
        let victim = self.rng.gen_range(0, self.max_sample_size);
        self.samples.remove(victim);
        self.samples.push(data);
    }
}
}
/// Wrapper around a finished `RowSampleCollector`, convertible into the
/// full-sampling `tipb::AnalyzeColumnsResp`.
#[derive(Default)]
struct AnalyzeSamplingResult {
    row_sample_collector: RowSampleCollector,
}
impl AnalyzeSamplingResult {
    fn new(row_sample_collector: RowSampleCollector) -> AnalyzeSamplingResult {
        AnalyzeSamplingResult {
            row_sample_collector,
        }
    }
    /// Serializes the collector into the response protobuf.
    fn into_proto(self) -> tipb::AnalyzeColumnsResp {
        let mut resp = tipb::AnalyzeColumnsResp::default();
        resp.set_row_collector(self.row_sample_collector.into_proto());
        resp
    }
}
#[derive(Default)]
struct AnalyzeColumnsResult {
sample_collectors: Vec<SampleCollector>,
pk_hist: Histogram,
}
impl AnalyzeColumnsResult {
fn new(sample_collectors: Vec<SampleCollector>, pk_hist: Histogram) -> AnalyzeColumnsResult {
AnalyzeColumnsResult {
sample_collectors,
pk_hist,
}
}
fn into_proto(self) -> tipb::AnalyzeColumnsResp {
let hist = self.pk_hist.into_proto();
let cols: Vec<tipb::SampleCollector> = self
.sample_collectors
.into_iter()
.map(|col| col.into_proto())
.collect();
let mut res = tipb::AnalyzeColumnsResp::default();
res.set_collectors(cols.into());
res.set_pk_hist(hist);
res
}
}
/// Index-analyze result: histogram plus optional CM and FM sketches.
#[derive(Default)]
struct AnalyzeIndexResult {
    hist: Histogram,
    cms: Option<CmSketch>,
    fms: Option<FmSketch>,
}
impl AnalyzeIndexResult {
    fn new(hist: Histogram, cms: Option<CmSketch>, fms: Option<FmSketch>) -> AnalyzeIndexResult {
        AnalyzeIndexResult { hist, cms, fms }
    }
    /// Serializes into the response protobuf; the FM sketch travels inside
    /// a `SampleCollector` message on the `collector` field.
    fn into_proto(self) -> tipb::AnalyzeIndexResp {
        let mut resp = tipb::AnalyzeIndexResp::default();
        resp.set_hist(self.hist.into_proto());
        if let Some(cms) = self.cms {
            resp.set_cms(cms.into_proto());
        }
        if let Some(fms) = self.fms {
            let mut collector = tipb::SampleCollector::default();
            collector.set_fm_sketch(fms.into_proto());
            resp.set_collector(collector);
        }
        resp
    }
}
/// Mixed-analyze result: a column result paired with a common-handle index
/// result, serialized together.
#[derive(Default)]
struct AnalyzeMixedResult {
    col_res: AnalyzeColumnsResult,
    idx_res: AnalyzeIndexResult,
}
impl AnalyzeMixedResult {
    fn new(col_res: AnalyzeColumnsResult, idx_res: AnalyzeIndexResult) -> AnalyzeMixedResult {
        AnalyzeMixedResult { col_res, idx_res }
    }
    /// Serializes both halves into one `tipb::AnalyzeMixedResp`.
    fn into_proto(self) -> tipb::AnalyzeMixedResp {
        let mut resp = tipb::AnalyzeMixedResp::default();
        resp.set_index_resp(self.idx_res.into_proto());
        resp.set_columns_resp(self.col_res.into_proto());
        resp
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use ::std::collections::HashMap;
    use tidb_query_datatype::codec::datum;
    use tidb_query_datatype::codec::datum::Datum;
    /// Checks null counting, sketch feeding, reservoir capacity and total
    /// size accounting of `SampleCollector`.
    #[test]
    fn test_sample_collector() {
        let max_sample_size = 3;
        let max_fm_sketch_size = 10;
        let cm_sketch_depth = 2;
        let cm_sketch_width = 16;
        let mut sample = SampleCollector::new(
            max_sample_size,
            max_fm_sketch_size,
            cm_sketch_depth,
            cm_sketch_width,
        );
        let cases = vec![Datum::I64(1), Datum::Null, Datum::I64(2), Datum::I64(5)];
        for data in cases {
            sample.collect(datum::encode_value(&mut EvalContext::default(), &[data]).unwrap());
        }
        assert_eq!(sample.samples.len(), max_sample_size);
        assert_eq!(sample.null_count, 1);
        assert_eq!(sample.count, 3);
        assert_eq!(sample.cm_sketch.unwrap().count(), 3);
        // 3 non-null values, 2 encoded bytes each.
        assert_eq!(sample.total_size, 6)
    }
    /// Statistical check that weighted reservoir sampling picks every row
    /// with roughly uniform frequency across many repetitions.
    #[test]
    fn test_row_sample_collector() {
        let sample_num = 20;
        let row_num = 100;
        let loop_cnt = 1000;
        let mut item_cnt: HashMap<Vec<u8>, usize> = HashMap::new();
        let mut nums: Vec<Vec<u8>> = Vec::with_capacity(row_num);
        for i in 0..row_num {
            nums.push(
                datum::encode_value(&mut EvalContext::default(), &[Datum::I64(i as i64)]).unwrap(),
            );
        }
        for _loop_i in 0..loop_cnt {
            let mut collector = RowSampleCollector::new(sample_num, 1000, 1);
            for row in &nums {
                collector.sampling([row.clone()].to_vec());
            }
            assert_eq!(collector.samples.len(), sample_num);
            for sample in collector.samples.into_vec() {
                *item_cnt.entry(sample.0.1[0].clone()).or_insert(0) += 1;
            }
        }
        // Every row should be sampled close to the expected frequency
        // (within a 50% relative tolerance).
        let exp_freq = sample_num as f64 * loop_cnt as f64 / row_num as f64;
        let delta = 0.5;
        for (_, v) in item_cnt.into_iter() {
            assert!(
                v as f64 >= exp_freq / (1.0 + delta) && v as f64 <= exp_freq * (1.0 + delta),
                "v: {}",
                v
            );
        }
    }
}