use std::cell::Cell;
use std::ffi::CString;
use std::mem;
use std::result::Result;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use engine_rocks::raw::{
new_compaction_filter_raw, CompactionFilter, CompactionFilterContext, CompactionFilterDecision,
CompactionFilterFactory, CompactionFilterValueType, DBCompactionFilter,
};
use engine_rocks::{
RocksEngine, RocksMvccProperties, RocksUserCollectedPropertiesNoRc, RocksWriteBatch,
};
use engine_traits::{
KvEngine, MiscExt, Mutable, MvccProperties, WriteBatch, WriteBatchExt, WriteOptions,
};
use file_system::{IOType, WithIOType};
use pd_client::{Feature, FeatureGate};
use prometheus::{local::*, *};
use raftstore::coprocessor::RegionInfoProvider;
use tikv_util::worker::FutureScheduler;
use txn_types::{Key, TimeStamp, WriteRef, WriteType};
use crate::server::gc_worker::{GcConfig, GcTask, GcWorkerConfigManager};
use crate::storage::mvcc::{GC_DELETE_VERSIONS_HISTOGRAM, MVCC_VERSIONS_HISTOGRAM};
// Capacity (in bytes) reserved for the write batch that buffers default-CF deletes.
const DEFAULT_DELETE_BATCH_SIZE: usize = 256 * 1024;
// Threshold (in entries) that triggers flushing the delete batch, and the batch
// size for deferring MVCC deletions to the GC worker.
const DEFAULT_DELETE_BATCH_COUNT: usize = 128;
// GC by compaction filter requires every node in the cluster to be on 5.0.0+.
const COMPACTION_FILTER_GC_FEATURE: Feature = Feature::require(5, 0, 0);
/// Global state needed by `WriteCompactionFilterFactory` to build compaction
/// filters. Installed via `CompactionFilterInitializer::init_compaction_filter`
/// and stored in the `GC_CONTEXT` global.
struct GcContext {
    db: RocksEngine,
    store_id: u64,
    // Current GC safe point; 0 means "not initialized yet".
    safe_point: Arc<AtomicU64>,
    // Live view of the GC configuration (can change at runtime).
    cfg_tracker: GcWorkerConfigManager,
    feature_gate: FeatureGate,
    // Used to hand deferred work (MVCC deletions, orphan versions) to the GC worker.
    gc_scheduler: FutureScheduler<GcTask>,
    region_info_provider: Arc<dyn RegionInfoProvider + 'static>,
    // Test-only hooks invoked when a `WriteCompactionFilter` is dropped.
    #[cfg(any(test, feature = "failpoints"))]
    callbacks_on_drop: Vec<Arc<dyn Fn(&WriteCompactionFilter) + Send + Sync>>,
}
// Monotonic id for `GcTask::OrphanVersions` tasks, tagging write batches that
// failed to flush and were redirected to the GC worker.
static ORPHAN_VERSIONS_ID: AtomicUsize = AtomicUsize::new(0);

