use std::{borrow::Cow, cmp::Ordering};
use engine_traits::CF_DEFAULT;
use kvproto::kvrpcpb::IsolationLevel;
use txn_types::{Key, Lock, TimeStamp, Value, Write, WriteRef, WriteType};
use super::ScannerConfig;
use crate::storage::kv::{Cursor, Snapshot, Statistics, SEEK_BOUND};
use crate::storage::mvcc::{Error, NewerTsCheckState, Result};
/// Maximum number of write-cursor entries `reverse_get` examines by stepping with
/// `prev()` before it stops stepping and positions the cursor with a seek instead.
const REVERSE_SEEK_BOUND: u64 = 16;
/// Scans key-value pairs in descending user-key order, as seen at `cfg.ts`.
///
/// Each iteration of `read_next` looks at the write cursor and the lock cursor
/// together and handles whichever user key is larger (or both when they agree).
pub struct BackwardKvScanner<S: Snapshot> {
/// Scan settings. This type reads `ts`, `upper_bound`, `isolation_level`,
/// `bypass_locks`, `omit_value` and `check_has_newer_ts_data` from it, and uses
/// it to create the default-CF cursor.
cfg: ScannerConfig<S>,
/// Cursor whose values are parsed as `Lock`s.
lock_cursor: Cursor<S::Iter>,
/// Cursor whose keys carry a timestamp suffix and whose values are parsed as
/// write records.
write_cursor: Cursor<S::Iter>,
/// Cursor on `CF_DEFAULT`; created on first use by `ensure_default_cursor`.
default_cursor: Option<Cursor<S::Iter>>,
/// Set once the initial cursor positioning in `read_next` has completed.
is_started: bool,
/// Counters accumulated since construction or the last `take_statistics`.
statistics: Statistics,
/// Whether data newer than the read timestamp (or a lock under SI) has been
/// observed. Only ever changes from `NotMetYet` to `Met`; when
/// `cfg.check_has_newer_ts_data` is false it stays `Unknown`.
met_newer_ts_data: NewerTsCheckState,
}
impl<S: Snapshot> BackwardKvScanner<S> {
pub fn new(
cfg: ScannerConfig<S>,
lock_cursor: Cursor<S::Iter>,
write_cursor: Cursor<S::Iter>,
) -> BackwardKvScanner<S> {
BackwardKvScanner {
met_newer_ts_data: if cfg.check_has_newer_ts_data {
NewerTsCheckState::NotMetYet
} else {
NewerTsCheckState::Unknown
},
cfg,
lock_cursor,
write_cursor,
statistics: Statistics::default(),
default_cursor: None,
is_started: false,
}
}
/// Returns the statistics collected so far and resets the scanner's counters
/// to `Statistics::default()`.
pub fn take_statistics(&mut self) -> Statistics {
std::mem::take(&mut self.statistics)
}
/// Returns the current newer-data check state.
///
/// The state starts as `NotMetYet` only when `cfg.check_has_newer_ts_data` was
/// set at construction; otherwise it is `Unknown` and never changes.
#[inline]
pub fn met_newer_ts_data(&self) -> NewerTsCheckState {
self.met_newer_ts_data
}
/// Returns the next visible key-value pair in descending user-key order, or
/// `Ok(None)` once both the write cursor and the lock cursor are exhausted.
///
/// On the first call the cursors are positioned with `reverse_seek` on
/// `cfg.upper_bound` when a bound is configured, and with `seek_to_last`
/// otherwise.
///
/// # Errors
///
/// Propagates cursor errors and key/lock/write decoding errors. Under
/// `IsolationLevel::Si` it also returns the error produced by
/// `Lock::check_ts_conflict`; that error is surfaced only after both cursors
/// have been moved off the offending user key.
pub fn read_next(&mut self) -> Result<Option<(Key, Value)>> {
    if !self.is_started {
        if let Some(upper_bound) = self.cfg.upper_bound.as_ref() {
            self.write_cursor
                .reverse_seek(upper_bound, &mut self.statistics.write)?;
            self.lock_cursor
                .reverse_seek(upper_bound, &mut self.statistics.lock)?;
        } else {
            self.write_cursor.seek_to_last(&mut self.statistics.write);
            self.lock_cursor.seek_to_last(&mut self.statistics.lock);
        }
        self.is_started = true;
    }

    loop {
        // Decide which user key to handle in this iteration and whether it
        // has a write record, a lock, or both.
        let (current_user_key, has_write, has_lock) = {
            let w_key = if self.write_cursor.valid()? {
                Some(self.write_cursor.key(&mut self.statistics.write))
            } else {
                None
            };
            let l_key = if self.lock_cursor.valid()? {
                Some(self.lock_cursor.key(&mut self.statistics.lock))
            } else {
                None
            };

            // `res` is `(user key slice, has_write, has_lock)`.
            let res = match (w_key, l_key) {
                (None, None) => return Ok(None),
                (None, Some(lk)) => (lk, false, true),
                (Some(wk), None) => (Key::truncate_ts_for(wk)?, true, false),
                (Some(wk), Some(lk)) => {
                    let write_user_key = Key::truncate_ts_for(wk)?;
                    match write_user_key.cmp(lk) {
                        // The scan goes from larger to smaller keys, so the
                        // larger of the two keys is the one to handle now.
                        Ordering::Less => (lk, false, true),
                        Ordering::Greater => (write_user_key, true, false),
                        Ordering::Equal => (write_user_key, true, true),
                    }
                }
            };
            (Key::from_encoded_slice(res.0), res.1, res.2)
        };

        let mut result = Ok(None);
        let mut met_prev_user_key = false;
        let ts = self.cfg.ts;

        if has_lock {
            match self.cfg.isolation_level {
                IsolationLevel::Si => {
                    let lock = {
                        let lock_value = self.lock_cursor.value(&mut self.statistics.lock);
                        Lock::parse(lock_value)?
                    };
                    // Any lock seen under SI counts as "newer data".
                    if self.met_newer_ts_data == NewerTsCheckState::NotMetYet {
                        self.met_newer_ts_data = NewerTsCheckState::Met;
                    }
                    result = Lock::check_ts_conflict(
                        Cow::Owned(lock),
                        &current_user_key,
                        ts,
                        &self.cfg.bypass_locks,
                    )
                    .map(|_| None)
                    .map_err(Into::into);
                    if result.is_err() {
                        self.statistics.lock.processed_keys += 1;
                    }
                }
                // Locks are not checked under RC.
                IsolationLevel::Rc => {}
            }
            self.lock_cursor.prev(&mut self.statistics.lock);
        }
        if has_write {
            // A lock conflict suppresses the read but the write cursor must
            // still leave this user key before the error is returned.
            if result.is_ok() {
                result = self.reverse_get(&current_user_key, ts, &mut met_prev_user_key);
            }
            if !met_prev_user_key {
                self.move_write_cursor_to_prev_user_key(&current_user_key)?;
            }
        }

        // `Ok(None)` means this user key has nothing visible; try the next one.
        if let Some(v) = result? {
            self.statistics.write.processed_keys += 1;
            return Ok(Some((current_user_key, v)));
        }
    }
}
/// Tries to read the value of `user_key` visible at `ts`, moving the write cursor
/// backwards from its current position.
///
/// Asserts that the write cursor is valid on entry. `read_next` calls this with the
/// cursor on an entry of `user_key`.
/// NOTE(review): the stepping logic appears to assume `prev()` moves towards newer
/// versions of the same user key — confirm against the write-key encoding.
///
/// `*met_prev_user_key` is set to `true` whenever the cursor is observed on an entry
/// that belongs to a different user key, so the caller can skip its own repositioning.
///
/// Returns `Ok(Some(value))` for a visible `Put`, and `Ok(None)` when nothing is
/// visible, the visible version is a `Delete`, or the GC-fence check rejects it.
#[inline]
fn reverse_get(
&mut self,
user_key: &Key,
ts: TimeStamp,
met_prev_user_key: &mut bool,
) -> Result<Option<Value>> {
assert!(self.write_cursor.valid()?);
// Most recent `Put`/`Delete` seen so far whose commit ts is not above `ts`.
let mut last_version = None;
let mut last_checked_commit_ts = TimeStamp::zero();
// Phase 1: examine up to `REVERSE_SEEK_BOUND` entries, stepping with `prev()`.
for i in 0..REVERSE_SEEK_BOUND {
if i > 0 {
// The first entry is the one the cursor already points at, so no step is needed.
self.write_cursor.prev(&mut self.statistics.write);
if !self.write_cursor.valid()? {
// Ran off the cursor's range: answer with what has been collected.
return self.handle_last_version(last_version, user_key);
}
}
let mut is_done = false;
{
let current_key = self.write_cursor.key(&mut self.statistics.write);
last_checked_commit_ts = Key::decode_ts_from(current_key)?;
if !Key::is_user_key_eq(current_key, user_key.as_encoded().as_slice()) {
// Reached a different user key.
*met_prev_user_key = true;
is_done = true;
} else if last_checked_commit_ts > ts {
// This version is newer than the read timestamp; record that newer data exists.
is_done = true;
if self.met_newer_ts_data == NewerTsCheckState::NotMetYet {
self.met_newer_ts_data = NewerTsCheckState::Met;
}
}
}
if is_done {
return self.handle_last_version(last_version, user_key);
}
let write = WriteRef::parse(self.write_cursor.value(&mut self.statistics.write))
.map_err(Error::from)?;
match write.write_type {
WriteType::Put | WriteType::Delete => last_version = Some(write.to_owned()),
// `Lock` and `Rollback` records carry no data version; keep the previous candidate.
WriteType::Lock | WriteType::Rollback => {}
}
}
// The loop ended without returning, so the last examined entry has commit ts <= `ts`.
if last_checked_commit_ts == ts {
if self.met_newer_ts_data == NewerTsCheckState::NotMetYet {
// One extra step only to learn whether another entry of this user key follows.
self.write_cursor.prev(&mut self.statistics.write);
if self.write_cursor.valid()? {
let current_key = self.write_cursor.key(&mut self.statistics.write);
if Key::is_user_key_eq(current_key, user_key.as_encoded().as_slice()) {
self.met_newer_ts_data = NewerTsCheckState::Met;
} else {
*met_prev_user_key = true;
}
}
}
return self.handle_last_version(last_version, user_key);
}
assert!(ts > last_checked_commit_ts);
// Phase 2: stepping did not reach `ts`; position the cursor with a seek instead.
let mut use_near_seek = false;
let mut seek_key = user_key.clone();
if self.met_newer_ts_data == NewerTsCheckState::NotMetYet {
// Seek to `user_key` with the maximum timestamp and compare the timestamp found
// there with `ts` to detect newer data.
seek_key = seek_key.append_ts(TimeStamp::max());
self.write_cursor
.internal_seek(&seek_key, &mut self.statistics.write)?;
assert!(self.write_cursor.valid()?);
seek_key = seek_key.truncate_ts()?;
// The cursor is already on this user key, so the next seek can be a near seek.
use_near_seek = true;
let current_key = self.write_cursor.key(&mut self.statistics.write);
debug_assert!(Key::is_user_key_eq(
current_key,
user_key.as_encoded().as_slice()
));
let key_ts = Key::decode_ts_from(current_key)?;
if key_ts > ts {
self.met_newer_ts_data = NewerTsCheckState::Met
}
}
// Seek to `user_key` at the read timestamp.
seek_key = seek_key.append_ts(ts);
if use_near_seek {
self.write_cursor
.near_seek(&seek_key, &mut self.statistics.write)?;
} else {
self.write_cursor
.internal_seek(&seek_key, &mut self.statistics.write)?;
}
assert!(self.write_cursor.valid()?);
// Phase 3: walk forward with `next()` until a `Put`/`Delete` is found or an entry
// already examined in phase 1 is reached.
loop {
let current_ts = {
let current_key = self.write_cursor.key(&mut self.statistics.write);
debug_assert!(Key::is_user_key_eq(
current_key,
user_key.as_encoded().as_slice()
));
Key::decode_ts_from(current_key)?
};
if current_ts <= last_checked_commit_ts {
// Back in the range phase 1 covered; its candidate is the answer.
return self.handle_last_version(last_version, user_key);
}
let write = WriteRef::parse(self.write_cursor.value(&mut self.statistics.write))?;
if !write.check_gc_fence_as_latest_version(self.cfg.ts) {
return Ok(None);
}
match write.write_type {
WriteType::Put => {
let write = write.to_owned();
return Ok(Some(self.reverse_load_data_by_write(write, user_key)?));
}
WriteType::Delete => return Ok(None),
WriteType::Lock | WriteType::Rollback => {
// No data in this record; continue with the next entry.
self.write_cursor.next(&mut self.statistics.write);
assert!(self.write_cursor.valid()?);
}
}
}
}
/// Turns the candidate version collected by `reverse_get` into a read result.
///
/// Yields `Ok(None)` when there is no candidate, when the candidate fails the
/// GC-fence check against `cfg.ts`, or when it is a `Delete`. A `Put` has its
/// value loaded. Callers only pass `Put`/`Delete` records; any other type is
/// treated as unreachable.
#[inline]
fn handle_last_version(
    &mut self,
    some_write: Option<Write>,
    user_key: &Key,
) -> Result<Option<Value>> {
    let write = match some_write {
        Some(candidate) => candidate,
        None => return Ok(None),
    };

    let passes_gc_fence = write.as_ref().check_gc_fence_as_latest_version(self.cfg.ts);
    if !passes_gc_fence {
        return Ok(None);
    }

    match write.write_type {
        WriteType::Delete => Ok(None),
        WriteType::Put => self.reverse_load_data_by_write(write, user_key).map(Some),
        _ => unreachable!(),
    }
}
/// Produces the value that belongs to `write`.
///
/// With `cfg.omit_value` set an empty vector is returned. Otherwise the short
/// value embedded in the write record is used when present; when it is absent
/// the value is fetched through the default-CF cursor, which is created on
/// demand.
#[inline]
fn reverse_load_data_by_write(&mut self, write: Write, user_key: &Key) -> Result<Value> {
    if self.cfg.omit_value {
        return Ok(vec![]);
    }

    // Value carried inline in the write record.
    if let Some(inline_value) = write.short_value {
        return Ok(inline_value);
    }

    // Value has to be looked up with the record's start timestamp.
    self.ensure_default_cursor()?;
    let default_cursor = self.default_cursor.as_mut().unwrap();
    let loaded = super::near_reverse_load_data_by_write(
        default_cursor,
        user_key,
        write.start_ts,
        &mut self.statistics,
    )?;
    Ok(loaded)
}
/// Moves the write cursor off `current_user_key`.
///
/// The cursor is inspected up to `SEEK_BOUND` times, stepping with `prev()`
/// between inspections. The function returns as soon as the cursor is invalid
/// or points at a different user key. If every inspection still sees
/// `current_user_key`, a single `internal_seek_for_prev` on that key is issued.
#[inline]
fn move_write_cursor_to_prev_user_key(&mut self, current_user_key: &Key) -> Result<()> {
    let encoded_user_key = current_user_key.as_encoded().as_slice();

    let mut attempt = 0;
    while attempt < SEEK_BOUND {
        // The very first inspection looks at the entry the cursor is already on.
        if attempt != 0 {
            self.write_cursor.prev(&mut self.statistics.write);
        }
        if !self.write_cursor.valid()? {
            return Ok(());
        }
        let entry_key = self.write_cursor.key(&mut self.statistics.write);
        if !Key::is_user_key_eq(entry_key, encoded_user_key) {
            return Ok(());
        }
        attempt += 1;
    }

    // Stepping was not enough; jump with one seek.
    self.write_cursor
        .internal_seek_for_prev(current_user_key, &mut self.statistics.write)?;
    Ok(())
}
/// Creates the `CF_DEFAULT` cursor the first time it is needed; later calls
/// are no-ops.
fn ensure_default_cursor(&mut self) -> Result<()> {
    if self.default_cursor.is_none() {
        let cursor = self.cfg.create_cf_cursor(CF_DEFAULT)?;
        self.default_cursor = Some(cursor);
    }
    Ok(())
}
}
#[cfg(test)]
mod tests {
use super::super::test_util::prepare_test_data_for_check_gc_fence;
use super::super::ScannerBuilder;
use super::*;
use crate::storage::kv::{Engine, Modify, TestEngineBuilder};
use crate::storage::mvcc::tests::write;
use crate::storage::txn::tests::{
must_commit, must_gc, must_prewrite_delete, must_prewrite_put, must_rollback,
};
use crate::storage::Scanner;
use engine_traits::{CF_LOCK, CF_WRITE};
use kvproto::kvrpcpb::Context;
/// Scans backwards at read timestamp `REVERSE_SEEK_BOUND` over keys with different
/// version layouts, checking each returned pair and the write-cursor counters
/// accumulated by that step.
#[test]
fn test_basic() {
let engine = TestEngineBuilder::new().build().unwrap();
let ctx = Context::default();
// Key [10]: committed puts at ts 0..REVERSE_SEEK_BOUND / 2, each with value [ts].
let k = &[10_u8];
for ts in 0..REVERSE_SEEK_BOUND / 2 {
must_prewrite_put(&engine, k, &[ts as u8], k, ts);
must_commit(&engine, k, ts, ts);
}
// Key [9]: committed puts at ts 0..=REVERSE_SEEK_BOUND.
let k = &[9_u8];
for ts in 0..=REVERSE_SEEK_BOUND {
must_prewrite_put(&engine, k, &[ts as u8], k, ts);
must_commit(&engine, k, ts, ts);
}
// Key [8]: committed puts for ts < REVERSE_SEEK_BOUND / 2. For every later ts the
// prewrite is not committed; a raw write record `[b'R', ts]` is stored and the lock
// is deleted instead.
// NOTE(review): `[b'R', ts]` is presumably the serialized form of a rollback record
// — confirm against the write-record encoding.
let k = &[8_u8];
for ts in 0..=REVERSE_SEEK_BOUND {
must_prewrite_put(&engine, k, &[ts as u8], k, ts);
if ts < REVERSE_SEEK_BOUND / 2 {
must_commit(&engine, k, ts, ts);
} else {
let modifies = vec![
Modify::Put(
CF_WRITE,
Key::from_raw(k).append_ts(TimeStamp::new(ts)),
vec![b'R', ts as u8],
),
Modify::Delete(CF_LOCK, Key::from_raw(k)),
];
write(&engine, &ctx, modifies);
}
}
// Key [7]: committed puts below REVERSE_SEEK_BOUND / 2, a committed delete at
// REVERSE_SEEK_BOUND / 2, then raw `[b'R', ts]` records up to REVERSE_SEEK_BOUND.
let k = &[7_u8];
for ts in 0..REVERSE_SEEK_BOUND / 2 {
must_prewrite_put(&engine, k, &[ts as u8], k, ts);
must_commit(&engine, k, ts, ts);
}
{
let ts = REVERSE_SEEK_BOUND / 2;
must_prewrite_delete(&engine, k, k, ts);
must_commit(&engine, k, ts, ts);
}
for ts in REVERSE_SEEK_BOUND / 2 + 1..=REVERSE_SEEK_BOUND {
must_prewrite_put(&engine, k, &[ts as u8], k, ts);
let modifies = vec![
Modify::Put(
CF_WRITE,
Key::from_raw(k).append_ts(TimeStamp::new(ts)),
vec![b'R', ts as u8],
),
Modify::Delete(CF_LOCK, Key::from_raw(k)),
];
write(&engine, &ctx, modifies);
}
// Key [6]: a single committed put at ts 0.
let k = &[6_u8];
for ts in 0..1 {
must_prewrite_put(&engine, k, &[ts as u8], k, ts);
must_commit(&engine, k, ts, ts);
}
// Key [5]: only raw `[b'R', ts]` records, at ts 0..=REVERSE_SEEK_BOUND.
let k = &[5_u8];
for ts in 0..=REVERSE_SEEK_BOUND {
must_prewrite_put(&engine, k, &[ts as u8], k, ts);
let modifies = vec![
Modify::Put(
CF_WRITE,
Key::from_raw(k).append_ts(TimeStamp::new(ts)),
vec![b'R', ts as u8],
),
Modify::Delete(CF_LOCK, Key::from_raw(k)),
];
write(&engine, &ctx, modifies);
}
// Key [4]: committed puts at ts REVERSE_SEEK_BOUND and REVERSE_SEEK_BOUND + 1.
let k = &[4_u8];
for ts in REVERSE_SEEK_BOUND..REVERSE_SEEK_BOUND + 2 {
must_prewrite_put(&engine, k, &[ts as u8], k, ts);
must_commit(&engine, k, ts, ts);
}
// Scan with upper bound [11] at ts REVERSE_SEEK_BOUND.
// NOTE(review): the `true` flag presumably selects a backward scan — confirm against
// `ScannerBuilder::new`.
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut scanner = ScannerBuilder::new(snapshot, REVERSE_SEEK_BOUND.into(), true)
.range(None, Some(Key::from_raw(&[11_u8])))
.build()
.unwrap();
// Key [10]: the put with the highest ts, REVERSE_SEEK_BOUND / 2 - 1, is returned.
assert_eq!(
scanner.next().unwrap(),
Some((
Key::from_raw(&[10_u8]),
vec![(REVERSE_SEEK_BOUND / 2 - 1) as u8]
))
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.prev, REVERSE_SEEK_BOUND as usize / 2);
assert_eq!(statistics.write.seek, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.seek_for_prev, 1);
// Key [9]: the put at exactly the read timestamp is returned.
assert_eq!(
scanner.next().unwrap(),
Some((Key::from_raw(&[9_u8]), vec![REVERSE_SEEK_BOUND as u8]))
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.prev, REVERSE_SEEK_BOUND as usize);
assert_eq!(statistics.write.seek, 1);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.seek_for_prev, 0);
// Key [8]: the raw 'R' records are skipped and the last committed put is returned.
assert_eq!(
scanner.next().unwrap(),
Some((
Key::from_raw(&[8_u8]),
vec![(REVERSE_SEEK_BOUND / 2 - 1) as u8]
))
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.prev, REVERSE_SEEK_BOUND as usize + 1);
assert_eq!(statistics.write.seek, 1);
assert_eq!(statistics.write.next, 1);
assert_eq!(statistics.write.seek_for_prev, 0);
// Key [7] yields nothing (it is not returned); the next pair is key [6].
assert_eq!(
scanner.next().unwrap(),
Some((Key::from_raw(&[6_u8]), vec![0_u8]))
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.prev, REVERSE_SEEK_BOUND as usize + 2);
assert_eq!(statistics.write.seek, 1);
assert_eq!(statistics.write.next, 1);
assert_eq!(statistics.write.seek_for_prev, 0);
// Key [5] yields nothing; key [4] returns the put at the read timestamp, not the
// one at REVERSE_SEEK_BOUND + 1.
assert_eq!(
scanner.next().unwrap(),
Some((Key::from_raw(&[4_u8]), vec![REVERSE_SEEK_BOUND as u8]))
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.prev, REVERSE_SEEK_BOUND as usize + 3);
assert_eq!(statistics.write.seek, 1);
assert_eq!(statistics.write.next, 1);
assert_eq!(statistics.write.seek_for_prev, 0);
// End of scan: no further cursor operations are recorded.
assert_eq!(scanner.next().unwrap(), None);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.prev, 0);
assert_eq!(statistics.write.seek, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.seek_for_prev, 0);
}
/// Key "b" holds only raw `[b'R', ts]` write records (fewer than
/// `REVERSE_SEEK_BOUND`), so the scan runs off the start of the data while stepping
/// through them and returns nothing for "b".
#[test]
fn test_reverse_get_out_of_bound_1() {
let engine = TestEngineBuilder::new().build().unwrap();
let ctx = Context::default();
// Key "b": REVERSE_SEEK_BOUND / 2 raw `[b'R', ts]` records, no put.
// NOTE(review): presumably serialized rollback records — confirm.
for ts in 0..REVERSE_SEEK_BOUND / 2 {
let modifies = vec![
Modify::Put(
CF_WRITE,
Key::from_raw(b"b").append_ts(TimeStamp::new(ts)),
vec![b'R', ts as u8],
),
Modify::Delete(CF_LOCK, Key::from_raw(b"b")),
];
write(&engine, &ctx, modifies);
}
// Key "c": one committed put at ts REVERSE_SEEK_BOUND * 2.
must_prewrite_put(&engine, b"c", b"value", b"c", REVERSE_SEEK_BOUND * 2);
must_commit(
&engine,
b"c",
REVERSE_SEEK_BOUND * 2,
REVERSE_SEEK_BOUND * 2,
);
// Unbounded scan at ts REVERSE_SEEK_BOUND * 2.
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut scanner = ScannerBuilder::new(snapshot, (REVERSE_SEEK_BOUND * 2).into(), true)
.range(None, None)
.build()
.unwrap();
// "c" is returned first.
assert_eq!(
scanner.next().unwrap(),
Some((Key::from_raw(b"c"), b"value".to_vec())),
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 1);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, 1);
// "b" produces no pair; the scan ends after REVERSE_SEEK_BOUND / 2 `prev` steps.
assert_eq!(scanner.next().unwrap(), None);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 0);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, (REVERSE_SEEK_BOUND / 2) as usize);
// Calling again after the end performs no cursor operations.
assert_eq!(scanner.next().unwrap(), None);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 0);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, 0);
}
/// Like `test_reverse_get_out_of_bound_1`, but key "b" also has one committed put at
/// ts 0, so its value must still be returned when the cursor runs off the data while
/// stepping through the raw `[b'R', ts]` records.
#[test]
fn test_reverse_get_out_of_bound_2() {
let engine = TestEngineBuilder::new().build().unwrap();
let ctx = Context::default();
// Key "b": a committed put at ts 0 ...
must_prewrite_put(&engine, b"b", b"value_b", b"b", 0);
must_commit(&engine, b"b", 0, 0);
// ... followed by REVERSE_SEEK_BOUND / 2 raw `[b'R', ts]` records.
// NOTE(review): presumably serialized rollback records — confirm.
for ts in 1..=REVERSE_SEEK_BOUND / 2 {
let modifies = vec![
Modify::Put(
CF_WRITE,
Key::from_raw(b"b").append_ts(TimeStamp::new(ts)),
vec![b'R', ts as u8],
),
Modify::Delete(CF_LOCK, Key::from_raw(b"b")),
];
write(&engine, &ctx, modifies);
}
// Key "c": one committed put at ts REVERSE_SEEK_BOUND * 2.
must_prewrite_put(&engine, b"c", b"value_c", b"c", REVERSE_SEEK_BOUND * 2);
must_commit(
&engine,
b"c",
REVERSE_SEEK_BOUND * 2,
REVERSE_SEEK_BOUND * 2,
);
// Unbounded scan at ts REVERSE_SEEK_BOUND * 2.
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut scanner = ScannerBuilder::new(snapshot, (REVERSE_SEEK_BOUND * 2).into(), true)
.range(None, None)
.build()
.unwrap();
// "c" is returned first.
assert_eq!(
scanner.next().unwrap(),
Some((Key::from_raw(b"c"), b"value_c".to_vec())),
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 1);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, 1);
// "b" returns the put from ts 0 using `prev` steps only.
assert_eq!(
scanner.next().unwrap(),
Some((Key::from_raw(b"b"), b"value_b".to_vec())),
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 0);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, (REVERSE_SEEK_BOUND / 2 + 1) as usize);
// End of scan: no cursor operations.
assert_eq!(scanner.next().unwrap(), None);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 0);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, 0);
}
/// Key "b" has `SEEK_BOUND / 2` versions and the read timestamp only sees the first
/// one; leaving "b" must reach the start of the data with `prev` steps alone (no
/// `seek_for_prev`).
#[test]
fn test_move_prev_user_key_out_of_bound_1() {
let engine = TestEngineBuilder::new().build().unwrap();
// Key "c": one committed put at ts 1.
must_prewrite_put(&engine, b"c", b"value", b"c", 1);
must_commit(&engine, b"c", 1, 1);
// Key "b": committed puts at ts 1..=SEEK_BOUND / 2, each with value [ts].
for ts in 1..=SEEK_BOUND / 2 {
must_prewrite_put(&engine, b"b", &[ts as u8], b"b", ts);
must_commit(&engine, b"b", ts, ts);
}
// Unbounded scan at ts 1.
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut scanner = ScannerBuilder::new(snapshot, 1.into(), true)
.range(None, None)
.build()
.unwrap();
// "c" is returned first.
assert_eq!(
scanner.next().unwrap(),
Some((Key::from_raw(b"c"), b"value".to_vec())),
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 1);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, 1);
// "b" returns the version at ts 1; SEEK_BOUND / 2 `prev` steps are recorded.
assert_eq!(
scanner.next().unwrap(),
Some((Key::from_raw(b"b"), vec![1u8].to_vec())),
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 0);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, (SEEK_BOUND / 2) as usize);
// End of scan: no cursor operations.
assert_eq!(scanner.next().unwrap(), None);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 0);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, 0);
}
/// Key "b" has `SEEK_BOUND + 1` versions and the read timestamp only sees the first
/// one; leaving "b" exhausts the `prev` budget and falls back to one
/// `seek_for_prev`.
#[test]
fn test_move_prev_user_key_out_of_bound_2() {
let engine = TestEngineBuilder::new().build().unwrap();
// Key "c": one committed put at ts 1.
must_prewrite_put(&engine, b"c", b"value", b"c", 1);
must_commit(&engine, b"c", 1, 1);
// Key "b": committed puts at ts 1..=SEEK_BOUND + 1, each with value [ts].
for ts in 1..SEEK_BOUND + 2 {
must_prewrite_put(&engine, b"b", &[ts as u8], b"b", ts);
must_commit(&engine, b"b", ts, ts);
}
// Unbounded scan at ts 1.
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut scanner = ScannerBuilder::new(snapshot, 1.into(), true)
.range(None, None)
.build()
.unwrap();
// "c" is returned first.
assert_eq!(
scanner.next().unwrap(),
Some((Key::from_raw(b"c"), b"value".to_vec())),
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 1);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, 1);
// "b" returns the version at ts 1; SEEK_BOUND `prev` steps and one
// `seek_for_prev` are recorded.
assert_eq!(
scanner.next().unwrap(),
Some((Key::from_raw(b"b"), vec![1u8])),
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 0);
assert_eq!(statistics.write.seek_for_prev, 1);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, SEEK_BOUND as usize);
// End of scan: no cursor operations.
assert_eq!(scanner.next().unwrap(), None);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 0);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, 0);
}
/// Key "b" has `SEEK_BOUND + REVERSE_SEEK_BOUND + 1` versions and the read timestamp
/// is `REVERSE_SEEK_BOUND + 1`; both the value lookup and leaving "b" exhaust their
/// `prev` budgets, so one `seek` and one `seek_for_prev` are recorded.
#[test]
fn test_move_prev_user_key_out_of_bound_3() {
let engine = TestEngineBuilder::new().build().unwrap();
// Key "c": one committed put at ts 1.
must_prewrite_put(&engine, b"c", b"value", b"c", 1);
must_commit(&engine, b"c", 1, 1);
// Key "b": committed puts at ts 1..=SEEK_BOUND + REVERSE_SEEK_BOUND + 1, value [ts].
for ts in 1..SEEK_BOUND + REVERSE_SEEK_BOUND + 2 {
must_prewrite_put(&engine, b"b", &[ts as u8], b"b", ts);
must_commit(&engine, b"b", ts, ts);
}
// Unbounded scan at ts REVERSE_SEEK_BOUND + 1.
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut scanner = ScannerBuilder::new(snapshot, (REVERSE_SEEK_BOUND + 1).into(), true)
.range(None, None)
.build()
.unwrap();
// "c" is returned first.
assert_eq!(
scanner.next().unwrap(),
Some((Key::from_raw(b"c"), b"value".to_vec())),
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 1);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, 1);
// "b" returns the version committed at the read timestamp.
assert_eq!(
scanner.next().unwrap(),
Some((Key::from_raw(b"b"), vec![(REVERSE_SEEK_BOUND + 1) as u8])),
);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 1);
assert_eq!(statistics.write.seek_for_prev, 1);
assert_eq!(statistics.write.next, 0);
assert_eq!(
statistics.write.prev,
(REVERSE_SEEK_BOUND - 1 + SEEK_BOUND - 1) as usize
);
// End of scan: no cursor operations.
assert_eq!(scanner.next().unwrap(), None);
let statistics = scanner.take_statistics();
assert_eq!(statistics.write.seek, 0);
assert_eq!(statistics.write.seek_for_prev, 0);
assert_eq!(statistics.write.next, 0);
assert_eq!(statistics.write.prev, 0);
}
/// Checks which keys a backward scan returns for bounded and unbounded ranges.
///
/// Every key `[i]` for `i` in 1..=6 has three committed versions: an empty
/// value at ts 1, value `[i]` at ts 7 and an empty value at ts 14. All scans
/// read at ts 10 and therefore expect the value `[i]`.
#[test]
fn test_range() {
    // Asserts that the scanner yields, in this order, each listed single-byte
    // key paired with a value equal to the key, and then reports end of scan.
    macro_rules! assert_scans {
        ($scanner:ident => [$($byte:expr),*]) => {{
            $(
                assert_eq!(
                    $scanner.next().unwrap(),
                    Some((Key::from_raw(&[$byte]), vec![$byte]))
                );
            )*
            assert_eq!($scanner.next().unwrap(), None);
        }};
    }

    let engine = TestEngineBuilder::new().build().unwrap();
    for i in 1u8..=6 {
        let key = [i];
        must_prewrite_put(&engine, &key, &[], &key, 1);
        must_commit(&engine, &key, 1, 1);
        must_prewrite_put(&engine, &key, &key, &key, 7);
        must_commit(&engine, &key, 7, 7);
        must_prewrite_put(&engine, &key, &[], &key, 14);
        must_commit(&engine, &key, 14, 14);
    }
    let snapshot = engine.snapshot(Default::default()).unwrap();

    // Both bounds set: [3] is returned, [5] is not.
    let mut scanner = ScannerBuilder::new(snapshot.clone(), 10.into(), true)
        .range(Some(Key::from_raw(&[3u8])), Some(Key::from_raw(&[5u8])))
        .build()
        .unwrap();
    assert_scans!(scanner => [4u8, 3u8]);

    // Only the upper bound set.
    let mut scanner = ScannerBuilder::new(snapshot.clone(), 10.into(), true)
        .range(None, Some(Key::from_raw(&[3u8])))
        .build()
        .unwrap();
    assert_scans!(scanner => [2u8, 1u8]);

    // Only the lower bound set.
    let mut scanner = ScannerBuilder::new(snapshot.clone(), 10.into(), true)
        .range(Some(Key::from_raw(&[5u8])), None)
        .build()
        .unwrap();
    assert_scans!(scanner => [6u8, 5u8]);

    // No bounds: every key, largest first.
    let mut scanner = ScannerBuilder::new(snapshot, 10.into(), true)
        .range(None, None)
        .build()
        .unwrap();
    assert_scans!(scanner => [6u8, 5u8, 4u8, 3u8, 2u8, 1u8]);
}
/// Scans backwards over a key space where most write records have been
/// garbage-collected and single-byte keys carry uncommitted prewrites, and checks how
/// many `prev` steps the lock and write cursors record.
#[test]
fn test_many_tombstones() {
let engine = TestEngineBuilder::new().build().unwrap();
let start_ts = 1;
let safe_point = 2;
// 16 x 16 two-byte keys: each is prewritten at ts 1 and rolled back; all except
// [0, 0] and [15, 15] are then passed to `must_gc` with safe point 2.
for i in 0..16 {
for y in 0..16 {
let pk = &[i as u8, y as u8];
must_prewrite_put(&engine, pk, b"", pk, start_ts);
must_rollback(&engine, pk, start_ts, false);
if !((i == 0 && y == 0) || (i == 15 && y == 15)) {
must_gc(&engine, pk, safe_point);
}
}
}
// 16 single-byte keys prewritten at ts 3; no commit or rollback follows.
let start_ts = 3;
for i in 0..16 {
let pk = &[i as u8];
must_prewrite_put(&engine, pk, b"", pk, start_ts);
}
// Scan with upper bound [15] at ts 2: nothing is returned.
let snapshot = engine.snapshot(Default::default()).unwrap();
let row = &[15_u8];
let k = Key::from_raw(row);
let ts = 2.into();
let mut scanner = ScannerBuilder::new(snapshot, ts, true)
.range(None, Some(k))
.build()
.unwrap();
assert_eq!(scanner.next().unwrap(), None);
let statistics = scanner.take_statistics();
assert_eq!(statistics.lock.prev, 15);
assert_eq!(statistics.write.prev, 1);
}
/// Runs an unbounded backward scan over the shared GC-fence fixture and
/// compares it with the fixture's expected pairs, reversed and with the
/// entries that have no value dropped.
#[test]
fn test_backward_scanner_check_gc_fence() {
    let engine = TestEngineBuilder::new().build().unwrap();
    let (read_ts, fixture_expectation) = prepare_test_data_for_check_gc_fence(&engine);

    // The fixture lists entries in the opposite order to a backward scan;
    // entries whose value is `None` are not expected to be returned.
    let mut expected = Vec::new();
    for (key, value) in fixture_expectation.into_iter().rev() {
        if let Some(v) = value {
            expected.push((key, v));
        }
    }

    let snapshot = engine.snapshot(Default::default()).unwrap();
    let mut scanner = ScannerBuilder::new(snapshot, read_ts, true)
        .range(None, None)
        .build()
        .unwrap();

    let mut actual = Vec::new();
    for entry in scanner.scan(100, 0).unwrap() {
        actual.push(entry.unwrap());
    }

    assert_eq!(actual, expected);
}
}