mod backward;
mod forward;
use engine_traits::{CfName, CF_DEFAULT, CF_LOCK, CF_WRITE};
use kvproto::kvrpcpb::{ExtraOp, IsolationLevel};
use txn_types::{Key, TimeStamp, TsSet, Value, Write, WriteRef, WriteType};
use self::backward::BackwardKvScanner;
use self::forward::{
DeltaEntryPolicy, ForwardKvScanner, ForwardScanner, LatestEntryPolicy, LatestKvPolicy,
};
use crate::storage::kv::{
CfStatistics, Cursor, CursorBuilder, Iterator, ScanMode, Snapshot, Statistics,
};
use crate::storage::mvcc::{default_not_found_error, NewerTsCheckState, Result};
use crate::storage::txn::{Result as TxnResult, Scanner as StoreScanner};
pub use self::forward::{test_util, DeltaScanner, EntryScanner};
pub struct ScannerBuilder<S: Snapshot>(ScannerConfig<S>);
impl<S: Snapshot> ScannerBuilder<S> {
pub fn new(snapshot: S, ts: TimeStamp, desc: bool) -> Self {
Self(ScannerConfig::new(snapshot, ts, desc))
}
#[inline]
pub fn fill_cache(mut self, fill_cache: bool) -> Self {
self.0.fill_cache = fill_cache;
self
}
#[inline]
pub fn omit_value(mut self, omit_value: bool) -> Self {
self.0.omit_value = omit_value;
self
}
#[inline]
pub fn isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
self.0.isolation_level = isolation_level;
self
}
#[inline]
pub fn range(mut self, lower_bound: Option<Key>, upper_bound: Option<Key>) -> Self {
self.0.lower_bound = lower_bound;
self.0.upper_bound = upper_bound;
self
}
#[inline]
pub fn bypass_locks(mut self, locks: TsSet) -> Self {
self.0.bypass_locks = locks;
self
}
#[inline]
pub fn hint_min_ts(mut self, min_ts: Option<TimeStamp>) -> Self {
self.0.hint_min_ts = min_ts;
self
}
#[inline]
pub fn hint_max_ts(mut self, max_ts: Option<TimeStamp>) -> Self {
self.0.hint_max_ts = max_ts;
self
}
#[inline]
pub fn check_has_newer_ts_data(mut self, enabled: bool) -> Self {
self.0.check_has_newer_ts_data = enabled;
self
}
pub fn build(mut self) -> Result<Scanner<S>> {
let lock_cursor = self.0.create_cf_cursor(CF_LOCK)?;
let write_cursor = self.0.create_cf_cursor(CF_WRITE)?;
if self.0.desc {
Ok(Scanner::Backward(BackwardKvScanner::new(
self.0,
lock_cursor,
write_cursor,
)))
} else {
Ok(Scanner::Forward(ForwardScanner::new(
self.0,
lock_cursor,
write_cursor,
None,
LatestKvPolicy,
)))
}
}
pub fn build_entry_scanner(
mut self,
after_ts: TimeStamp,
output_delete: bool,
) -> Result<EntryScanner<S>> {
let lock_cursor = self.0.create_cf_cursor(CF_LOCK)?;
let write_cursor = self.0.create_cf_cursor(CF_WRITE)?;
let default_cursor = self.0.create_cf_cursor(CF_DEFAULT)?;
Ok(ForwardScanner::new(
self.0,
lock_cursor,
write_cursor,
Some(default_cursor),
LatestEntryPolicy::new(after_ts, output_delete),
))
}
pub fn build_delta_scanner(
mut self,
from_ts: TimeStamp,
extra_op: ExtraOp,
) -> Result<DeltaScanner<S>> {
let lock_cursor = self.0.create_cf_cursor(CF_LOCK)?;
let write_cursor = self.0.create_cf_cursor(CF_WRITE)?;
let default_cursor = self
.0
.create_cf_cursor_with_scan_mode(CF_DEFAULT, ScanMode::Mixed)?;
Ok(ForwardScanner::new(
self.0,
lock_cursor,
write_cursor,
Some(default_cursor),
DeltaEntryPolicy::new(from_ts, extra_op),
))
}
}
/// Key/value scanner returned by `ScannerBuilder::build`.
///
/// It wraps either the forward or the backward implementation and forwards
/// every `StoreScanner` call to whichever one it holds.
pub enum Scanner<S: Snapshot> {
    /// Scanner built when `desc` is `false`.
    Forward(ForwardKvScanner<S>),
    /// Scanner built when `desc` is `true`.
    Backward(BackwardKvScanner<S>),
}