lazy_static! {
    // The single global GC context consumed by the filter factory.
    static ref GC_CONTEXT: Mutex<Option<GcContext>> = Mutex::new(None);
    static ref GC_COMPACTION_FILTERED: IntCounter = register_int_counter!(
        "tikv_gc_compaction_filtered",
        "Filtered versions by compaction"
    )
    .unwrap();
    static ref GC_COMPACTION_FAILURE: IntCounter = register_int_counter!(
        "tikv_gc_compaction_failure",
        "Compaction filter meets failure"
    )
    .unwrap();
    static ref GC_COMPACTION_FILTER_SKIP: IntCounter = register_int_counter!(
        "tikv_gc_compaction_filter_skip",
        "Skip to create compaction filter for GC because of table properties"
    )
    .unwrap();
    static ref GC_COMPACTION_FILTER_PERFORM: IntCounter = register_int_counter!(
        "tikv_gc_compaction_filter_perform",
        // Fixed typo in the help string ("perfrom" -> "perform").
        "perform GC in compaction filter"
    )
    .unwrap();
    static ref GC_COMPACTION_MVCC_ROLLBACK: IntCounter = register_int_counter!(
        "tikv_gc_compaction_mvcc_rollback",
        "Compaction of mvcc rollbacks"
    )
    .unwrap();
    pub static ref GC_COMPACTION_FILTER_ORPHAN_VERSIONS: IntCounterVec = register_int_counter_vec!(
        "tikv_gc_compaction_filter_orphan_versions",
        "Compaction filter orphan versions for default CF",
        &["tag"]
    ).unwrap();
}
/// Hook for engines to install the GC-by-compaction-filter context.
///
/// The blanket implementation below is a no-op; `RocksEngine` overrides it to
/// populate the global `GC_CONTEXT`.
pub trait CompactionFilterInitializer {
    fn init_compaction_filter(
        &self,
        store_id: u64,
        safe_point: Arc<AtomicU64>,
        cfg_tracker: GcWorkerConfigManager,
        feature_gate: FeatureGate,
        gc_scheduler: FutureScheduler<GcTask>,
        region_info_provider: Arc<dyn RegionInfoProvider>,
    );
}
impl<EK> CompactionFilterInitializer for EK
where
    EK: KvEngine,
{
    // `default fn` relies on the unstable `specialization` feature; the
    // dedicated `RocksEngine` impl below overrides this no-op fallback.
    default fn init_compaction_filter(
        &self,
        _store_id: u64,
        _safe_point: Arc<AtomicU64>,
        _cfg_tracker: GcWorkerConfigManager,
        _feature_gate: FeatureGate,
        _gc_scheduler: FutureScheduler<GcTask>,
        _region_info_provider: Arc<dyn RegionInfoProvider>,
    ) {
        info!("Compaction filter is not supported for this engine.");
    }
}
impl CompactionFilterInitializer for RocksEngine {
    /// Stores everything the filter factory needs into the global
    /// `GC_CONTEXT`, replacing any previously installed context.
    fn init_compaction_filter(
        &self,
        store_id: u64,
        safe_point: Arc<AtomicU64>,
        cfg_tracker: GcWorkerConfigManager,
        feature_gate: FeatureGate,
        gc_scheduler: FutureScheduler<GcTask>,
        region_info_provider: Arc<dyn RegionInfoProvider>,
    ) {
        info!("initialize GC context for compaction filter");
        let mut gc_context = GC_CONTEXT.lock().unwrap();
        *gc_context = Some(GcContext {
            db: self.clone(),
            store_id,
            safe_point,
            cfg_tracker,
            feature_gate,
            gc_scheduler,
            region_info_provider,
            #[cfg(any(test, feature = "failpoints"))]
            callbacks_on_drop: vec![],
        });
    }
}
/// Factory registered on the write CF; RocksDB consults it at the start of
/// every compaction to (maybe) attach a `WriteCompactionFilter`.
pub struct WriteCompactionFilterFactory;
impl CompactionFilterFactory for WriteCompactionFilterFactory {
    /// Decides per compaction whether to attach a GC filter.
    ///
    /// Returns a null pointer (no filtering) unless all of the following
    /// hold: the GC context is initialized, a non-zero safe point is known,
    /// the DB is not stalled, the feature is allowed by config/version, and
    /// table properties indicate GC-able data.
    fn create_compaction_filter(
        &self,
        context: &CompactionFilterContext,
    ) -> *mut DBCompactionFilter {
        let gc_context_option = GC_CONTEXT.lock().unwrap();
        let gc_context = match *gc_context_option {
            Some(ref ctx) => ctx,
            // GC context was never initialized for this process.
            None => return std::ptr::null_mut(),
        };
        let safe_point = gc_context.safe_point.load(Ordering::Relaxed);
        if safe_point == 0 {
            // Safe point has not been advanced yet.
            debug!("skip gc in compaction filter because of no safe point");
            return std::ptr::null_mut();
        }
        // Snapshot the relevant config values while holding the lock.
        let (enable, skip_vcheck, ratio_threshold) = {
            let value = &*gc_context.cfg_tracker.value();
            (
                value.enable_compaction_filter,
                value.compaction_filter_skip_version_check,
                value.ratio_threshold,
            )
        };
        let db = gc_context.db.clone();
        let gc_scheduler = gc_context.gc_scheduler.clone();
        let store_id = gc_context.store_id;
        let region_info_provider = gc_context.region_info_provider.clone();
        debug!(
            "creating compaction filter"; "feature_enable" => enable,
            "skip_version_check" => skip_vcheck,
            "ratio_threshold" => ratio_threshold,
        );
        // A stalled/stopped DB can't absorb the default-CF deletes the
        // filter issues, so skip GC for this compaction.
        if db.is_stalled_or_stopped() {
            debug!("skip gc in compaction filter because the DB is stalled");
            return std::ptr::null_mut();
        }
        if !do_check_allowed(enable, skip_vcheck, &gc_context.feature_gate) {
            debug!("skip gc in compaction filter because it's not allowed");
            return std::ptr::null_mut();
        }
        // Release the global lock before the heavier property checks below.
        drop(gc_context_option);
        GC_COMPACTION_FILTER_PERFORM.inc();
        // Consult MVCC table properties of the input SSTs to see whether a
        // filter would actually drop anything.
        if !check_need_gc(safe_point.into(), ratio_threshold, context) {
            debug!("skip gc in compaction filter because it's not necessary");
            GC_COMPACTION_FILTER_SKIP.inc();
            return std::ptr::null_mut();
        }
        debug!(
            "gc in compaction filter"; "safe_point" => safe_point,
            "files" => ?context.file_numbers(),
            "bottommost" => context.is_bottommost_level(),
            "manual" => context.is_manual_compaction(),
        );
        let filter = Box::new(WriteCompactionFilter::new(
            db,
            safe_point,
            context,
            gc_scheduler,
            (store_id, region_info_provider),
        ));
        let name = CString::new("write_compaction_filter").unwrap();
        // Ownership of the raw filter pointer passes to RocksDB.
        unsafe { new_compaction_filter_raw(name, filter) }
    }
}
/// Per-compaction GC filter over the write CF.
///
/// Keys arrive ordered by user key and then by decreasing commit ts, which
/// the `mvcc_key_prefix`/`remove_older` state machine relies on.
struct WriteCompactionFilter {
    safe_point: u64,
    engine: RocksEngine,
    is_bottommost_level: bool,
    // Set after the first error; from then on the filter keeps everything.
    encountered_errors: bool,
    // Buffers deletes of default-CF values belonging to filtered PUTs.
    write_batch: RocksWriteBatch,
    gc_scheduler: FutureScheduler<GcTask>,
    // User keys whose DELETE marks must be handled by the GC worker.
    mvcc_deletions: Vec<Key>,
    // Some(n) after a bottommost DELETE mark was seen for the current key:
    // n = number of older versions of that key seen in this compaction.
    mvcc_deletion_overlaps: Option<usize>,
    // (store_id, region info provider) forwarded into `GcTask::GcKeys`.
    regions_provider: (u64, Arc<dyn RegionInfoProvider>),
    // Currently-processed user key (still carrying the data prefix, no ts).
    mvcc_key_prefix: Vec<u8>,
    // True once a PUT/DELETE at or below the safe point has been seen for
    // the current key; every older version is then removable.
    remove_older: bool,
    // Per-key counters, folded into the totals on each key switch.
    versions: usize,
    filtered: usize,
    total_versions: usize,
    total_filtered: usize,
    mvcc_rollback_and_locks: usize,
    // Deletes that failed to flush and were redirected to the GC worker.
    orphan_versions: usize,
    versions_hist: LocalHistogram,
    filtered_hist: LocalHistogram,
    // Test-only hooks run in `Drop`.
    #[cfg(any(test, feature = "failpoints"))]
    callbacks_on_drop: Vec<Arc<dyn Fn(&WriteCompactionFilter) + Send + Sync>>,
}
impl WriteCompactionFilter {
    /// Builds a filter for a single compaction run. The factory guarantees
    /// a non-zero safe point before calling this.
    fn new(
        engine: RocksEngine,
        safe_point: u64,
        context: &CompactionFilterContext,
        gc_scheduler: FutureScheduler<GcTask>,
        regions_provider: (u64, Arc<dyn RegionInfoProvider>),
    ) -> Self {
        assert!(safe_point > 0);
        debug!("gc in compaction filter"; "safe_point" => safe_point);
        let write_batch = engine.write_batch_with_cap(DEFAULT_DELETE_BATCH_SIZE);
        WriteCompactionFilter {
            safe_point,
            engine,
            is_bottommost_level: context.is_bottommost_level(),
            encountered_errors: false,
            write_batch,
            gc_scheduler,
            mvcc_deletions: Vec::with_capacity(DEFAULT_DELETE_BATCH_COUNT),
            mvcc_deletion_overlaps: None,
            regions_provider,
            mvcc_key_prefix: vec![],
            remove_older: false,
            versions: 0,
            filtered: 0,
            total_versions: 0,
            total_filtered: 0,
            mvcc_rollback_and_locks: 0,
            orphan_versions: 0,
            versions_hist: MVCC_VERSIONS_HISTOGRAM.local(),
            filtered_hist: GC_DELETE_VERSIONS_HISTOGRAM.local(),
            // Snapshot the test hooks from the global context at creation.
            #[cfg(any(test, feature = "failpoints"))]
            callbacks_on_drop: {
                let ctx = GC_CONTEXT.lock().unwrap();
                ctx.as_ref().unwrap().callbacks_on_drop.clone()
            },
        }
    }
    /// Hands `task` to the GC worker; scheduling failures are logged only
    /// when `log_on_error` is set.
    fn schedule_gc_task(&self, task: GcTask, log_on_error: bool) {
        if let Err(e) = self.gc_scheduler.schedule(task) {
            if log_on_error {
                error!("compaction filter schedule {} fail", e.0);
            }
        }
    }
    /// Queues the current user key for asynchronous handling by the GC
    /// worker. Called when a bottommost DELETE mark saw no overlapping
    /// older versions within this compaction.
    fn handle_bottommost_delete(&mut self) {
        // Keys handed to the GC worker have the data prefix stripped.
        debug_assert_eq!(self.mvcc_key_prefix[0], keys::DATA_PREFIX);
        let key = Key::from_encoded_slice(&self.mvcc_key_prefix[1..]);
        self.mvcc_deletions.push(key);
    }
    /// Drains buffered keys into a `GcTask::GcKeys` for the GC worker.
    /// Scheduling failures are silently dropped (best effort).
    fn gc_mvcc_deletions(&mut self) {
        if !self.mvcc_deletions.is_empty() {
            let empty = Vec::with_capacity(DEFAULT_DELETE_BATCH_COUNT);
            let task = GcTask::GcKeys {
                keys: mem::replace(&mut self.mvcc_deletions, empty),
                safe_point: self.safe_point.into(),
                store_id: self.regions_provider.0,
                region_info_provider: self.regions_provider.1.clone(),
            };
            self.schedule_gc_task(task, false);
        }
    }
    /// Core per-version filtering logic.
    ///
    /// Write-CF keys are ordered by user key and then by decreasing commit
    /// ts, so the newest version of each key is seen first.
    fn do_filter(
        &mut self,
        _start_level: usize,
        key: &[u8],
        _sequence: u64,
        value: &[u8],
        value_type: CompactionFilterValueType,
    ) -> Result<CompactionFilterDecision, String> {
        let (mvcc_key_prefix, commit_ts) = split_ts(key)?;
        // Versions newer than the safe point and non-value entries are
        // always kept.
        if commit_ts > self.safe_point || value_type != CompactionFilterValueType::Value {
            return Ok(CompactionFilterDecision::Keep);
        }
        self.versions += 1;
        if self.mvcc_key_prefix != mvcc_key_prefix {
            // Switched to a new user key: finalize the previous one. If its
            // bottommost DELETE mark saw no older versions here
            // (overlaps == 0), defer the key to the GC worker so the mark
            // itself can be cleaned up properly.
            if self.mvcc_deletion_overlaps.take() == Some(0) {
                self.handle_bottommost_delete();
                if self.mvcc_deletions.len() >= DEFAULT_DELETE_BATCH_COUNT {
                    self.gc_mvcc_deletions();
                }
            }
            self.switch_key_metrics();
            self.mvcc_key_prefix.clear();
            self.mvcc_key_prefix.extend_from_slice(mvcc_key_prefix);
            self.remove_older = false;
        } else if let Some(ref mut overlaps) = self.mvcc_deletion_overlaps {
            // Count older versions seen after a bottommost DELETE mark.
            *overlaps += 1;
        }
        // Everything older than the first PUT/DELETE at or below the safe
        // point is removable.
        let mut filtered = self.remove_older;
        let write = parse_write(value)?;
        if !self.remove_older {
            match write.write_type {
                // Rollbacks and locks at or below the safe point are always
                // removable.
                WriteType::Rollback | WriteType::Lock => {
                    self.mvcc_rollback_and_locks += 1;
                    filtered = true;
                }
                // The newest PUT/DELETE is kept but shadows older versions.
                WriteType::Put => self.remove_older = true,
                WriteType::Delete => {
                    self.remove_older = true;
                    // At the bottommost level, start counting overlapped
                    // older versions so the DELETE mark can be finalized on
                    // key switch or drop.
                    if self.is_bottommost_level {
                        self.mvcc_deletion_overlaps = Some(0);
                    }
                }
            }
        }
        if !filtered {
            return Ok(CompactionFilterDecision::Keep);
        }
        self.filtered += 1;
        self.handle_filtered_write(write)?;
        self.flush_pending_writes_if_need(false )?;
        let decision = if self.remove_older {
            // All remaining versions of this key in the compaction input are
            // older and removable: drop this one and tell RocksDB to drop
            // everything up to (prefix, commit_ts - 1) without calling the
            // filter again.
            debug_assert!(commit_ts > 0);
            let prefix = Key::from_encoded_slice(mvcc_key_prefix);
            let skip_until = prefix.append_ts((commit_ts - 1).into()).into_encoded();
            CompactionFilterDecision::RemoveAndSkipUntil(skip_until)
        } else {
            CompactionFilterDecision::Remove
        };
        Ok(decision)
    }
    /// For a filtered PUT whose value is not inlined (no short value),
    /// buffers a delete of the corresponding default-CF key, which is
    /// addressed by the transaction's start ts.
    fn handle_filtered_write(&mut self, write: WriteRef) -> Result<(), String> {
        if write.short_value.is_none() && write.write_type == WriteType::Put {
            let prefix = Key::from_encoded_slice(&self.mvcc_key_prefix);
            let def_key = prefix.append_ts(write.start_ts).into_encoded();
            self.write_batch.delete(&def_key)?;
        }
        Ok(())
    }
    /// Flushes buffered default-CF deletes once the batch exceeds
    /// `DEFAULT_DELETE_BATCH_COUNT` entries, or unconditionally on `force`.
    ///
    /// On flush failure (e.g. a write stall with `no_slowdown` set) the
    /// batch is handed to the GC worker as `GcTask::OrphanVersions` instead
    /// of blocking the compaction thread.
    fn flush_pending_writes_if_need(&mut self, force: bool) -> Result<(), engine_traits::Error> {
        if self.write_batch.is_empty() {
            return Ok(());
        }
        // Scopes the IO-type guard and the failpoint around the actual write.
        fn do_flush(
            wb: &RocksWriteBatch,
            wopts: &WriteOptions,
        ) -> Result<(), engine_traits::Error> {
            let _io_type_guard = WithIOType::new(IOType::Gc);
            fail_point!("write_compaction_filter_flush_write_batch", true, |_| {
                Err(engine_traits::Error::Engine(
                    "Ingested fail point".to_string(),
                ))
            });
            wb.write_opt(wopts)
        }
        if self.write_batch.count() > DEFAULT_DELETE_BATCH_COUNT || force {
            let mut wopts = WriteOptions::default();
            // Never block a compaction thread on a stalled write path.
            wopts.set_no_slowdown(true);
            if let Err(e) = do_flush(&self.write_batch, &wopts) {
                // Swap in a fresh batch and ship the failed one to the GC
                // worker to be retried outside the compaction thread.
                let wb = mem::replace(
                    &mut self.write_batch,
                    self.engine.write_batch_with_cap(DEFAULT_DELETE_BATCH_SIZE),
                );
                self.orphan_versions += wb.count();
                let id = ORPHAN_VERSIONS_ID.fetch_add(1, Ordering::Relaxed);
                let task = GcTask::OrphanVersions { wb, id };
                warn!(
                    "compaction filter flush fail, dispatch to gc worker";
                    "task" => %task, "err" => ?e,
                );
                self.schedule_gc_task(task, true);
                return Err(e);
            }
            self.write_batch.clear();
        }
        Ok(())
    }
    /// Folds the per-key counters into local histograms and running totals,
    /// then resets them. Called on every user-key switch and on drop.
    fn switch_key_metrics(&mut self) {
        if self.versions != 0 {
            self.versions_hist.observe(self.versions as f64);
            self.total_versions += self.versions;
            self.versions = 0;
        }
        if self.filtered != 0 {
            self.filtered_hist.observe(self.filtered as f64);
            self.total_filtered += self.filtered;
            self.filtered = 0;
        }
    }
    /// Publishes accumulated counters to Prometheus and, via the
    /// thread-local `STATS`, periodically emits a summary log line.
    fn flush_metrics(&self) {
        GC_COMPACTION_FILTERED.inc_by(self.total_filtered as i64);
        GC_COMPACTION_MVCC_ROLLBACK.inc_by(self.mvcc_rollback_and_locks as i64);
        GC_COMPACTION_FILTER_ORPHAN_VERSIONS
            .with_label_values(&["generated"])
            .inc_by(self.orphan_versions as i64);
        if let Some((versions, filtered)) = STATS.with(|stats| {
            stats.versions.update(|x| x + self.total_versions);
            stats.filtered.update(|x| x + self.total_filtered);
            if stats.need_report() {
                return Some(stats.prepare_report());
            }
            None
        }) {
            if filtered > 0 {
                info!("Compaction filter reports"; "total" => versions, "filtered" => filtered);
            }
        }
    }
}
/// Thread-local accumulation of filter throughput, reported at most roughly
/// once per minute (or per ~1M versions) to keep logging cheap.
struct CompactionFilterStats {
    versions: Cell<usize>, // versions seen since the last report
    filtered: Cell<usize>, // versions filtered since the last report
    last_report: Cell<Instant>,
}
impl CompactionFilterStats {
    /// Whether the accumulated stats are large or old enough to report:
    /// roughly one million versions, or at least one minute elapsed.
    fn need_report(&self) -> bool {
        if self.versions.get() >= 1024 * 1024 {
            return true;
        }
        self.last_report.get().elapsed() >= Duration::from_secs(60)
    }
    /// Takes the accumulated (versions, filtered) counters, resetting them
    /// to zero and stamping the report time.
    fn prepare_report(&self) -> (usize, usize) {
        self.last_report.set(Instant::now());
        (self.versions.replace(0), self.filtered.replace(0))
    }
}
impl Default for CompactionFilterStats {
fn default() -> Self {
CompactionFilterStats {
versions: Cell::new(0),
filtered: Cell::new(0),
last_report: Cell::new(Instant::now()),
}
}
}
thread_local! {
    // Per-thread stats accumulator, flushed from
    // `WriteCompactionFilter::flush_metrics` when a filter is dropped.
    static STATS: CompactionFilterStats = CompactionFilterStats::default();
}
impl Drop for WriteCompactionFilter {
    /// Finalizes the compaction: flushes pending bottommost deletions and
    /// buffered default-CF deletes, syncs the WAL, and publishes metrics.
    /// The order matters — deletions must be dispatched before metrics.
    fn drop(&mut self) {
        // The last user key may still have an unfinalized DELETE mark.
        if self.mvcc_deletion_overlaps.take() == Some(0) {
            self.handle_bottommost_delete();
        }
        self.gc_mvcc_deletions();
        // Force-flush whatever is left in the write batch.
        if let Err(e) = self.flush_pending_writes_if_need(true) {
            error!("compaction filter flush writes fail"; "err" => ?e);
        }
        // Panics if the WAL can't be synced; the flushed deletes must be
        // durable before the compaction result is installed.
        self.engine.sync_wal().unwrap();
        self.switch_key_metrics();
        self.flush_metrics();
        #[cfg(any(test, feature = "failpoints"))]
        for callback in &self.callbacks_on_drop {
            callback(&self);
        }
    }
}
impl CompactionFilter for WriteCompactionFilter {
    /// RocksDB entry point for every key/value pair. Errors are sticky:
    /// after the first failure the filter keeps everything for the rest of
    /// the compaction (keeping is always safe for GC).
    fn featured_filter(
        &mut self,
        level: usize,
        key: &[u8],
        sequence: u64,
        value: &[u8],
        value_type: CompactionFilterValueType,
    ) -> CompactionFilterDecision {
        if self.encountered_errors {
            return CompactionFilterDecision::Keep;
        }
        match self.do_filter(level, key, sequence, value, value_type) {
            Ok(decision) => decision,
            Err(e) => {
                warn!("compaction filter meet error: {}", e);
                GC_COMPACTION_FAILURE.inc();
                self.encountered_errors = true;
                CompactionFilterDecision::Keep
            }
        }
    }
}
/// Splits an encoded write-CF key into (user key prefix, commit ts).
/// Fails with a descriptive message when the key has no ts suffix.
fn split_ts(key: &[u8]) -> Result<(&[u8], u64), String> {
    Key::split_on_ts_for(key)
        .map(|(user_key, ts)| (user_key, ts.into_inner()))
        .map_err(|_| {
            format!(
                "invalid write cf key: {}",
                log_wrappers::Value(key)
            )
        })
}
/// Parses a write-CF value into a `WriteRef`, mapping parse failures to a
/// descriptive error string.
fn parse_write(value: &[u8]) -> Result<WriteRef, String> {
    WriteRef::parse(value).map_err(|_| {
        format!(
            "invalid write cf value: {}",
            log_wrappers::Value(value)
        )
    })
}
/// Whether GC by compaction filter may be enabled under the given
/// configuration and cluster feature gate.
pub fn is_compaction_filter_allowed(cfg_value: &GcConfig, feature_gate: &FeatureGate) -> bool {
    let enable = cfg_value.enable_compaction_filter;
    let skip_vcheck = cfg_value.compaction_filter_skip_version_check;
    do_check_allowed(enable, skip_vcheck, feature_gate)
}
/// Allowed when enabled in config, and either the version check is
/// explicitly skipped or the whole cluster supports the feature (5.0.0+).
fn do_check_allowed(enable: bool, skip_vcheck: bool, feature_gate: &FeatureGate) -> bool {
    if !enable {
        return false;
    }
    skip_vcheck || feature_gate.can_enable(COMPACTION_FILTER_GC_FEATURE)
}
/// Decides whether this compaction's input SSTs are worth GC-ing, based on
/// the MVCC table properties collected in each file.
///
/// Returns true when at least half of the input files individually need GC,
/// or when the aggregated properties of all inputs do.
fn check_need_gc(
    safe_point: TimeStamp,
    ratio_threshold: f64,
    context: &CompactionFilterContext,
) -> bool {
    // Returns (needs_gc, skip_more_checks) for one set of properties.
    let check_props = |props: &MvccProperties| -> (bool, bool ) {
        // No version at or below the safe point: nothing can be GC-ed.
        if props.min_ts > safe_point {
            return (false, false);
        }
        // A ratio below 1.0 disables the heuristics; bottommost-level
        // compactions are always worth filtering.
        if ratio_threshold < 1.0 || context.is_bottommost_level() {
            return (true, true);
        }
        // Too many versions per row on average.
        if props.num_versions as f64 > props.num_rows as f64 * ratio_threshold {
            return (true, false);
        }
        // NOTE(review): this computes (num_versions - num_deletes) as
        // "rollbacks and locks", which would also include puts — confirm
        // the intended formula against the property collector.
        let num_rollback_and_locks = (props.num_versions - props.num_deletes) as f64;
        if num_rollback_and_locks > props.num_puts as f64 * ratio_threshold {
            return (true, false);
        }
        // Fallback: a single very hot row with >1024 versions still
        // justifies GC.
        (props.max_row_versions > 1024, false)
    };
    let (mut sum_props, mut needs_gc) = (MvccProperties::new(), 0);
    for i in 0..context.file_numbers().len() {
        let table_props = context.table_properties(i);
        // SAFETY: casts the raw user-collected properties to the no-refcount
        // wrapper type. Relies on `RocksUserCollectedPropertiesNoRc` being
        // layout-compatible with the raw properties — TODO confirm against
        // engine_rocks.
        let user_props = unsafe {
            &*(table_props.user_collected_properties() as *const _
                as *const RocksUserCollectedPropertiesNoRc)
        };
        if let Ok(props) = RocksMvccProperties::decode(user_props) {
            sum_props.add(&props);
            let (sst_needs_gc, skip_more_checks) = check_props(&props);
            if sst_needs_gc {
                needs_gc += 1;
            }
            if skip_more_checks {
                // Vote with all files so the majority check passes.
                needs_gc = context.file_numbers().len();
                break;
            }
        }
    }
    // Majority of files need GC, or the aggregate properties do.
    (needs_gc >= ((context.file_numbers().len() + 1) / 2)) || check_props(&sum_props).0
}
#[allow(dead_code)]
#[cfg(any(test, feature = "failpoints"))]
pub mod test_utils {
    //! Helpers to drive GC-by-compaction-filter manually in tests and
    //! failpoint builds.
    use super::*;
    use crate::storage::kv::RocksEngine as StorageRocksEngine;
    use engine_rocks::raw::{CompactOptions, CompactionOptions};
    use engine_rocks::util::get_cf_handle;
    use engine_rocks::RocksEngine;
    use engine_traits::{SyncMutable, CF_WRITE};
    use futures::channel::mpsc::{unbounded, UnboundedReceiver};
    use raftstore::coprocessor::region_info_accessor::MockRegionInfoProvider;
    use tikv_util::config::VersionTrack;
    /// Runs a GC pass over the whole write CF via a manual compaction with
    /// the given safe point.
    pub fn gc_by_compact(engine: &StorageRocksEngine, _: &[u8], safe_point: u64) {
        let engine = engine.get_rocksdb();
        // NOTE(review): deleting a non-existent key presumably ensures the
        // write CF memtable is non-empty so the compaction below has input
        // to process — confirm.
        engine.delete_cf("write", b"znot-exists-key").unwrap();
        TestGCRunner::new(safe_point).gc(&engine);
    }
    lazy_static! {
        // Serializes GC runs across tests because `GC_CONTEXT` is
        // process-global.
        static ref LOCK: Mutex<()> = std::sync::Mutex::new(());
    }
    /// Manual compaction options shared by the helpers below.
    pub fn default_compact_options() -> CompactOptions {
        let mut compact_opts = CompactOptions::new();
        compact_opts.set_exclusive_manual_compaction(false);
        compact_opts.set_max_subcompactions(1);
        compact_opts
    }
    /// Drives manual compactions under a configurable GC context, capturing
    /// any `GcTask`s the filter schedules on `gc_receiver`.
    pub struct TestGCRunner<'a> {
        pub safe_point: u64,
        pub ratio_threshold: Option<f64>,
        pub start: Option<&'a [u8]>,
        pub end: Option<&'a [u8]>,
        pub target_level: Option<usize>,
        pub gc_scheduler: FutureScheduler<GcTask>,
        pub gc_receiver: UnboundedReceiver<Option<GcTask>>,
        // Hooks run when the compaction filter is dropped; cleared after
        // each GC run by `post_gc`.
        pub(super) callbacks_on_drop: Vec<Arc<dyn Fn(&WriteCompactionFilter) + Send + Sync>>,
    }
    impl<'a> TestGCRunner<'a> {
        pub fn new(safe_point: u64) -> Self {
            let (tx, rx) = unbounded();
            TestGCRunner {
                safe_point,
                ratio_threshold: None,
                start: None,
                end: None,
                target_level: None,
                gc_scheduler: FutureScheduler::new("test-gc-sched", tx),
                gc_receiver: rx,
                callbacks_on_drop: vec![],
            }
        }
    }
    impl<'a> TestGCRunner<'a> {
        /// Updates the safe point for the next GC run (builder style).
        pub fn safe_point(&mut self, sp: u64) -> &mut Self {
            self.safe_point = sp;
            self
        }
        /// Installs a fresh global `GC_CONTEXT` reflecting this runner's
        /// settings.
        fn prepare_gc(&self, engine: &RocksEngine) {
            let safe_point = Arc::new(AtomicU64::new(self.safe_point));
            let cfg_tracker = {
                let mut cfg = GcConfig::default();
                if let Some(ratio_threshold) = self.ratio_threshold {
                    cfg.ratio_threshold = ratio_threshold;
                }
                cfg.enable_compaction_filter = true;
                GcWorkerConfigManager(Arc::new(VersionTrack::new(cfg)))
            };
            let feature_gate = {
                let feature_gate = FeatureGate::default();
                // 5.0.0 satisfies COMPACTION_FILTER_GC_FEATURE.
                feature_gate.set_version("5.0.0").unwrap();
                feature_gate
            };
            let mut gc_context_opt = GC_CONTEXT.lock().unwrap();
            *gc_context_opt = Some(GcContext {
                db: engine.clone(),
                store_id: 1,
                safe_point,
                cfg_tracker,
                feature_gate,
                gc_scheduler: self.gc_scheduler.clone(),
                region_info_provider: Arc::new(MockRegionInfoProvider::new(vec![])),
                callbacks_on_drop: self.callbacks_on_drop.clone(),
            });
        }
        /// Clears drop callbacks both locally and in the global context so
        /// they can't leak into the next GC run.
        fn post_gc(&mut self) {
            self.callbacks_on_drop.clear();
            let mut gc_context = GC_CONTEXT.lock().unwrap();
            let callbacks = &mut gc_context.as_mut().unwrap().callbacks_on_drop;
            callbacks.clear();
        }
        /// GC via a manual range compaction of the write CF over
        /// `[start, end)`, optionally moving output to `target_level`.
        pub fn gc(&mut self, engine: &RocksEngine) {
            let _guard = LOCK.lock().unwrap();
            self.prepare_gc(engine);
            let db = engine.as_inner();
            let handle = get_cf_handle(db, CF_WRITE).unwrap();
            let mut compact_opts = default_compact_options();
            if let Some(target_level) = self.target_level {
                compact_opts.set_change_level(true);
                compact_opts.set_target_level(target_level as i32);
            }
            db.compact_range_cf_opt(handle, &compact_opts, self.start, self.end);
            self.post_gc();
        }
        /// GC via a manual compaction of specific SST files into
        /// `target_level` (which must be set).
        pub fn gc_on_files(&mut self, engine: &RocksEngine, input_files: &[String]) {
            let _guard = LOCK.lock().unwrap();
            self.prepare_gc(engine);
            let db = engine.as_inner();
            let handle = get_cf_handle(db, CF_WRITE).unwrap();
            let level = self.target_level.unwrap() as i32;
            db.compact_files_cf(handle, &CompactionOptions::new(), input_files, level)
                .unwrap();
            self.post_gc();
        }
    }
    /// Returns SST file names per level for column family `cf`.
    pub fn rocksdb_level_files(engine: &RocksEngine, cf: &str) -> Vec<Vec<String>> {
        let cf_handle = get_cf_handle(engine.as_inner(), cf).unwrap();
        let metadata = engine.as_inner().get_column_family_meta_data(cf_handle);
        let mut res = Vec::with_capacity(7);
        for level_meta in metadata.get_levels() {
            let mut level = Vec::new();
            for meta in level_meta.get_files() {
                level.push(meta.get_name());
            }
            res.push(level);
        }
        res
    }
    /// Returns the number of SST files per level for column family `cf`.
    pub fn rocksdb_level_file_counts(engine: &RocksEngine, cf: &str) -> Vec<usize> {
        let cf_handle = get_cf_handle(engine.as_inner(), cf).unwrap();
        let metadata = engine.as_inner().get_column_family_meta_data(cf_handle);
        let mut res = Vec::with_capacity(7);
        for level_meta in metadata.get_levels() {
            res.push(level_meta.get_files().len());
        }
        res
    }
}
#[cfg(test)]
pub mod tests {
    use super::test_utils::*;
    use super::*;
    use crate::config::DbConfig;
    use crate::storage::kv::TestEngineBuilder;
    use crate::storage::mvcc::tests::{must_get, must_get_none};
    use crate::storage::txn::tests::{must_commit, must_prewrite_delete, must_prewrite_put};
    use engine_traits::{DeleteStrategy, MiscExt, Peekable, Range, SyncMutable, CF_WRITE};
    // Checks all enable / skip-version-check / feature-gate combinations.
    #[test]
    fn test_is_compaction_filter_allowed() {
        let gate = FeatureGate::default();
        gate.set_version("4.1.0").unwrap();
        let mut cfg_value = GcConfig::default();
        assert!(!is_compaction_filter_allowed(&cfg_value, &gate));
        cfg_value.enable_compaction_filter = true;
        // Enabled but cluster version is too low and the check isn't skipped.
        assert!(!is_compaction_filter_allowed(&cfg_value, &gate));
        cfg_value.compaction_filter_skip_version_check = true;
        assert!(is_compaction_filter_allowed(&cfg_value, &gate));
        gate.set_version("5.0.0").unwrap();
        cfg_value.compaction_filter_skip_version_check = false;
        assert!(is_compaction_filter_allowed(&cfg_value, &gate));
    }
    // A version becomes GC-able only once a newer PUT exists at or below
    // the safe point, and its default-CF value must be removed with it.
    #[test]
    fn test_compaction_filter_basic() {
        let engine = TestEngineBuilder::new().build().unwrap();
        let raw_engine = engine.get_rocksdb();
        // 512 bytes: too large for a short value, so it lands in the
        // default CF.
        let value = vec![b'v'; 512];
        let mut gc_runner = TestGCRunner::new(0);
        must_prewrite_put(&engine, b"zkey", &value, b"zkey", 100);
        must_commit(&engine, b"zkey", 100, 110);
        // Safe point below the commit ts: nothing is filtered.
        gc_runner.safe_point(50).gc(&raw_engine);
        must_get(&engine, b"zkey", 110, &value);
        // Single version below the safe point is still kept.
        gc_runner.safe_point(200).gc(&raw_engine);
        must_get(&engine, b"zkey", 110, &value);
        must_prewrite_put(&engine, b"zkey", &value, b"zkey", 120);
        must_commit(&engine, b"zkey", 120, 130);
        // Newer PUT is above the safe point, so the old version survives.
        gc_runner.safe_point(115).gc(&raw_engine);
        must_get(&engine, b"zkey", 110, &value);
        // Both versions below the safe point: the older one is filtered.
        gc_runner.safe_point(200).gc(&raw_engine);
        must_get_none(&engine, b"zkey", 110);
        // Its default-CF value (addressed by start ts 100) must be gone too.
        let default_key = Key::from_encoded_slice(b"zkey").append_ts(100.into());
        let default_key = default_key.into_encoded();
        assert!(raw_engine.get_value(&default_key).unwrap().is_none());
    }
    // Bottommost DELETE marks with no overlapping older versions in the
    // compaction must be deferred to the GC worker as GcKeys tasks.
    #[test]
    fn test_compaction_filter_handle_deleting() {
        let value = vec![b'v'; 512];
        let engine = TestEngineBuilder::new().build().unwrap();
        let raw_engine = engine.get_rocksdb();
        let mut gc_runner = TestGCRunner::new(0);
        // Runs a GC pass and asserts whether a GcKeys task for `prefix`
        // was scheduled.
        let mut gc_and_check = |expect_tasks: bool, prefix: &[u8]| {
            gc_runner.safe_point(500).gc(&raw_engine);
            if let Ok(Some(task)) = gc_runner.gc_receiver.try_next() {
                assert!(expect_tasks, "a GC task is expected");
                match task.unwrap() {
                    GcTask::GcKeys { keys, .. } => {
                        assert_eq!(keys.len(), 1);
                        let got = keys[0].as_encoded();
                        let expect = Key::from_raw(prefix);
                        // Queued keys have the data prefix stripped.
                        assert_eq!(got, &expect.as_encoded()[1..]);
                    }
                    _ => unreachable!(),
                }
                return;
            }
            assert!(!expect_tasks, "no GC task is expected");
        };
        must_prewrite_put(&engine, b"zkey", &value, b"zkey", 100);
        must_commit(&engine, b"zkey", 100, 110);
        must_prewrite_delete(&engine, b"zkey", b"zkey", 120);
        must_commit(&engine, b"zkey", 120, 130);
        // First pass: the DELETE mark overlaps the old PUT, so no task yet.
        gc_and_check(false, b"zkey");
        // Second pass: the DELETE mark now stands alone — a task is expected.
        gc_and_check(true, b"zkey");
        // Drop all write-CF SSTs to start from a clean state.
        raw_engine
            .delete_ranges_cf(
                CF_WRITE,
                DeleteStrategy::DeleteFiles,
                &[Range::new(b"z", b"zz")],
            )
            .unwrap();
        must_prewrite_put(&engine, b"zkey1", &value, b"zkey1", 200);
        must_commit(&engine, b"zkey1", 200, 210);
        must_prewrite_delete(&engine, b"zkey1", b"zkey1", 220);
        must_commit(&engine, b"zkey1", 220, 230);
        // A second key exercises the key-switch path in `do_filter`.
        must_prewrite_put(&engine, b"zkey2", &value, b"zkey2", 220);
        must_commit(&engine, b"zkey2", 220, 230);
        gc_and_check(false, b"zkey1");
        gc_and_check(true, b"zkey1");
    }
    // Compactions whose inputs can't contain GC-able data (per table
    // properties) must skip filter creation entirely.
    #[test]
    fn test_mvcc_properties() {
        let mut cfg = DbConfig::default();
        cfg.writecf.disable_auto_compactions = true;
        cfg.writecf.dynamic_level_bytes = false;
        let dir = tempfile::TempDir::new().unwrap();
        let builder = TestEngineBuilder::new().path(dir.path());
        let engine = builder.build_with_cfg(&cfg).unwrap();
        let raw_engine = engine.get_rocksdb();
        let value = vec![b'v'; 512];
        let mut gc_runner = TestGCRunner::new(0);
        for start_ts in &[100, 110, 120, 130] {
            must_prewrite_put(&engine, b"zkey", &value, b"zkey", *start_ts);
            must_commit(&engine, b"zkey", *start_ts, *start_ts + 5);
        }
        must_prewrite_delete(&engine, b"zkey", b"zkey", 140);
        must_commit(&engine, b"zkey", 140, 145);
        // This callback applies only to the next gc run (post_gc clears it)
        // and must never fire, because no filter should be created when the
        // safe point is below every version's ts.
        gc_runner
            .callbacks_on_drop
            .push(Arc::new(|_: &WriteCompactionFilter| {
                unreachable!();
            }));
        gc_runner.target_level = Some(6);
        gc_runner.safe_point(100).gc(&raw_engine);
        // With a high ratio threshold, a bottommost compaction still GCs.
        gc_runner.ratio_threshold = Some(10.0);
        gc_runner.target_level = Some(6);
        gc_runner.safe_point(140).gc(&raw_engine);
        for commit_ts in &[105, 115, 125] {
            must_get_none(&engine, b"zkey", commit_ts);
        }
        must_prewrite_put(&engine, b"zkey1", &value, b"zkey1", 200);
        must_commit(&engine, b"zkey1", 200, 205);
        for start_ts in &[200, 210, 220, 230] {
            must_prewrite_put(&engine, b"zkey", &value, b"zkey", *start_ts);
            must_commit(&engine, b"zkey", *start_ts, *start_ts + 5);
        }
        must_prewrite_delete(&engine, b"zkey", b"zkey", 240);
        must_commit(&engine, b"zkey", 240, 245);
        raw_engine.flush_cf(CF_WRITE, true).unwrap();
        // Compact the single L0 file to L5 (not bottommost): with a high
        // ratio threshold the property checks report no GC needed, so all
        // versions must survive.
        let level_files = rocksdb_level_files(&raw_engine, CF_WRITE);
        let l0_file = dir.path().join(&level_files[0][0]);
        let files = &[l0_file.to_str().unwrap().to_owned()];
        gc_runner.target_level = Some(5);
        gc_runner.ratio_threshold = Some(10.0);
        gc_runner.safe_point(300).gc_on_files(&raw_engine, files);
        for commit_ts in &[205, 215, 225, 235] {
            must_get(&engine, b"zkey", commit_ts, &value);
        }
    }
    // Exercises the RemoveAndSkipUntil decision: skipping during a
    // non-bottommost compaction must not expose older versions that live in
    // lower levels.
    #[test]
    fn test_remove_and_skip_until() {
        let mut cfg = DbConfig::default();
        cfg.writecf.disable_auto_compactions = true;
        cfg.writecf.dynamic_level_bytes = false;
        let dir = tempfile::TempDir::new().unwrap();
        let builder = TestEngineBuilder::new().path(dir.path());
        let engine = builder.build_with_cfg(&cfg).unwrap();
        let raw_engine = engine.get_rocksdb();
        let mut gc_runner = TestGCRunner::new(0);
        // Seed a PUT and push it down to the bottommost level (L6).
        must_prewrite_put(&engine, b"zkey", b"zvalue", b"zkey", 100);
        must_commit(&engine, b"zkey", 100, 110);
        gc_runner.target_level = Some(6);
        gc_runner.safe_point(50).gc(&raw_engine);
        assert_eq!(rocksdb_level_file_counts(&raw_engine, CF_WRITE)[6], 1);
        // Write a DELETE mark, then remove the 110 version from the
        // memtable so the flushed L0 file contains only the DELETE mark.
        must_prewrite_delete(&engine, b"zkey", b"zkey", 120);
        must_commit(&engine, b"zkey", 120, 130);
        let k_110 = Key::from_raw(b"zkey").append_ts(110.into()).into_encoded();
        raw_engine.delete_cf(CF_WRITE, &k_110).unwrap();
        raw_engine.flush_cf(CF_WRITE, true).unwrap();
        assert_eq!(rocksdb_level_file_counts(&raw_engine, CF_WRITE)[0], 1);
        assert_eq!(rocksdb_level_file_counts(&raw_engine, CF_WRITE)[6], 1);
        // Compact only the L0 file to L5; the PUT at 110 still lives in L6.
        let level_files = rocksdb_level_files(&raw_engine, CF_WRITE);
        let l0_file = dir.path().join(&level_files[0][0]);
        let files = &[l0_file.to_str().unwrap().to_owned()];
        gc_runner.target_level = Some(5);
        gc_runner.safe_point(200).gc_on_files(&raw_engine, files);
        assert_eq!(rocksdb_level_file_counts(&raw_engine, CF_WRITE)[5], 1);
        assert_eq!(rocksdb_level_file_counts(&raw_engine, CF_WRITE)[6], 1);
        // The DELETE mark must still shadow the PUT in L6.
        must_get_none(&engine, b"zkey", 200);
        // A subsequent bottommost compaction must preserve the deletion.
        gc_runner.target_level = Some(6);
        gc_runner.safe_point(200).gc(&raw_engine);
        must_get_none(&engine, b"zkey", 200);
    }
}