use std::collections::VecDeque;
use std::error::Error as StdError;
use std::fmt::{self, Display, Formatter};
use std::time::Instant;
use fail::fail_point;
use thiserror::Error;
use engine_traits::KvEngine;
use engine_traits::CF_WRITE;
use tikv_util::worker::Runnable;
use tikv_util::{box_try, error, info, warn};
use super::metrics::COMPACT_RANGE_CF;
type Key = Vec<u8>;
pub enum Task {
Compact {
cf_name: String,
start_key: Option<Key>,
end_key: Option<Key>,
},
CheckAndCompact {
cf_names: Vec<String>,
ranges: Vec<Key>,
tombstones_num_threshold: u64,
tombstones_percent_threshold: u64,
},
}
impl Display for Task {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match *self {
Task::Compact {
ref cf_name,
ref start_key,
ref end_key,
} => f
.debug_struct("Compact")
.field("cf_name", cf_name)
.field(
"start_key",
&start_key.as_ref().map(|k| log_wrappers::Value::key(k)),
)
.field(
"end_key",
&end_key.as_ref().map(|k| log_wrappers::Value::key(k)),
)
.finish(),
Task::CheckAndCompact {
ref cf_names,
ref ranges,
tombstones_num_threshold,
tombstones_percent_threshold,
} => f
.debug_struct("CheckAndCompact")
.field("cf_names", cf_names)
.field(
"ranges",
&(
ranges.first().as_ref().map(|k| log_wrappers::Value::key(k)),
ranges.last().as_ref().map(|k| log_wrappers::Value::key(k)),
),
)
.field("tombstones_num_threshold", &tombstones_num_threshold)
.field(
"tombstones_percent_threshold",
&tombstones_percent_threshold,
)
.finish(),
}
}
}
#[derive(Debug, Error)]
pub enum Error {
#[error("compact failed {0:?}")]
Other(#[from] Box<dyn StdError + Sync + Send>),
}
pub struct Runner<E> {
engine: E,
}
impl<E> Runner<E>
where
E: KvEngine,
{
pub fn new(engine: E) -> Runner<E> {
Runner { engine }
}
pub fn compact_range_cf(
&mut self,
cf_name: &str,
start_key: Option<&[u8]>,
end_key: Option<&[u8]>,
) -> Result<(), Error> {
let timer = Instant::now();
let compact_range_timer = COMPACT_RANGE_CF
.with_label_values(&[cf_name])
.start_coarse_timer();
box_try!(
self.engine
.compact_range(cf_name, start_key, end_key, false, 1 ,)
);
compact_range_timer.observe_duration();
info!(
"compact range finished";
"range_start" => start_key.map(::log_wrappers::Value::key),
"range_end" => end_key.map(::log_wrappers::Value::key),
"cf" => cf_name,
"time_takes" => ?timer.elapsed(),
);
Ok(())
}
}
impl<E> Runnable for Runner<E>
where
E: KvEngine,
{
type Task = Task;
fn run(&mut self, task: Task) {
match task {
Task::Compact {
cf_name,
start_key,
end_key,
} => {
let cf = &cf_name;
if let Err(e) = self.compact_range_cf(cf, start_key.as_deref(), end_key.as_deref())
{
error!("execute compact range failed"; "cf" => cf, "err" => %e);
}
}
Task::CheckAndCompact {
cf_names,
ranges,
tombstones_num_threshold,
tombstones_percent_threshold,
} => match collect_ranges_need_compact(
&self.engine,
ranges,
tombstones_num_threshold,
tombstones_percent_threshold,
) {
Ok(mut ranges) => {
for (start, end) in ranges.drain(..) {
for cf in &cf_names {
if let Err(e) = self.compact_range_cf(cf, Some(&start), Some(&end)) {
error!(
"compact range failed";
"range_start" => log_wrappers::Value::key(&start),
"range_end" => log_wrappers::Value::key(&end),
"cf" => cf,
"err" => %e,
);
}
}
fail_point!("raftstore::compact::CheckAndCompact:AfterCompact");
}
}
Err(e) => warn!("check ranges need reclaim failed"; "err" => %e),
},
}
}
}
fn need_compact(
num_entires: u64,
num_versions: u64,
tombstones_num_threshold: u64,
tombstones_percent_threshold: u64,
) -> bool {
if num_entires <= num_versions {
return false;
}
let estimate_num_del = num_entires - num_versions;
estimate_num_del >= tombstones_num_threshold
&& estimate_num_del * 100 >= tombstones_percent_threshold * num_entires
}
fn collect_ranges_need_compact(
engine: &impl KvEngine,
ranges: Vec<Key>,
tombstones_num_threshold: u64,
tombstones_percent_threshold: u64,
) -> Result<VecDeque<(Key, Key)>, Error> {
let mut ranges_need_compact = VecDeque::new();
let mut compact_start = None;
let mut compact_end = None;
for range in ranges.windows(2) {
if let Some((num_ent, num_ver)) =
box_try!(engine.get_range_entries_and_versions(CF_WRITE, &range[0], &range[1]))
{
if need_compact(
num_ent,
num_ver,
tombstones_num_threshold,
tombstones_percent_threshold,
) {
if compact_start.is_none() {
compact_start = Some(range[0].clone());
}
compact_end = Some(range[1].clone());
continue;
}
}
if compact_start.is_some() {
assert!(compact_end.is_some());
}
if let (Some(cs), Some(ce)) = (compact_start, compact_end) {
ranges_need_compact.push_back((cs, ce));
}
compact_start = None;
compact_end = None;
}
if compact_start.is_some() {
assert!(compact_end.is_some());
}
if let (Some(cs), Some(ce)) = (compact_start, compact_end) {
ranges_need_compact.push_back((cs, ce));
}
Ok(ranges_need_compact)
}
#[cfg(test)]
mod tests {
use std::thread::sleep;
use std::time::Duration;
use engine_test::ctor::{CFOptions, ColumnFamilyOptions, DBOptions};
use engine_test::kv::KvTestEngine;
use engine_test::kv::{new_engine, new_engine_opt};
use engine_traits::{MiscExt, Mutable, SyncMutable, WriteBatch, WriteBatchExt};
use engine_traits::{CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE};
use tempfile::Builder;
use keys::data_key;
use txn_types::{Key, TimeStamp, Write, WriteType};
use super::*;
#[test]
fn test_compact_range() {
let path = Builder::new()
.prefix("compact-range-test")
.tempdir()
.unwrap();
let db = new_engine(path.path().to_str().unwrap(), None, &[CF_DEFAULT], None).unwrap();
let mut runner = Runner::new(db.clone());
let mut wb = db.write_batch();
for i in 0..1000 {
let k = format!("key_{}", i);
wb.put_cf(CF_DEFAULT, k.as_bytes(), b"whatever content")
.unwrap();
}
wb.write().unwrap();
db.flush_cf(CF_DEFAULT, true).unwrap();
let mut wb = db.write_batch();
for i in 0..1000 {
let k = format!("key_{}", i);
wb.put_cf(CF_DEFAULT, k.as_bytes(), b"whatever content")
.unwrap();
}
wb.write().unwrap();
db.flush_cf(CF_DEFAULT, true).unwrap();
let old_sst_files_size = db.get_total_sst_files_size_cf(CF_DEFAULT).unwrap().unwrap();
runner.run(Task::Compact {
cf_name: String::from(CF_DEFAULT),
start_key: None,
end_key: None,
});
sleep(Duration::from_secs(5));
let new_sst_files_size = db.get_total_sst_files_size_cf(CF_DEFAULT).unwrap().unwrap();
assert!(old_sst_files_size > new_sst_files_size);
}
fn mvcc_put(db: &KvTestEngine, k: &[u8], v: &[u8], start_ts: TimeStamp, commit_ts: TimeStamp) {
let k = Key::from_encoded(data_key(k)).append_ts(commit_ts);
let w = Write::new(WriteType::Put, start_ts, Some(v.to_vec()));
db.put_cf(CF_WRITE, k.as_encoded(), &w.as_ref().to_bytes())
.unwrap();
}
fn delete(db: &KvTestEngine, k: &[u8], commit_ts: TimeStamp) {
let k = Key::from_encoded(data_key(k)).append_ts(commit_ts);
db.delete_cf(CF_WRITE, k.as_encoded()).unwrap();
}
fn open_db(path: &str) -> KvTestEngine {
let db_opts = DBOptions::new();
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.set_level_zero_file_num_compaction_trigger(8);
let cfs_opts = vec![
CFOptions::new(CF_DEFAULT, ColumnFamilyOptions::new()),
CFOptions::new(CF_RAFT, ColumnFamilyOptions::new()),
CFOptions::new(CF_LOCK, ColumnFamilyOptions::new()),
CFOptions::new(CF_WRITE, cf_opts),
];
new_engine_opt(path, db_opts, cfs_opts).unwrap()
}
#[test]
fn test_check_space_redundancy() {
let tmp_dir = Builder::new().prefix("test").tempdir().unwrap();
let engine = open_db(tmp_dir.path().to_str().unwrap());
for i in 0..5 {
let (k, v) = (format!("k{}", i), format!("value{}", i));
mvcc_put(&engine, k.as_bytes(), v.as_bytes(), 1.into(), 2.into());
}
engine.flush_cf(CF_WRITE, true).unwrap();
for i in 0..5 {
let k = format!("k{}", i);
delete(&engine, k.as_bytes(), 2.into());
}
engine.flush_cf(CF_WRITE, true).unwrap();
let (start, end) = (data_key(b"k0"), data_key(b"k5"));
let (entries, version) = engine
.get_range_entries_and_versions(CF_WRITE, &start, &end)
.unwrap()
.unwrap();
assert_eq!(entries, 10);
assert_eq!(version, 5);
for i in 5..10 {
let (k, v) = (format!("k{}", i), format!("value{}", i));
mvcc_put(&engine, k.as_bytes(), v.as_bytes(), 1.into(), 2.into());
}
engine.flush_cf(CF_WRITE, true).unwrap();
let (s, e) = (data_key(b"k5"), data_key(b"k9"));
let (entries, version) = engine
.get_range_entries_and_versions(CF_WRITE, &s, &e)
.unwrap()
.unwrap();
assert_eq!(entries, 5);
assert_eq!(version, 5);
let ranges_need_to_compact = collect_ranges_need_compact(
&engine,
vec![data_key(b"k0"), data_key(b"k5"), data_key(b"k9")],
1,
50,
)
.unwrap();
let (s, e) = (data_key(b"k0"), data_key(b"k5"));
let mut expected_ranges = VecDeque::new();
expected_ranges.push_back((s, e));
assert_eq!(ranges_need_to_compact, expected_ranges);
for i in 5..10 {
let k = format!("k{}", i);
delete(&engine, k.as_bytes(), 2.into());
}
engine.flush_cf(CF_WRITE, true).unwrap();
let (s, e) = (data_key(b"k5"), data_key(b"k9"));
let (entries, version) = engine
.get_range_entries_and_versions(CF_WRITE, &s, &e)
.unwrap()
.unwrap();
assert_eq!(entries, 10);
assert_eq!(version, 5);
let ranges_need_to_compact = collect_ranges_need_compact(
&engine,
vec![data_key(b"k0"), data_key(b"k5"), data_key(b"k9")],
1,
50,
)
.unwrap();
let (s, e) = (data_key(b"k0"), data_key(b"k9"));
let mut expected_ranges = VecDeque::new();
expected_ranges.push_back((s, e));
assert_eq!(ranges_need_to_compact, expected_ranges);
}
}