impl<S: Snapshot> StoreScanner for Scanner<S> {
    /// Returns the next key/value pair, or `None` once the scan is finished.
    ///
    /// Errors from the wrapped scanner are converted into the transaction
    /// layer's error type.
    fn next(&mut self) -> TxnResult<Option<(Key, Value)>> {
        let entry = match self {
            Scanner::Forward(forward) => forward.read_next()?,
            Scanner::Backward(backward) => backward.read_next()?,
        };
        Ok(entry)
    }

    /// Takes the statistics out of the wrapped scanner.
    fn take_statistics(&mut self) -> Statistics {
        match self {
            Scanner::Forward(forward) => forward.take_statistics(),
            Scanner::Backward(backward) => backward.take_statistics(),
        }
    }

    /// Reports the wrapped scanner's newer-timestamp check state.
    fn met_newer_ts_data(&self) -> NewerTsCheckState {
        match self {
            Scanner::Forward(forward) => forward.met_newer_ts_data(),
            Scanner::Backward(backward) => backward.met_newer_ts_data(),
        }
    }
}
/// Options shared by the scanners in this module, plus the snapshot they read.
///
/// Values are filled in through `ScannerBuilder` and the finished config is
/// moved into the scanner that gets built.
pub struct ScannerConfig<S: Snapshot> {
    snapshot: S,
    fill_cache: bool,
    omit_value: bool,
    isolation_level: IsolationLevel,
    /// Bounds handed to every cursor. Lock and write cursors receive clones;
    /// creating a `CF_DEFAULT` cursor takes both bounds and leaves `None`.
    lower_bound: Option<Key>,
    upper_bound: Option<Key>,
    /// Timestamp hints; forwarded only to cursors over `CF_WRITE`.
    hint_min_ts: Option<TimeStamp>,
    hint_max_ts: Option<TimeStamp>,
    ts: TimeStamp,
    desc: bool,
    bypass_locks: TsSet,
    check_has_newer_ts_data: bool,
}

impl<S: Snapshot> ScannerConfig<S> {
    /// Creates a config for reading `snapshot` at `ts`, with every optional
    /// setting at its default.
    fn new(snapshot: S, ts: TimeStamp, desc: bool) -> Self {
        Self {
            // Supplied by the caller.
            snapshot,
            ts,
            desc,
            // Defaults that the builder's setters may override.
            fill_cache: true,
            omit_value: false,
            isolation_level: IsolationLevel::Si,
            lower_bound: None,
            upper_bound: None,
            hint_min_ts: None,
            hint_max_ts: None,
            bypass_locks: Default::default(),
            check_has_newer_ts_data: false,
        }
    }

    /// Scan mode implied by the direction: `Mixed` for descending scans,
    /// `Forward` otherwise.
    #[inline]
    fn scan_mode(&self) -> ScanMode {
        match self.desc {
            true => ScanMode::Mixed,
            false => ScanMode::Forward,
        }
    }

    /// Creates a cursor over `cf` using the scan mode implied by `desc`.
    #[inline]
    fn create_cf_cursor(&mut self, cf: CfName) -> Result<Cursor<S::Iter>> {
        let mode = self.scan_mode();
        self.create_cf_cursor_with_scan_mode(cf, mode)
    }

