use kvproto::kvrpcpb::IsolationLevel;
use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE};
use std::borrow::Cow;
use txn_types::{Key, Lock, TimeStamp, TsSet, Value, WriteRef, WriteType};
use crate::storage::kv::{Cursor, CursorBuilder, ScanMode, Snapshot, Statistics};
use crate::storage::mvcc::{default_not_found_error, NewerTsCheckState, Result};
pub struct PointGetterBuilder<S: Snapshot> {
snapshot: S,
multi: bool,
fill_cache: bool,
omit_value: bool,
isolation_level: IsolationLevel,
ts: TimeStamp,
bypass_locks: TsSet,
check_has_newer_ts_data: bool,
}
impl<S: Snapshot> PointGetterBuilder<S> {
pub fn new(snapshot: S, ts: TimeStamp) -> Self {
Self {
snapshot,
multi: true,
fill_cache: true,
omit_value: false,
isolation_level: IsolationLevel::Si,
ts,
bypass_locks: Default::default(),
check_has_newer_ts_data: false,
}
}
#[inline]
pub fn multi(mut self, multi: bool) -> Self {
self.multi = multi;
self
}
#[inline]
pub fn fill_cache(mut self, fill_cache: bool) -> Self {
self.fill_cache = fill_cache;
self
}
#[inline]
pub fn omit_value(mut self, omit_value: bool) -> Self {
self.omit_value = omit_value;
self
}
#[inline]
pub fn isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
self.isolation_level = isolation_level;
self
}
#[inline]
pub fn bypass_locks(mut self, locks: TsSet) -> Self {
self.bypass_locks = locks;
self
}
#[inline]
pub fn check_has_newer_ts_data(mut self, enabled: bool) -> Self {
self.check_has_newer_ts_data = enabled;
self
}
pub fn build(self) -> Result<PointGetter<S>> {
let write_cursor = CursorBuilder::new(&self.snapshot, CF_WRITE)
.fill_cache(self.fill_cache)
.prefix_seek(true)
.scan_mode(if self.multi {
ScanMode::Mixed
} else {
ScanMode::Forward
})
.build()?;
Ok(PointGetter {
snapshot: self.snapshot,
multi: self.multi,
omit_value: self.omit_value,
isolation_level: self.isolation_level,
ts: self.ts,
bypass_locks: self.bypass_locks,
met_newer_ts_data: if self.check_has_newer_ts_data {
NewerTsCheckState::NotMetYet
} else {
NewerTsCheckState::Unknown
},
statistics: Statistics::default(),
write_cursor,
drained: false,
})
}
}
pub struct PointGetter<S: Snapshot> {
snapshot: S,
multi: bool,
omit_value: bool,
isolation_level: IsolationLevel,
ts: TimeStamp,
bypass_locks: TsSet,
met_newer_ts_data: NewerTsCheckState,
statistics: Statistics,
write_cursor: Cursor<S::Iter>,
drained: bool,
}
impl<S: Snapshot> PointGetter<S> {
#[inline]
pub fn take_statistics(&mut self) -> Statistics {
std::mem::take(&mut self.statistics)
}
#[inline]
pub fn met_newer_ts_data(&self) -> NewerTsCheckState {
self.met_newer_ts_data
}
pub fn get(&mut self, user_key: &Key) -> Result<Option<Value>> {
if !self.multi {
if self.drained {
return Ok(None);
} else {
self.drained = true;
}
}
match self.isolation_level {
IsolationLevel::Si => {
self.load_and_check_lock(user_key)?;
}
IsolationLevel::Rc => {}
}
self.load_data(user_key)
}
fn load_and_check_lock(&mut self, user_key: &Key) -> Result<()> {
self.statistics.lock.get += 1;
let lock_value = self.snapshot.get_cf(CF_LOCK, user_key)?;
if let Some(ref lock_value) = lock_value {
let lock = Lock::parse(lock_value)?;
if self.met_newer_ts_data == NewerTsCheckState::NotMetYet {
self.met_newer_ts_data = NewerTsCheckState::Met;
}
if let Err(e) =
Lock::check_ts_conflict(Cow::Owned(lock), user_key, self.ts, &self.bypass_locks)
{
self.statistics.lock.processed_keys += 1;
Err(e.into())
} else {
Ok(())
}
} else {
Ok(())
}
}
fn load_data(&mut self, user_key: &Key) -> Result<Option<Value>> {
let mut use_near_seek = false;
let mut seek_key = user_key.clone();
if self.met_newer_ts_data == NewerTsCheckState::NotMetYet {
seek_key = seek_key.append_ts(TimeStamp::max());
if !self
.write_cursor
.seek(&seek_key, &mut self.statistics.write)?
{
return Ok(None);
}
seek_key = seek_key.truncate_ts()?;
use_near_seek = true;
let cursor_key = self.write_cursor.key(&mut self.statistics.write);
if !Key::is_user_key_eq(cursor_key, user_key.as_encoded().as_slice()) {
return Ok(None);
}
if Key::decode_ts_from(cursor_key)? > self.ts {
self.met_newer_ts_data = NewerTsCheckState::Met;
}
}
seek_key = seek_key.append_ts(self.ts);
let data_found = if use_near_seek {
if self.write_cursor.key(&mut self.statistics.write) >= seek_key.as_encoded().as_slice()
{
true
} else {
self.write_cursor
.near_seek(&seek_key, &mut self.statistics.write)?
}
} else {
self.write_cursor
.seek(&seek_key, &mut self.statistics.write)?
};
if !data_found {
return Ok(None);
}
loop {
{
let cursor_key = self.write_cursor.key(&mut self.statistics.write);
if !Key::is_user_key_eq(cursor_key, user_key.as_encoded().as_slice()) {
return Ok(None);
}
}
let write = WriteRef::parse(self.write_cursor.value(&mut self.statistics.write))?;
if !write.check_gc_fence_as_latest_version(self.ts) {
return Ok(None);
}
match write.write_type {
WriteType::Put => {
self.statistics.write.processed_keys += 1;
if self.omit_value {
return Ok(Some(vec![]));
}
match write.short_value {
Some(value) => {
return Ok(Some(value.to_vec()));
}
None => {
let start_ts = write.start_ts;
return Ok(Some(self.load_data_from_default_cf(start_ts, user_key)?));
}
}
}
WriteType::Delete => {
return Ok(None);
}
WriteType::Lock | WriteType::Rollback => {
}
}
if !self.write_cursor.next(&mut self.statistics.write) {
return Ok(None);
}
}
}
fn load_data_from_default_cf(
&mut self,
write_start_ts: TimeStamp,
user_key: &Key,
) -> Result<Value> {
self.statistics.data.get += 1;
let value = self
.snapshot
.get_cf(CF_DEFAULT, &user_key.clone().append_ts(write_start_ts))?;
if let Some(value) = value {
self.statistics.data.processed_keys += 1;
Ok(value)
} else {
Err(default_not_found_error(
user_key.to_raw()?,
"load_data_from_default_cf",
))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use txn_types::SHORT_VALUE_MAX_LEN;
use crate::storage::kv::{
CfStatistics, Engine, PerfStatisticsInstant, RocksEngine, TestEngineBuilder,
};
use crate::storage::txn::tests::{
must_acquire_pessimistic_lock, must_cleanup_with_gc_fence, must_commit, must_gc,
must_pessimistic_prewrite_delete, must_prewrite_delete, must_prewrite_lock,
must_prewrite_put, must_rollback,
};
fn new_multi_point_getter<E: Engine>(engine: &E, ts: TimeStamp) -> PointGetter<E::Snap> {
let snapshot = engine.snapshot(Default::default()).unwrap();
PointGetterBuilder::new(snapshot, ts)
.isolation_level(IsolationLevel::Si)
.build()
.unwrap()
}
fn new_single_point_getter<E: Engine>(engine: &E, ts: TimeStamp) -> PointGetter<E::Snap> {
let snapshot = engine.snapshot(Default::default()).unwrap();
PointGetterBuilder::new(snapshot, ts)
.isolation_level(IsolationLevel::Si)
.multi(false)
.build()
.unwrap()
}
fn must_get_key<S: Snapshot>(point_getter: &mut PointGetter<S>, key: &[u8]) {
assert!(point_getter.get(&Key::from_raw(key)).unwrap().is_some());
}
fn must_get_value<S: Snapshot>(point_getter: &mut PointGetter<S>, key: &[u8], prefix: &[u8]) {
let val = point_getter.get(&Key::from_raw(key)).unwrap().unwrap();
assert!(val.starts_with(prefix));
}
fn must_met_newer_ts_data<E: Engine>(
engine: &E,
getter_ts: impl Into<TimeStamp>,
key: &[u8],
value: &[u8],
expected_met_newer_ts_data: bool,
) {
let snapshot = engine.snapshot(Default::default()).unwrap();
let ts = getter_ts.into();
let mut point_getter = PointGetterBuilder::new(snapshot.clone(), ts)
.isolation_level(IsolationLevel::Si)
.check_has_newer_ts_data(true)
.build()
.unwrap();
let val = point_getter.get(&Key::from_raw(key)).unwrap().unwrap();
assert_eq!(val, value);
let expected = if expected_met_newer_ts_data {
NewerTsCheckState::Met
} else {
NewerTsCheckState::NotMetYet
};
assert_eq!(expected, point_getter.met_newer_ts_data());
let mut point_getter = PointGetterBuilder::new(snapshot, ts)
.isolation_level(IsolationLevel::Si)
.check_has_newer_ts_data(false)
.build()
.unwrap();
let val = point_getter.get(&Key::from_raw(key)).unwrap().unwrap();
assert_eq!(val, value);
assert_eq!(NewerTsCheckState::Unknown, point_getter.met_newer_ts_data());
}
fn must_get_none<S: Snapshot>(point_getter: &mut PointGetter<S>, key: &[u8]) {
assert!(point_getter.get(&Key::from_raw(key)).unwrap().is_none());
}
fn must_get_err<S: Snapshot>(point_getter: &mut PointGetter<S>, key: &[u8]) {
assert!(point_getter.get(&Key::from_raw(key)).is_err());
}
fn assert_seek_next_prev(stat: &CfStatistics, seek: usize, next: usize, prev: usize) {
assert_eq!(
stat.seek, seek,
"expect seek to be {}, got {}",
seek, stat.seek
);
assert_eq!(
stat.next, next,
"expect next to be {}, got {}",
next, stat.next
);
assert_eq!(
stat.prev, prev,
"expect prev to be {}, got {}",
prev, stat.prev
);
}
fn new_sample_engine() -> RocksEngine {
let suffix = "v".repeat(SHORT_VALUE_MAX_LEN + 1);
let engine = TestEngineBuilder::new().build().unwrap();
must_prewrite_put(
&engine,
b"foo1",
&format!("foo1{}", suffix).into_bytes(),
b"foo1",
2,
);
must_commit(&engine, b"foo1", 2, 3);
must_prewrite_put(
&engine,
b"foo2",
&format!("foo2{}", suffix).into_bytes(),
b"foo2",
4,
);
must_prewrite_put(
&engine,
b"bar",
&format!("bar{}", suffix).into_bytes(),
b"foo2",
4,
);
must_commit(&engine, b"foo2", 4, 5);
must_commit(&engine, b"bar", 4, 5);
must_prewrite_delete(&engine, b"xxx", b"xxx", 6);
must_commit(&engine, b"xxx", 6, 7);
must_prewrite_put(
&engine,
b"box",
&format!("box{}", suffix).into_bytes(),
b"box",
8,
);
must_prewrite_delete(&engine, b"foo1", b"box", 8);
must_commit(&engine, b"box", 8, 9);
must_commit(&engine, b"foo1", 8, 9);
must_prewrite_lock(&engine, b"bar", b"bar", 10);
must_commit(&engine, b"bar", 10, 11);
for i in 20..100 {
if i % 2 == 0 {
must_prewrite_lock(&engine, b"foo2", b"foo2", i);
must_commit(&engine, b"foo2", i, i + 1);
}
}
must_prewrite_put(
&engine,
b"zz",
&format!("zz{}", suffix).into_bytes(),
b"zz",
102,
);
must_commit(&engine, b"zz", 102, 103);
engine
}
fn new_sample_engine_2() -> RocksEngine {
let suffix = "v".repeat(SHORT_VALUE_MAX_LEN + 1);
let engine = TestEngineBuilder::new().build().unwrap();
must_prewrite_put(
&engine,
b"foo1",
&format!("foo1{}", suffix).into_bytes(),
b"foo1",
2,
);
must_prewrite_put(&engine, b"bar", b"barval", b"foo1", 2);
must_commit(&engine, b"foo1", 2, 3);
must_commit(&engine, b"bar", 2, 3);
must_prewrite_put(
&engine,
b"foo2",
&format!("foo2{}", suffix).into_bytes(),
b"foo2",
4,
);
must_prewrite_delete(&engine, b"bar", b"foo2", 4);
engine
}
#[test]
fn test_multi_basic_1() {
let engine = new_sample_engine();
let mut getter = new_multi_point_getter(&engine, 200.into());
must_get_none(&mut getter, b"foo1");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_none(&mut getter, b"foo1");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_value(&mut getter, b"foo2", b"foo2v");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 40, 0);
must_get_value(&mut getter, b"foo2", b"foo2v");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 40, 0);
must_get_none(&mut getter, b"foo1");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_none(&mut getter, b"z");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_value(&mut getter, b"zz", b"zzv");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_value(&mut getter, b"zz", b"zzv");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
}
#[test]
fn test_multi_tombstone() {
let engine = TestEngineBuilder::new().build().unwrap();
must_prewrite_put(&engine, b"foo", b"bar", b"foo", 10);
must_prewrite_put(&engine, b"foo1", b"bar1", b"foo", 10);
must_prewrite_put(&engine, b"foo2", b"bar2", b"foo", 10);
must_prewrite_put(&engine, b"foo3", b"bar3", b"foo", 10);
must_commit(&engine, b"foo", 10, 20);
must_commit(&engine, b"foo1", 10, 20);
must_commit(&engine, b"foo2", 10, 20);
must_commit(&engine, b"foo3", 10, 20);
must_prewrite_delete(&engine, b"foo1", b"foo1", 30);
must_prewrite_delete(&engine, b"foo2", b"foo1", 30);
must_commit(&engine, b"foo1", 30, 40);
must_commit(&engine, b"foo2", 30, 40);
must_gc(&engine, b"foo", 50);
must_gc(&engine, b"foo1", 50);
must_gc(&engine, b"foo2", 50);
must_gc(&engine, b"foo3", 50);
let mut getter = new_multi_point_getter(&engine, TimeStamp::max());
let perf_statistics = PerfStatisticsInstant::new();
must_get_value(&mut getter, b"foo", b"bar");
assert_eq!(perf_statistics.delta().0.internal_delete_skipped_count, 0);
let perf_statistics = PerfStatisticsInstant::new();
must_get_none(&mut getter, b"foo1");
assert_eq!(perf_statistics.delta().0.internal_delete_skipped_count, 2);
let perf_statistics = PerfStatisticsInstant::new();
must_get_none(&mut getter, b"foo2");
assert_eq!(perf_statistics.delta().0.internal_delete_skipped_count, 2);
let perf_statistics = PerfStatisticsInstant::new();
must_get_value(&mut getter, b"foo3", b"bar3");
assert_eq!(perf_statistics.delta().0.internal_delete_skipped_count, 0);
}
#[test]
fn test_multi_with_iter_lower_bound() {
let engine = TestEngineBuilder::new().build().unwrap();
must_prewrite_put(&engine, b"foo", b"bar", b"foo", 10);
must_commit(&engine, b"foo", 10, 20);
let snapshot = engine.snapshot(Default::default()).unwrap();
let write_cursor = CursorBuilder::new(&snapshot, CF_WRITE)
.prefix_seek(true)
.scan_mode(ScanMode::Mixed)
.range(Some(Key::from_raw(b"a")), None)
.build()
.unwrap();
let mut getter = PointGetter {
snapshot,
multi: true,
omit_value: false,
isolation_level: IsolationLevel::Si,
ts: TimeStamp::new(30),
bypass_locks: Default::default(),
met_newer_ts_data: NewerTsCheckState::NotMetYet,
statistics: Statistics::default(),
write_cursor,
drained: false,
};
must_get_value(&mut getter, b"foo", b"bar");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
}
#[test]
fn test_multi_basic_2() {
let engine = new_sample_engine();
let mut getter = new_multi_point_getter(&engine, 5.into());
must_get_value(&mut getter, b"bar", b"barv");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_value(&mut getter, b"bar", b"barv");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_none(&mut getter, b"bo");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_none(&mut getter, b"box");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_value(&mut getter, b"foo1", b"foo1");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_none(&mut getter, b"zz");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_value(&mut getter, b"foo1", b"foo1");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_value(&mut getter, b"bar", b"barv");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
}
#[test]
fn test_multi_basic_3() {
let engine = new_sample_engine();
let mut getter = new_multi_point_getter(&engine, 2.into());
must_get_none(&mut getter, b"foo1");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_none(&mut getter, b"non_exist");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 1, 0, 0);
must_get_none(&mut getter, b"foo1");
must_get_none(&mut getter, b"foo0");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 2, 0, 0);
}
#[test]
fn test_multi_locked() {
let engine = new_sample_engine_2();
let mut getter = new_multi_point_getter(&engine, 1.into());
must_get_none(&mut getter, b"a");
must_get_none(&mut getter, b"bar");
must_get_none(&mut getter, b"foo1");
must_get_none(&mut getter, b"foo2");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 4, 0, 0);
let mut getter = new_multi_point_getter(&engine, 3.into());
must_get_none(&mut getter, b"a");
must_get_value(&mut getter, b"bar", b"barv");
must_get_value(&mut getter, b"bar", b"barv");
must_get_value(&mut getter, b"foo1", b"foo1v");
must_get_value(&mut getter, b"foo1", b"foo1v");
must_get_none(&mut getter, b"foo2");
must_get_none(&mut getter, b"foo2");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 7, 0, 0);
let mut getter = new_multi_point_getter(&engine, 4.into());
must_get_none(&mut getter, b"a");
must_get_err(&mut getter, b"bar");
must_get_err(&mut getter, b"bar");
must_get_value(&mut getter, b"foo1", b"foo1v");
must_get_err(&mut getter, b"foo2");
must_get_none(&mut getter, b"zz");
let s = getter.take_statistics();
assert_seek_next_prev(&s.write, 3, 0, 0);
}
#[test]
fn test_single_basic() {
let engine = new_sample_engine_2();
let mut getter = new_single_point_getter(&engine, 1.into());
must_get_none(&mut getter, b"foo1");
let mut getter = new_single_point_getter(&engine, 3.into());
must_get_value(&mut getter, b"bar", b"barv");
must_get_none(&mut getter, b"bar");
must_get_none(&mut getter, b"foo1");
let mut getter = new_single_point_getter(&engine, 3.into());
must_get_value(&mut getter, b"foo1", b"foo1v");
must_get_none(&mut getter, b"foo2");
let mut getter = new_single_point_getter(&engine, 3.into());
must_get_none(&mut getter, b"foo2");
must_get_none(&mut getter, b"foo2");
let mut getter = new_single_point_getter(&engine, 4.into());
must_get_err(&mut getter, b"bar");
must_get_none(&mut getter, b"bar");
must_get_none(&mut getter, b"a");
must_get_none(&mut getter, b"foo1");
let mut getter = new_single_point_getter(&engine, 4.into());
must_get_value(&mut getter, b"foo1", b"foo1v");
must_get_none(&mut getter, b"foo1");
}
#[test]
fn test_omit_value() {
let engine = new_sample_engine_2();
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut getter = PointGetterBuilder::new(snapshot.clone(), 4.into())
.isolation_level(IsolationLevel::Si)
.omit_value(true)
.build()
.unwrap();
must_get_err(&mut getter, b"bar");
must_get_key(&mut getter, b"foo1");
must_get_err(&mut getter, b"foo2");
must_get_none(&mut getter, b"foo3");
fn new_omit_value_single_point_getter<S>(snapshot: S, ts: TimeStamp) -> PointGetter<S>
where
S: Snapshot,
{
PointGetterBuilder::new(snapshot, ts)
.isolation_level(IsolationLevel::Si)
.omit_value(true)
.multi(false)
.build()
.unwrap()
}
let mut getter = new_omit_value_single_point_getter(snapshot.clone(), 4.into());
must_get_err(&mut getter, b"bar");
must_get_none(&mut getter, b"bar");
let mut getter = new_omit_value_single_point_getter(snapshot.clone(), 4.into());
must_get_key(&mut getter, b"foo1");
must_get_none(&mut getter, b"foo1");
let mut getter = new_omit_value_single_point_getter(snapshot, 4.into());
must_get_none(&mut getter, b"foo3");
must_get_none(&mut getter, b"foo3");
}
#[test]
fn test_get_latest_value() {
let engine = TestEngineBuilder::new().build().unwrap();
let (key, val) = (b"foo", b"bar");
must_prewrite_put(&engine, key, val, key, 10);
must_commit(&engine, key, 10, 20);
let mut getter = new_single_point_getter(&engine, TimeStamp::max());
must_get_value(&mut getter, key, val);
must_prewrite_delete(&engine, key, key, 30);
let mut getter = new_single_point_getter(&engine, TimeStamp::max());
must_get_value(&mut getter, key, val);
must_rollback(&engine, key, 30, false);
must_prewrite_delete(&engine, key, b"bar", 40);
let mut getter = new_single_point_getter(&engine, TimeStamp::max());
must_get_err(&mut getter, key);
must_rollback(&engine, key, 40, false);
must_acquire_pessimistic_lock(&engine, key, key, 15, 50);
must_pessimistic_prewrite_delete(&engine, key, key, 15, 50, true);
let mut getter = new_single_point_getter(&engine, TimeStamp::max());
must_get_value(&mut getter, key, val);
}
#[test]
fn test_get_bypass_locks() {
let engine = TestEngineBuilder::new().build().unwrap();
let (key, val) = (b"foo", b"bar");
must_prewrite_put(&engine, key, val, key, 10);
must_commit(&engine, key, 10, 20);
must_prewrite_delete(&engine, key, key, 30);
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut getter = PointGetterBuilder::new(snapshot, 60.into())
.isolation_level(IsolationLevel::Si)
.bypass_locks(TsSet::from_u64s(vec![30, 40, 50]))
.build()
.unwrap();
must_get_value(&mut getter, key, val);
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut getter = PointGetterBuilder::new(snapshot, 60.into())
.isolation_level(IsolationLevel::Si)
.bypass_locks(TsSet::from_u64s(vec![31, 29]))
.build()
.unwrap();
must_get_err(&mut getter, key);
}
#[test]
fn test_met_newer_ts_data() {
let engine = TestEngineBuilder::new().build().unwrap();
let (key, val1) = (b"foo", b"bar1");
must_prewrite_put(&engine, key, val1, key, 10);
must_commit(&engine, key, 10, 20);
let (key, val2) = (b"foo", b"bar2");
must_prewrite_put(&engine, key, val2, key, 30);
must_commit(&engine, key, 30, 40);
must_met_newer_ts_data(&engine, 20, key, val1, true);
must_met_newer_ts_data(&engine, 30, key, val1, true);
must_met_newer_ts_data(&engine, 40, key, val2, false);
must_met_newer_ts_data(&engine, 50, key, val2, false);
must_prewrite_lock(&engine, key, key, 60);
must_met_newer_ts_data(&engine, 50, key, val2, true);
must_met_newer_ts_data(&engine, 60, key, val2, true);
}
#[test]
fn test_point_get_check_gc_fence() {
let engine = TestEngineBuilder::new().build().unwrap();
must_prewrite_put(&engine, b"k1", b"v1", b"k1", 10);
must_commit(&engine, b"k1", 10, 20);
must_cleanup_with_gc_fence(&engine, b"k1", 20, 0, 50, true);
must_prewrite_put(&engine, b"k2", b"v2", b"k2", 11);
must_commit(&engine, b"k2", 11, 20);
must_cleanup_with_gc_fence(&engine, b"k2", 20, 0, 40, true);
must_prewrite_put(&engine, b"k3", b"v3", b"k3", 12);
must_commit(&engine, b"k3", 12, 20);
must_cleanup_with_gc_fence(&engine, b"k3", 20, 0, 30, true);
must_prewrite_put(&engine, b"k4", b"v4", b"k4", 13);
must_commit(&engine, b"k4", 13, 14);
must_prewrite_put(&engine, b"k4", b"v4x", b"k4", 15);
must_commit(&engine, b"k4", 15, 20);
must_cleanup_with_gc_fence(&engine, b"k4", 14, 0, 20, false);
must_cleanup_with_gc_fence(&engine, b"k4", 20, 0, 30, true);
must_prewrite_put(&engine, b"k5", b"v5", b"k5", 13);
must_commit(&engine, b"k5", 13, 14);
must_prewrite_delete(&engine, b"k5", b"v5", 15);
must_commit(&engine, b"k5", 15, 20);
must_cleanup_with_gc_fence(&engine, b"k5", 14, 0, 20, false);
must_cleanup_with_gc_fence(&engine, b"k5", 20, 0, 30, true);
must_prewrite_put(&engine, b"k6", b"v6", b"k6", 16);
must_commit(&engine, b"k6", 16, 20);
must_prewrite_lock(&engine, b"k6", b"k6", 25);
must_commit(&engine, b"k6", 25, 26);
must_prewrite_lock(&engine, b"k6", b"k6", 28);
must_commit(&engine, b"k6", 28, 29);
must_cleanup_with_gc_fence(&engine, b"k6", 20, 0, 50, true);
must_prewrite_put(&engine, b"k7", b"v7", b"k7", 16);
must_commit(&engine, b"k7", 16, 20);
must_prewrite_lock(&engine, b"k7", b"k7", 25);
must_commit(&engine, b"k7", 25, 26);
must_cleanup_with_gc_fence(&engine, b"k7", 20, 0, 27, true);
must_prewrite_lock(&engine, b"k7", b"k7", 28);
must_commit(&engine, b"k7", 28, 29);
must_prewrite_put(&engine, b"k8", b"v8", b"k8", 17);
must_commit(&engine, b"k8", 17, 30);
must_cleanup_with_gc_fence(&engine, b"k8", 30, 0, 0, true);
must_prewrite_put(&engine, b"k9", b"v9", b"k9", 18);
must_commit(&engine, b"k9", 18, 20);
must_prewrite_lock(&engine, b"k9", b"k9", 25);
must_commit(&engine, b"k9", 25, 26);
must_cleanup_with_gc_fence(&engine, b"k9", 20, 0, 27, true);
let expected_results = vec![
(b"k1", Some(b"v1")),
(b"k2", None),
(b"k3", None),
(b"k4", None),
(b"k5", None),
(b"k6", Some(b"v6")),
(b"k7", None),
(b"k8", Some(b"v8")),
(b"k9", None),
];
for (k, v) in &expected_results {
let mut single_getter = new_single_point_getter(&engine, 40.into());
let value = single_getter.get(&Key::from_raw(*k)).unwrap();
assert_eq!(value, v.map(|v| v.to_vec()));
}
let mut multi_getter = new_multi_point_getter(&engine, 40.into());
for (k, v) in &expected_results {
let value = multi_getter.get(&Key::from_raw(*k)).unwrap();
assert_eq!(value, v.map(|v| v.to_vec()));
}
}
}