    /// Creates a cursor over `cf` with an explicitly chosen `scan_mode`
    /// instead of the one implied by the config.
    ///
    /// Note that a `CF_DEFAULT` cursor consumes the stored key range, so any
    /// cursor created afterwards is unbounded.
    #[inline]
    fn create_cf_cursor_with_scan_mode(
        &mut self,
        cf: CfName,
        scan_mode: ScanMode,
    ) -> Result<Cursor<S::Iter>> {
        // The default-CF cursor takes the bounds; other CFs get clones.
        let consumes_range = cf == CF_DEFAULT;
        let lower = if consumes_range {
            self.lower_bound.take()
        } else {
            self.lower_bound.clone()
        };
        let upper = if consumes_range {
            self.upper_bound.take()
        } else {
            self.upper_bound.clone()
        };

        // Timestamp hints are only passed on for the write CF.
        let wants_ts_hint = cf == CF_WRITE;
        let hint_min_ts = if wants_ts_hint { self.hint_min_ts } else { None };
        let hint_max_ts = if wants_ts_hint { self.hint_max_ts } else { None };

        let builder = CursorBuilder::new(&self.snapshot, cf)
            .range(lower, upper)
            .fill_cache(self.fill_cache)
            .scan_mode(scan_mode)
            .hint_min_ts(hint_min_ts)
            .hint_max_ts(hint_max_ts);
        Ok(builder.build()?)
    }
}
/// Loads the value stored for `user_key` at `write_start_ts` through
/// `default_cursor`.
///
/// The cursor is moved with `near_seek` to the key formed by appending
/// `write_start_ts` to `user_key`, and it must land exactly on that key.
/// On success the data statistics' `processed_keys` counter is bumped by one.
///
/// # Errors
///
/// Returns `default_not_found_error` when the cursor ends up invalid or on a
/// different key, and propagates any error from the cursor itself.
fn near_load_data_by_write<I>(
    default_cursor: &mut Cursor<I>,
    user_key: &Key,
    write_start_ts: TimeStamp,
    statistics: &mut Statistics,
) -> Result<Value>
where
    I: Iterator,
{
    let data_stats = &mut statistics.data;
    let seek_key = user_key.clone().append_ts(write_start_ts);
    default_cursor.near_seek(&seek_key, data_stats)?;

    let found = default_cursor.valid()?
        && default_cursor.key(data_stats) == seek_key.as_encoded().as_slice();
    if found {
        data_stats.processed_keys += 1;
        Ok(default_cursor.value(data_stats).to_vec())
    } else {
        Err(default_not_found_error(
            user_key.to_raw()?,
            "near_load_data_by_write",
        ))
    }
}
/// Reverse-direction counterpart of `near_load_data_by_write`.
///
/// The cursor is moved with `near_seek_for_prev` to the key formed by
/// appending `write_start_ts` to `user_key`, and it must land exactly on that
/// key. On success the data statistics' `processed_keys` counter is bumped by
/// one.
///
/// # Errors
///
/// Returns `default_not_found_error` when the cursor ends up invalid or on a
/// different key, and propagates any error from the cursor itself.
fn near_reverse_load_data_by_write<I>(
    default_cursor: &mut Cursor<I>,
    user_key: &Key,
    write_start_ts: TimeStamp,
    statistics: &mut Statistics,
) -> Result<Value>
where
    I: Iterator,
{
    let data_stats = &mut statistics.data;
    let seek_key = user_key.clone().append_ts(write_start_ts);
    default_cursor.near_seek_for_prev(&seek_key, data_stats)?;

    let found = default_cursor.valid()?
        && default_cursor.key(data_stats) == seek_key.as_encoded().as_slice();
    if found {
        data_stats.processed_keys += 1;
        Ok(default_cursor.value(data_stats).to_vec())
    } else {
        Err(default_not_found_error(
            user_key.to_raw()?,
            "near_reverse_load_data_by_write",
        ))
    }
}
/// Reports whether column family `cf` of `snapshot` holds a key in
/// `[left, right)`.
///
/// A forward cursor bounded above by `right` seeks to `left`; the range has
/// data when the cursor lands on a key that sorts before `right`.
///
/// The cursor is limited to skipping 100 internal keys. If the seek fails
/// with the "Too many internal keys skipped" error, the function answers
/// `true` rather than reporting the error.
///
/// # Errors
///
/// Fails if the cursor cannot be built, or if the seek fails for any other
/// reason.
pub fn has_data_in_range<S: Snapshot>(
    snapshot: S,
    cf: CfName,
    left: &Key,
    right: &Key,
    statistic: &mut CfStatistics,
) -> Result<bool> {
    let mut cursor = CursorBuilder::new(&snapshot, cf)
        .range(None, Some(right.clone()))
        .scan_mode(ScanMode::Forward)
        .fill_cache(true)
        .max_skippable_internal_keys(100)
        .build()?;

    let landed_on_key = match cursor.seek(left, statistic) {
        Ok(valid) => valid,
        // The skip limit was hit before a live key was found; treat the
        // range as non-empty.
        Err(e)
            if e.to_string()
                .contains("Result incomplete: Too many internal keys skipped") =>
        {
            return Ok(true);
        }
        Err(e) => return Err(e.into()),
    };

    Ok(landed_on_key && cursor.key(statistic) < right.as_encoded().as_slice())
}
/// Walks `write_cursor` forward to the next `Put` or `Delete` record of
/// `user_key` and returns an owned copy of it.
///
/// `Lock` and `Rollback` records are stepped over. The search ends with
/// `None` when the cursor becomes invalid, moves on to a different user key,
/// or reaches a record for which
/// `check_gc_fence_as_latest_version(gc_fence_limit)` is false.
///
/// The cursor is expected to already sit on a record of `user_key` whose
/// timestamp is not greater than `after_ts`.
///
/// # Panics
///
/// Panics if the `Put`/`Delete` record found has a timestamp greater than
/// `after_ts`.
///
/// # Errors
///
/// Propagates cursor errors and failures to parse the write record or to
/// decode the timestamp from its key.
pub fn seek_for_valid_write<I>(
    write_cursor: &mut Cursor<I>,
    user_key: &Key,
    after_ts: TimeStamp,
    gc_fence_limit: TimeStamp,
    statistics: &mut Statistics,
) -> Result<Option<Write>>
where
    I: Iterator,
{
    loop {
        let on_user_key = write_cursor.valid()?
            && Key::is_user_key_eq(
                write_cursor.key(&mut statistics.write),
                user_key.as_encoded(),
            );
        if !on_user_key {
            return Ok(None);
        }

        let record = WriteRef::parse(write_cursor.value(&mut statistics.write))?;
        if !record.check_gc_fence_as_latest_version(gc_fence_limit) {
            return Ok(None);
        }

        match record.write_type {
            WriteType::Put | WriteType::Delete => {
                assert_ge!(
                    after_ts,
                    Key::decode_ts_from(write_cursor.key(&mut statistics.write))?
                );
                return Ok(Some(record.to_owned()));
            }
            WriteType::Lock | WriteType::Rollback => {
                // Not a data-carrying record; look at the next one.
                write_cursor.next(&mut statistics.write);
            }
        }
    }
}
/// Finds the next valid write of `user_key` via `seek_for_valid_write` and
/// returns the value it put.
///
/// Returns `None` when no valid write exists or when the write found is not a
/// `Put`. For a `Put`, the short value carried by the write record is
/// returned if present; otherwise the value is loaded through
/// `default_cursor` with `near_load_data_by_write`.
///
/// # Errors
///
/// Propagates errors from the write search and from the value load.
pub fn seek_for_valid_value<I>(
    write_cursor: &mut Cursor<I>,
    default_cursor: &mut Cursor<I>,
    user_key: &Key,
    after_ts: TimeStamp,
    gc_fence_limit: TimeStamp,
    statistics: &mut Statistics,
) -> Result<Option<Value>>
where
    I: Iterator,
{
    let put = match seek_for_valid_write(
        write_cursor,
        user_key,
        after_ts,
        gc_fence_limit,
        statistics,
    )? {
        Some(write) if write.write_type == WriteType::Put => write,
        _ => return Ok(None),
    };

    let value = match put.short_value {
        Some(inline_value) => inline_value,
        None => near_load_data_by_write(default_cursor, user_key, put.start_ts, statistics)?,
    };
    Ok(Some(value))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::kv::SEEK_BOUND;
    use crate::storage::kv::{Engine, RocksEngine, TestEngineBuilder};
    use crate::storage::mvcc::tests::*;
    use crate::storage::mvcc::{Error as MvccError, ErrorInner as MvccErrorInner};
    use crate::storage::txn::tests::*;
    use crate::storage::txn::{Error as TxnError, ErrorInner as TxnErrorInner};

    /// One observed scan step: the raw key, plus its value or `None` when the
    /// scanner reported the key as locked.
    type ScanRow = (Vec<u8>, Option<Vec<u8>>);

    /// Drains `scanner` and checks the observed rows against `expected`.
    ///
    /// A `KeyIsLocked` error is recorded as `(locked key, None)` and the scan
    /// continues; any other error fails the test.
    fn check_scan_result<S: Snapshot>(
        mut scanner: Scanner<S>,
        expected: &[(Vec<u8>, Option<Vec<u8>>)],
    ) {
        let mut observed = Vec::new();
        loop {
            let row = match scanner.next() {
                Ok(None) => break,
                Ok(Some((key, value))) => (key.to_raw().unwrap(), Some(value)),
                Err(TxnError(box TxnErrorInner::Mvcc(MvccError(
                    box MvccErrorInner::KeyIsLocked(mut info),
                )))) => (info.take_key(), None),
                e => panic!("got error while scanning: {:?}", e),
            };
            observed.push(row);
        }
        assert_eq!(observed, expected);
    }

    /// Scans small data sets that mix committed writes and pending locks, all
    /// stamped earlier than the scan timestamp.
    fn test_scan_with_lock_and_write_impl(desc: bool) {
        const SCAN_TS: TimeStamp = TimeStamp::new(10);
        const PREV_TS: TimeStamp = TimeStamp::new(4);
        const POST_TS: TimeStamp = TimeStamp::new(5);

        let new_engine = || TestEngineBuilder::new().build().unwrap();
        // Prewrites and commits `value` for `key`, both at `ts`.
        let commit_put_at = |ts, engine, key, value| {
            must_prewrite_put(engine, key, value, key, ts);
            must_commit(engine, key, ts, ts);
        };
        // Leaves an uncommitted prewrite lock on `key` at `ts`.
        let leave_lock_at = |ts, engine, key| {
            must_prewrite_put(engine, key, b"lock", key, ts);
            must_locked(engine, key, ts);
        };
        let assert_scan = move |engine: &RocksEngine, expected: Vec<ScanRow>| {
            let snapshot = engine.snapshot(Default::default()).unwrap();
            let scanner = ScannerBuilder::new(snapshot, SCAN_TS, desc)
                .build()
                .unwrap();
            check_scan_result(scanner, &expected);
        };
        // Expectations are written in ascending key order; flip them when
        // scanning backwards.
        let in_scan_order = move |mut rows: Vec<ScanRow>| {
            if desc {
                rows.reverse();
            }
            rows
        };

        // Committed write on "a", pending lock on "b".
        let engine = new_engine();
        commit_put_at(POST_TS, &engine, b"a", b"a_value");
        leave_lock_at(PREV_TS, &engine, b"b");
        assert_scan(
            &engine,
            in_scan_order(vec![
                (b"a".to_vec(), Some(b"a_value".to_vec())),
                (b"b".to_vec(), None),
            ]),
        );

        // Committed write and a later pending lock on the same key.
        let engine = new_engine();
        commit_put_at(PREV_TS, &engine, b"a", b"a_value");
        leave_lock_at(POST_TS, &engine, b"a");
        assert_scan(&engine, vec![(b"a".to_vec(), None)]);

        // Pending lock on "a", committed write on "b".
        let engine = new_engine();
        leave_lock_at(POST_TS, &engine, b"a");
        commit_put_at(PREV_TS, &engine, b"b", b"b_value");
        assert_scan(
            &engine,
            in_scan_order(vec![
                (b"a".to_vec(), None),
                (b"b".to_vec(), Some(b"b_value".to_vec())),
            ]),
        );

        // Nothing but a pending lock.
        let engine = new_engine();
        leave_lock_at(PREV_TS, &engine, b"a");
        assert_scan(&engine, in_scan_order(vec![(b"a".to_vec(), None)]));

        // Nothing but a committed write.
        let engine = new_engine();
        commit_put_at(PREV_TS, &engine, b"a", b"a_value");
        assert_scan(
            &engine,
            in_scan_order(vec![(b"a".to_vec(), Some(b"a_value".to_vec()))]),
        );
    }

    /// Scans at several timestamps over keys that carry pessimistic locks and
    /// one prewrite lock.
    fn test_scan_with_lock_impl(desc: bool) {
        let engine = TestEngineBuilder::new().build().unwrap();

        for i in 0..5 {
            must_prewrite_put(&engine, &[i], &[b'v', i], &[i], 1);
            must_commit(&engine, &[i], 1, 2);
            must_prewrite_put(&engine, &[i], &[b'v', i], &[i], 10);
            must_commit(&engine, &[i], 10, 100);
        }

        must_acquire_pessimistic_lock(&engine, &[1], &[1], 20, 110);
        must_acquire_pessimistic_lock(&engine, &[2], &[2], 50, 110);
        must_acquire_pessimistic_lock(&engine, &[3], &[3], 105, 110);
        must_prewrite_put(&engine, &[4], b"a", &[4], 105);

        let snapshot = engine.snapshot(Default::default()).unwrap();

        let mut expected: Vec<ScanRow> = (0..5u8)
            .map(|i| (vec![i], Some(vec![b'v', i])))
            .collect();
        if desc {
            expected.reverse();
        }

        // Every key is expected to be readable at these timestamps.
        for &ts in &[30u64, 70, 103] {
            let scanner = ScannerBuilder::new(snapshot.clone(), ts.into(), desc)
                .build()
                .unwrap();
            check_scan_result(scanner, &expected);
        }

        // Key 4 carries a prewrite lock at 105, so a scan at 106 is expected
        // to report it as locked.
        let key4_position = if desc { 0 } else { 4 };
        expected[key4_position].1 = None;
        let scanner = ScannerBuilder::new(snapshot, 106.into(), desc)
            .build()
            .unwrap();
        check_scan_result(scanner, &expected);
    }

    #[test]
    fn test_scan_with_lock_and_write() {
        for &desc in &[true, false] {
            test_scan_with_lock_and_write_impl(desc);
        }
    }

    #[test]
    fn test_scan_with_lock() {
        for &desc in &[false, true] {
            test_scan_with_lock_impl(desc);
        }
    }

    /// Scans over locked keys with a bypass set that names only some of the
    /// lock timestamps.
    fn test_scan_bypass_locks_impl(desc: bool) {
        let engine = TestEngineBuilder::new().build().unwrap();

        for i in 0..5 {
            must_prewrite_put(&engine, &[i], &[b'v', i], &[i], 10);
            must_commit(&engine, &[i], 10, 20);
        }
        // Keys 0..5 get pending locks at 30, 40, 50, 60 and 70.
        for i in 0..5 {
            must_prewrite_put(&engine, &[i], &[b'v', i], &[i], 30 + u64::from(i) * 10);
        }

        let bypass_locks = TsSet::from_u64s(vec![30, 41, 50]);

        // At scan ts 65 the locks at 40 and 60 are not in the bypass set, so
        // keys 1 and 3 are expected to be reported as locked.
        let reported_locked = [1u8, 3];
        let mut expected: Vec<ScanRow> = (0..5u8)
            .map(|i| {
                let value = if reported_locked.contains(&i) {
                    None
                } else {
                    Some(vec![b'v', i])
                };
                (vec![i], value)
            })
            .collect();
        if desc {
            expected.reverse();
        }

        let snapshot = engine.snapshot(Default::default()).unwrap();
        let scanner = ScannerBuilder::new(snapshot, 65.into(), desc)
            .bypass_locks(bypass_locks)
            .build()
            .unwrap();
        check_scan_result(scanner, &expected);
    }

    #[test]
    fn test_scan_bypass_locks() {
        for &desc in &[false, true] {
            test_scan_bypass_locks_impl(desc);
        }
    }

    /// Reads one entry starting from `key` at `scanner_ts` with the newer-ts
    /// check enabled, then asserts both the entry and the reported check state.
    ///
    /// `value == None` means the scan is expected to return nothing.
    fn must_met_newer_ts_data<E: Engine>(
        engine: &E,
        scanner_ts: impl Into<TimeStamp>,
        key: &[u8],
        value: Option<&[u8]>,
        desc: bool,
        expected_met_newer_ts_data: bool,
    ) {
        let snapshot = engine.snapshot(Default::default()).unwrap();
        let mut scanner = ScannerBuilder::new(snapshot, scanner_ts.into(), desc)
            .range(Some(Key::from_raw(key)), None)
            .check_has_newer_ts_data(true)
            .build()
            .unwrap();

        let result = scanner.next().unwrap();
        match value {
            Some(expected_value) => {
                let (k, v) = result.unwrap();
                assert_eq!(k, Key::from_raw(key));
                assert_eq!(v, expected_value);
            }
            None => assert!(result.is_none()),
        }

        let expected = match expected_met_newer_ts_data {
            true => NewerTsCheckState::Met,
            false => NewerTsCheckState::NotMetYet,
        };
        assert_eq!(expected, scanner.met_newer_ts_data());
    }

    /// Checks the newer-ts state at several read timestamps.
    ///
    /// With `deep_write_seek`, `SEEK_BOUND` extra versions are committed
    /// before the two versions under test.
    fn test_met_newer_ts_data_impl(deep_write_seek: bool, desc: bool) {
        let engine = TestEngineBuilder::new().build().unwrap();
        let key: &[u8] = b"foo";
        let val1: &[u8] = b"bar1";
        let val2: &[u8] = b"bar2";

        if deep_write_seek {
            for i in 0..SEEK_BOUND {
                must_prewrite_put(&engine, key, val1, key, i);
                must_commit(&engine, key, i, i);
            }
        }

        must_prewrite_put(&engine, key, val1, key, 100);
        must_commit(&engine, key, 100, 200);
        must_prewrite_put(&engine, key, val2, key, 300);
        must_commit(&engine, key, 300, 400);

        // (read ts, expected value, expected "met newer ts data")
        let before_lock: [(u64, Option<&[u8]>, bool); 5] = [
            (100, if deep_write_seek { Some(val1) } else { None }, true),
            (200, Some(val1), true),
            (300, Some(val1), true),
            (400, Some(val2), false),
            (500, Some(val2), false),
        ];
        for &(ts, value, met) in before_lock.iter() {
            must_met_newer_ts_data(&engine, ts, key, value, desc, met);
        }

        must_prewrite_lock(&engine, key, key, 600);

        // With the lock at 600 in place, both reads are expected to report
        // newer data.
        let after_lock: [(u64, Option<&[u8]>, bool); 2] =
            [(500, Some(val2), true), (600, Some(val2), true)];
        for &(ts, value, met) in after_lock.iter() {
            must_met_newer_ts_data(&engine, ts, key, value, desc, met);
        }
    }

    #[test]
    fn test_met_newer_ts_data() {
        for &deep_write_seek in &[false, true] {
            for &desc in &[false, true] {
                test_met_newer_ts_data_impl(deep_write_seek, desc);
            }
        }
    }
}