use crate::storage::mvcc::{
metrics::{MVCC_CONFLICT_COUNTER, MVCC_DUPLICATE_CMD_COUNTER_VEC},
ErrorInner, MvccTxn, Result as MvccResult, SnapshotReader,
};
use crate::storage::txn::actions::check_data_constraint::check_data_constraint;
use crate::storage::Snapshot;
use txn_types::{Key, Lock, LockType, OldValue, TimeStamp, Value, Write, WriteType};
pub fn acquire_pessimistic_lock<S: Snapshot>(
txn: &mut MvccTxn,
reader: &mut SnapshotReader<S>,
key: Key,
primary: &[u8],
should_not_exist: bool,
lock_ttl: u64,
for_update_ts: TimeStamp,
need_value: bool,
min_commit_ts: TimeStamp,
need_old_value: bool,
) -> MvccResult<(Option<Value>, OldValue)> {
fail_point!("acquire_pessimistic_lock", |err| Err(
crate::storage::mvcc::txn::make_txn_error(err, &key, reader.start_ts).into()
));
fn pessimistic_lock(
primary: &[u8],
start_ts: TimeStamp,
lock_ttl: u64,
for_update_ts: TimeStamp,
min_commit_ts: TimeStamp,
) -> Lock {
Lock::new(
LockType::Pessimistic,
primary.to_vec(),
start_ts,
lock_ttl,
None,
for_update_ts,
0,
min_commit_ts,
)
}
fn load_old_value<S: Snapshot>(
need_old_value: bool,
need_value: bool,
val: Option<&Value>,
reader: &mut SnapshotReader<S>,
key: &Key,
for_update_ts: TimeStamp,
prev_write_loaded: bool,
prev_write: Option<Write>,
) -> MvccResult<OldValue> {
if !need_old_value {
return Ok(OldValue::Unspecified);
}
if need_value {
Ok(match val {
Some(val) => OldValue::Value { value: val.clone() },
None => OldValue::None,
})
} else {
reader.get_old_value(&key, for_update_ts, prev_write_loaded, prev_write)
}
}
let mut val = None;
if let Some(lock) = reader.load_lock(&key)? {
if lock.ts != reader.start_ts {
return Err(ErrorInner::KeyIsLocked(lock.into_lock_info(key.into_raw()?)).into());
}
if lock.lock_type != LockType::Pessimistic {
return Err(ErrorInner::LockTypeNotMatch {
start_ts: reader.start_ts,
key: key.into_raw()?,
pessimistic: false,
}
.into());
}
if need_value {
val = reader.get(&key, for_update_ts)?;
}
let (prev_write_loaded, prev_write) = (false, None);
let old_value = load_old_value(
need_old_value,
need_value,
val.as_ref(),
reader,
&key,
for_update_ts,
prev_write_loaded,
prev_write,
)?;
if for_update_ts > lock.for_update_ts {
let lock = pessimistic_lock(
primary,
reader.start_ts,
lock_ttl,
for_update_ts,
min_commit_ts,
);
txn.put_lock(key, &lock);
} else {
MVCC_DUPLICATE_CMD_COUNTER_VEC
.acquire_pessimistic_lock
.inc();
}
return Ok((val, old_value));
}
let (prev_write_loaded, mut prev_write) = (true, None);
if let Some((commit_ts, write)) = reader.seek_write(&key, TimeStamp::max())? {
if need_old_value {
prev_write = Some(write.clone());
}
if commit_ts > for_update_ts {
MVCC_CONFLICT_COUNTER
.acquire_pessimistic_lock_conflict
.inc();
return Err(ErrorInner::WriteConflict {
start_ts: reader.start_ts,
conflict_start_ts: write.start_ts,
conflict_commit_ts: commit_ts,
key: key.into_raw()?,
primary: primary.to_vec(),
}
.into());
}
if commit_ts == reader.start_ts
&& (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)
{
assert!(write.has_overlapped_rollback || write.start_ts == commit_ts);
return Err(ErrorInner::PessimisticLockRolledBack {
start_ts: reader.start_ts,
key: key.into_raw()?,
}
.into());
}
if commit_ts > reader.start_ts {
if let Some((older_commit_ts, older_write)) =
reader.seek_write(&key, reader.start_ts)?
{
if older_commit_ts == reader.start_ts
&& (older_write.write_type == WriteType::Rollback
|| older_write.has_overlapped_rollback)
{
return Err(ErrorInner::PessimisticLockRolledBack {
start_ts: reader.start_ts,
key: key.into_raw()?,
}
.into());
}
}
}
check_data_constraint(reader, should_not_exist, &write, commit_ts, &key)?;
if need_value {
val = match write.write_type {
WriteType::Put
if write
.as_ref()
.check_gc_fence_as_latest_version(reader.start_ts) =>
{
Some(reader.load_data(&key, write)?)
}
WriteType::Delete | WriteType::Put => None,
WriteType::Lock | WriteType::Rollback => reader.get(&key, commit_ts.prev())?,
};
}
}
let old_value = load_old_value(
need_old_value,
need_value,
val.as_ref(),
reader,
&key,
for_update_ts,
prev_write_loaded,
prev_write,
)?;
let lock = pessimistic_lock(
primary,
reader.start_ts,
lock_ttl,
for_update_ts,
min_commit_ts,
);
txn.put_lock(key, &lock);
Ok((val, old_value))
}
pub mod tests {
use super::*;
use crate::storage::kv::WriteData;
use crate::storage::mvcc::{Error as MvccError, MvccReader};
use crate::storage::Engine;
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::Context;
use txn_types::TimeStamp;
#[cfg(test)]
use crate::storage::{
mvcc::tests::*,
txn::actions::prewrite::tests::{
old_value_put_delete_lock_insert, old_value_random, OldValueRandomTest,
},
txn::commands::pessimistic_rollback,
txn::tests::*,
TestEngineBuilder,
};
/// Test helper: runs `acquire_pessimistic_lock` on a fresh snapshot of `engine`, panics if
/// it fails, applies any resulting modifications to the engine and returns the value part
/// of the result.
///
/// `need_old_value` is always passed as `false`.
pub fn must_succeed_impl<E: Engine>(
    engine: &E,
    key: &[u8],
    pk: &[u8],
    start_ts: impl Into<TimeStamp>,
    should_not_exist: bool,
    lock_ttl: u64,
    for_update_ts: impl Into<TimeStamp>,
    need_value: bool,
    min_commit_ts: impl Into<TimeStamp>,
) -> Option<Value> {
    let snapshot = engine.snapshot(Default::default()).unwrap();
    let min_commit_ts = min_commit_ts.into();
    let start_ts = start_ts.into();
    let mut txn = MvccTxn::new(start_ts, ConcurrencyManager::new(min_commit_ts));
    let mut reader = SnapshotReader::new(start_ts, snapshot, true);

    let (value, _old_value) = acquire_pessimistic_lock(
        &mut txn,
        &mut reader,
        Key::from_raw(key),
        pk,
        should_not_exist,
        lock_ttl,
        for_update_ts.into(),
        need_value,
        min_commit_ts,
        false,
    )
    .unwrap();

    // Apply the buffered modifications (if any) to the engine.
    let modifies = txn.into_modifies();
    if !modifies.is_empty() {
        let ctx = Context::default();
        engine
            .write(&ctx, WriteData::from_modifies(modifies))
            .unwrap();
    }
    value
}
/// Test helper: same as `must_succeed_with_ttl` with a lock TTL of 0.
pub fn must_succeed<E: Engine>(
    engine: &E,
    key: &[u8],
    pk: &[u8],
    start_ts: impl Into<TimeStamp>,
    for_update_ts: impl Into<TimeStamp>,
) {
    let ttl = 0;
    must_succeed_with_ttl(engine, key, pk, start_ts, for_update_ts, ttl);
}
/// Test helper: acquires the lock with `need_value = true` and returns the value that
/// `must_succeed_impl` yields.
///
/// Uses `should_not_exist = false`, a lock TTL of 0 and a zero `min_commit_ts`.
pub fn must_succeed_return_value<E: Engine>(
    engine: &E,
    key: &[u8],
    pk: &[u8],
    start_ts: impl Into<TimeStamp>,
    for_update_ts: impl Into<TimeStamp>,
) -> Option<Value> {
    let should_not_exist = false;
    let lock_ttl = 0;
    let need_value = true;
    must_succeed_impl(
        engine,
        key,
        pk,
        start_ts,
        should_not_exist,
        lock_ttl,
        for_update_ts,
        need_value,
        TimeStamp::zero(),
    )
}
/// Test helper: acquires the lock with the given `ttl` and asserts that no value is
/// returned.
///
/// Calls `must_succeed_impl` with `should_not_exist = false`, `need_value = false` and a
/// zero `min_commit_ts`.
pub fn must_succeed_with_ttl<E: Engine>(
engine: &E,
key: &[u8],
pk: &[u8],
start_ts: impl Into<TimeStamp>,
for_update_ts: impl Into<TimeStamp>,
ttl: u64,
) {
assert!(
must_succeed_impl(
engine,
key,
pk,
start_ts,
false,
ttl,
for_update_ts.into(),
false,
TimeStamp::zero(),
)
.is_none()
);
}
/// Test helper: acquires the lock with `min_commit_ts` set to `for_update_ts.next()`.
///
/// Uses `should_not_exist = false` and `need_value = false`; the returned value is
/// discarded.
pub fn must_succeed_for_large_txn<E: Engine>(
    engine: &E,
    key: &[u8],
    pk: &[u8],
    start_ts: impl Into<TimeStamp>,
    for_update_ts: impl Into<TimeStamp>,
    lock_ttl: u64,
) {
    let for_update_ts: TimeStamp = for_update_ts.into();
    let min_commit_ts = for_update_ts.next();
    let (should_not_exist, need_value) = (false, false);
    must_succeed_impl(
        engine,
        key,
        pk,
        start_ts,
        should_not_exist,
        lock_ttl,
        for_update_ts,
        need_value,
        min_commit_ts,
    );
}
/// Test helper: expects `acquire_pessimistic_lock` to fail and returns the error.
///
/// Uses `should_not_exist = false`, `need_value = false` and a zero `min_commit_ts`.
pub fn must_err<E: Engine>(
    engine: &E,
    key: &[u8],
    pk: &[u8],
    start_ts: impl Into<TimeStamp>,
    for_update_ts: impl Into<TimeStamp>,
) -> MvccError {
    let should_not_exist = false;
    let need_value = false;
    must_err_impl(
        engine,
        key,
        pk,
        start_ts,
        should_not_exist,
        for_update_ts,
        need_value,
        TimeStamp::zero(),
    )
}
/// Test helper: like `must_err`, but requests the value (`need_value = true`).
///
/// Uses `should_not_exist = false` and a zero `min_commit_ts`.
pub fn must_err_return_value<E: Engine>(
    engine: &E,
    key: &[u8],
    pk: &[u8],
    start_ts: impl Into<TimeStamp>,
    for_update_ts: impl Into<TimeStamp>,
) -> MvccError {
    let should_not_exist = false;
    let need_value = true;
    must_err_impl(
        engine,
        key,
        pk,
        start_ts,
        should_not_exist,
        for_update_ts,
        need_value,
        TimeStamp::zero(),
    )
}
/// Test helper: runs `acquire_pessimistic_lock` on a fresh snapshot of `engine` and returns
/// its error, panicking if the call succeeds.
///
/// The lock TTL is 0 and `need_old_value` is `false`; nothing is written to the engine.
fn must_err_impl<E: Engine>(
    engine: &E,
    key: &[u8],
    pk: &[u8],
    start_ts: impl Into<TimeStamp>,
    should_not_exist: bool,
    for_update_ts: impl Into<TimeStamp>,
    need_value: bool,
    min_commit_ts: impl Into<TimeStamp>,
) -> MvccError {
    let snapshot = engine.snapshot(Default::default()).unwrap();
    let min_commit_ts = min_commit_ts.into();
    let start_ts = start_ts.into();
    let mut txn = MvccTxn::new(start_ts, ConcurrencyManager::new(min_commit_ts));
    let mut reader = SnapshotReader::new(start_ts, snapshot, true);

    let lock_ttl = 0;
    let need_old_value = false;
    let outcome = acquire_pessimistic_lock(
        &mut txn,
        &mut reader,
        Key::from_raw(key),
        pk,
        should_not_exist,
        lock_ttl,
        for_update_ts.into(),
        need_value,
        min_commit_ts,
        need_old_value,
    );
    outcome.unwrap_err()
}
/// Test helper: asserts that `key` currently carries a pessimistic lock whose `ts` and
/// `for_update_ts` equal the given timestamps. Panics if the key is not locked.
pub fn must_pessimistic_locked<E: Engine>(
    engine: &E,
    key: &[u8],
    start_ts: impl Into<TimeStamp>,
    for_update_ts: impl Into<TimeStamp>,
) {
    let snapshot = engine.snapshot(Default::default()).unwrap();
    let lock = MvccReader::new(snapshot, None, true)
        .load_lock(&Key::from_raw(key))
        .unwrap()
        .unwrap();

    let expected_start_ts: TimeStamp = start_ts.into();
    assert_eq!(lock.ts, expected_start_ts);
    let expected_for_update_ts: TimeStamp = for_update_ts.into();
    assert_eq!(lock.for_update_ts, expected_for_update_ts);
    assert_eq!(lock.lock_type, LockType::Pessimistic);
}
// End-to-end scenario test for pessimistic locking on a single key. The steps below build on
// each other (timestamps only grow), so their order matters.
#[test]
fn test_pessimistic_lock() {
let engine = TestEngineBuilder::new().build().unwrap();
let k = b"k1";
let v = b"v1";
// Normal flow: lock, pessimistic prewrite, commit.
must_succeed(&engine, k, k, 1, 1);
must_pessimistic_locked(&engine, k, 1, 1);
must_pessimistic_prewrite_put(&engine, k, v, k, 1, 1, true);
must_locked(&engine, k, 1);
must_commit(&engine, k, 1, 2);
must_unlocked(&engine, k);
// Lock conflict: the key is locked by another transaction.
must_prewrite_put(&engine, k, v, k, 3);
must_err(&engine, k, k, 4, 4);
must_cleanup(&engine, k, 3, 0);
must_unlocked(&engine, k);
must_succeed(&engine, k, k, 5, 5);
must_prewrite_lock_err(&engine, k, k, 6);
must_err(&engine, k, k, 6, 6);
must_cleanup(&engine, k, 5, 0);
must_unlocked(&engine, k);
// Data conflict: a write committed at 9 conflicts with for_update_ts 8 but not with 9.
must_prewrite_put(&engine, k, v, k, 7);
must_commit(&engine, k, 7, 9);
must_unlocked(&engine, k);
must_prewrite_lock_err(&engine, k, k, 8);
must_err(&engine, k, k, 8, 8);
must_succeed(&engine, k, k, 8, 9);
must_pessimistic_prewrite_put(&engine, k, v, k, 8, 8, true);
must_commit(&engine, k, 8, 10);
must_unlocked(&engine, k);
// After cleanup of a pessimistic lock, the same transaction cannot lock or prewrite again.
must_succeed(&engine, k, k, 11, 11);
must_pessimistic_locked(&engine, k, 11, 11);
must_cleanup(&engine, k, 11, 0);
must_err(&engine, k, k, 11, 11);
must_pessimistic_prewrite_put_err(&engine, k, v, k, 11, 11, true);
must_prewrite_lock_err(&engine, k, k, 11);
must_unlocked(&engine, k);
// Same, but the cleanup happens after the pessimistic prewrite.
must_succeed(&engine, k, k, 12, 12);
must_pessimistic_prewrite_put(&engine, k, v, k, 12, 12, true);
must_locked(&engine, k, 12);
must_cleanup(&engine, k, 12, 0);
must_err(&engine, k, k, 12, 12);
must_pessimistic_prewrite_put_err(&engine, k, v, k, 12, 12, true);
must_prewrite_lock_err(&engine, k, k, 12);
must_unlocked(&engine, k);
// Duplicated requests: every step is issued twice and must succeed both times.
must_succeed(&engine, k, k, 13, 13);
must_pessimistic_locked(&engine, k, 13, 13);
must_succeed(&engine, k, k, 13, 13);
must_pessimistic_locked(&engine, k, 13, 13);
must_pessimistic_prewrite_put(&engine, k, v, k, 13, 13, true);
must_locked(&engine, k, 13);
must_pessimistic_prewrite_put(&engine, k, v, k, 13, 13, true);
must_locked(&engine, k, 13);
must_commit(&engine, k, 13, 14);
must_unlocked(&engine, k);
must_commit(&engine, k, 13, 14);
must_unlocked(&engine, k);
// A pessimistic lock does not block a read; the prewritten lock does.
must_succeed(&engine, k, k, 15, 15);
must_pessimistic_locked(&engine, k, 15, 15);
must_get(&engine, k, 16, v);
must_pessimistic_prewrite_delete(&engine, k, k, 15, 15, true);
must_get_err(&engine, k, 16);
must_commit(&engine, k, 15, 17);
// Rolled-back transaction 18 must not lock again, even with a larger for_update_ts.
must_succeed(&engine, k, k, 18, 18);
must_rollback(&engine, k, 18, false);
must_unlocked(&engine, k);
must_prewrite_put(&engine, k, v, k, 19);
must_commit(&engine, k, 19, 20);
must_err(&engine, k, k, 18, 21);
must_unlocked(&engine, k);
// Lock type mismatch: own non-pessimistic lock vs. pessimistic request, and vice versa.
must_prewrite_put(&engine, k, v, k, 23);
must_locked(&engine, k, 23);
must_err(&engine, k, k, 23, 23);
must_cleanup(&engine, k, 23, 0);
must_succeed(&engine, k, k, 24, 24);
must_pessimistic_locked(&engine, k, 24, 24);
must_prewrite_put_err(&engine, k, v, k, 24);
must_rollback(&engine, k, 24, false);
// Acquiring the lock on an already prewritten key fails.
must_succeed(&engine, k, k, 26, 26);
must_pessimistic_locked(&engine, k, 26, 26);
must_pessimistic_prewrite_delete(&engine, k, k, 26, 26, true);
must_locked(&engine, k, 26);
must_err(&engine, k, k, 26, 26);
must_locked(&engine, k, 26);
// Acquiring the lock after the transaction committed fails.
must_commit(&engine, k, 26, 27);
must_unlocked(&engine, k);
must_get_none(&engine, k, 28);
must_err(&engine, k, k, 26, 26);
must_unlocked(&engine, k);
must_get_none(&engine, k, 28);
// Pessimistic prewrite after the transaction committed fails.
must_pessimistic_prewrite_put_err(&engine, k, v, k, 26, 26, true);
must_unlocked(&engine, k);
must_get_none(&engine, k, 28);
// With a larger for_update_ts the committed transaction can still lock; the stray lock is
// removed by a pessimistic rollback.
must_succeed(&engine, k, k, 26, 29);
pessimistic_rollback::tests::must_success(&engine, k, 26, 29);
must_unlocked(&engine, k);
// Pessimistic prewrite with the last flag `false` and no prior pessimistic lock.
// NOTE(review): presumably the flag marks the key as non-pessimistic — confirm in helper.
must_pessimistic_prewrite_put(&engine, k, v, k, 30, 30, false);
must_locked(&engine, k, 30);
must_commit(&engine, k, 30, 31);
must_unlocked(&engine, k);
must_get_commit_ts(&engine, k, 30, 31);
// Two consecutive rollbacks; transaction 32 fails at for_update_ts 32 but succeeds at 34.
must_rollback(&engine, k, 32, false);
must_rollback(&engine, k, 33, false);
must_err(&engine, k, k, 32, 32);
must_succeed(&engine, k, k, 32, 34);
pessimistic_rollback::tests::must_success(&engine, k, 32, 34);
must_unlocked(&engine, k);
// Re-acquiring with a different for_update_ts: the stored value only ever grows.
must_succeed(&engine, k, k, 35, 36);
must_pessimistic_locked(&engine, k, 35, 36);
must_succeed(&engine, k, k, 35, 35);
must_pessimistic_locked(&engine, k, 35, 36);
must_succeed(&engine, k, k, 35, 37);
must_pessimistic_locked(&engine, k, 35, 37);
// Another transaction cannot prewrite over the pessimistic lock.
must_pessimistic_prewrite_put_err(&engine, k, v, k, 36, 36, true);
must_pessimistic_prewrite_put_err(&engine, k, v, k, 36, 38, true);
must_pessimistic_locked(&engine, k, 35, 37);
// Nor over the prewritten lock.
must_pessimistic_prewrite_put(&engine, k, v, k, 35, 37, true);
must_locked(&engine, k, 35);
must_pessimistic_prewrite_put_err(&engine, k, v, k, 36, 38, true);
must_locked(&engine, k, 35);
// Committing with commit_ts (36) smaller than for_update_ts (37) succeeds.
must_commit(&engine, k, 35, 36);
must_unlocked(&engine, k);
must_get_commit_ts(&engine, k, 35, 36);
// Prewrite with the last flag `false` on a key that holds a pessimistic lock succeeds.
must_succeed(&engine, k, k, 40, 40);
must_pessimistic_locked(&engine, k, 40, 40);
must_pessimistic_prewrite_put(&engine, k, v, k, 40, 40, false);
must_locked(&engine, k, 40);
must_commit(&engine, k, 40, 41);
must_unlocked(&engine, k);
// Prewrite with a for_update_ts smaller than the one stored in the lock succeeds.
must_succeed(&engine, k, k, 42, 45);
must_pessimistic_locked(&engine, k, 42, 45);
must_pessimistic_prewrite_put(&engine, k, v, k, 42, 43, true);
must_locked(&engine, k, 42);
must_commit(&engine, k, 42, 45);
must_unlocked(&engine, k);
// Prewrite with a for_update_ts larger than the one stored in the lock succeeds.
must_succeed(&engine, k, k, 46, 47);
must_pessimistic_locked(&engine, k, 46, 47);
must_pessimistic_prewrite_put(&engine, k, v, k, 46, 48, true);
must_locked(&engine, k, 46);
must_commit(&engine, k, 46, 50);
must_unlocked(&engine, k);
// Prewrite with the last flag `false` succeeds although a write with commit_ts 50 > 48 exists.
must_pessimistic_prewrite_put(&engine, k, v, k, 47, 48, false);
must_locked(&engine, k, 47);
must_cleanup(&engine, k, 47, 0);
must_unlocked(&engine, k);
// Cleanup of transaction 49 leaves a protected rollback; after a later rollback (51) is
// written, transaction 49 still cannot lock again.
must_succeed(&engine, k, k, 49, 60);
must_pessimistic_prewrite_put(&engine, k, v, k, 49, 60, true);
must_locked(&engine, k, 49);
must_cleanup(&engine, k, 49, 0);
must_get_rollback_protected(&engine, k, 49, true);
must_prewrite_put(&engine, k, v, k, 51);
must_rollback(&engine, k, 51, false);
must_err(&engine, k, k, 49, 60);
// start_ts 50 equals the commit_ts of transaction 46, so its rollback is expected as an
// overlapped rollback on that write record.
must_succeed(&engine, k, k, 50, 61);
must_pessimistic_prewrite_put(&engine, k, v, k, 50, 61, true);
must_locked(&engine, k, 50);
must_cleanup(&engine, k, 50, 0);
must_get_overlapped_rollback(&engine, k, 50, 46, WriteType::Put, Some(0));
// Interleaved start_ts / commit_ts: each commit_ts is larger than the next start_ts.
for start_ts in &[140, 150, 160] {
let for_update_ts = start_ts + 48;
let commit_ts = start_ts + 50;
must_succeed(&engine, k, k, *start_ts, for_update_ts);
must_pessimistic_prewrite_put(&engine, k, v, k, *start_ts, for_update_ts, true);
must_commit(&engine, k, *start_ts, commit_ts);
must_get(&engine, k, commit_ts + 1, v);
}
must_rollback(&engine, k, 170, false);
// Expected records (start_ts -> commit_ts): 140 -> 190, 150 -> 200, 160 -> 210,
// 170 -> rollback.
must_get_commit_ts(&engine, k, 140, 190);
must_get_commit_ts(&engine, k, 150, 200);
must_get_commit_ts(&engine, k, 160, 210);
must_get_rollback_ts(&engine, k, 170);
}
// Checks the value returned when the lock is acquired with `need_value = true`, depending on
// the newest write record of the key. The steps are order-dependent.
#[test]
fn test_pessimistic_lock_return_value() {
let engine = TestEngineBuilder::new().build().unwrap();
let (k, v) = (b"k", b"v");
// No data at all: no value is returned.
assert_eq!(must_succeed_return_value(&engine, k, k, 10, 10), None);
must_pessimistic_locked(&engine, k, 10, 10);
pessimistic_rollback::tests::must_success(&engine, k, 10, 10);
// Key locked by another transaction.
must_prewrite_put(&engine, k, v, k, 10);
match must_err_return_value(&engine, k, k, 20, 20) {
MvccError(box ErrorInner::KeyIsLocked(_)) => (),
e => panic!("unexpected error: {}", e),
};
// The write committed at 20 conflicts with for_update_ts 15.
must_commit(&engine, k, 10, 20);
match must_err_return_value(&engine, k, k, 15, 15) {
MvccError(box ErrorInner::WriteConflict { .. }) => (),
e => panic!("unexpected error: {}", e),
};
// Newest record is the PUT: its value is returned.
assert_eq!(
must_succeed_return_value(&engine, k, k, 25, 25),
Some(v.to_vec())
);
must_pessimistic_locked(&engine, k, 25, 25);
pessimistic_rollback::tests::must_success(&engine, k, 25, 25);
// Newest record is a LOCK: the value of the older PUT is returned.
must_prewrite_lock(&engine, k, k, 30);
must_commit(&engine, k, 30, 40);
assert_eq!(
must_succeed_return_value(&engine, k, k, 45, 45),
Some(v.to_vec())
);
must_pessimistic_locked(&engine, k, 45, 45);
pessimistic_rollback::tests::must_success(&engine, k, 45, 45);
// Newest record is a rollback: the value of the older PUT is returned.
must_rollback(&engine, k, 50, false);
assert_eq!(
must_succeed_return_value(&engine, k, k, 55, 55),
Some(v.to_vec())
);
must_pessimistic_locked(&engine, k, 55, 55);
pessimistic_rollback::tests::must_success(&engine, k, 55, 55);
// Newest record is a DELETE: no value is returned.
must_prewrite_delete(&engine, k, k, 60);
must_commit(&engine, k, 60, 70);
assert_eq!(must_succeed_return_value(&engine, k, k, 75, 75), None);
// Duplicated request while already locked.
assert_eq!(must_succeed_return_value(&engine, k, k, 75, 75), None);
// Already locked, smaller for_update_ts (55): the value is read at 55, i.e. before the
// DELETE, while the stored lock keeps for_update_ts 75.
assert_eq!(
must_succeed_return_value(&engine, k, k, 75, 55),
Some(v.to_vec())
);
must_pessimistic_locked(&engine, k, 75, 75);
pessimistic_rollback::tests::must_success(&engine, k, 75, 75);
}
/// Re-acquiring a pessimistic lock only replaces the stored lock when the new
/// `for_update_ts` is larger than the stored one.
#[test]
fn test_overwrite_pessimistic_lock() {
    let engine = TestEngineBuilder::new().build().unwrap();
    let k = b"k1";

    // (requested for_update_ts, for_update_ts expected on the stored lock afterwards)
    for &(requested, stored) in &[(2, 2), (1, 2), (3, 3)] {
        must_succeed(&engine, k, k, 1, requested);
        must_pessimistic_locked(&engine, k, 1, stored);
    }
}
// Checks the `should_not_exist` constraint and the returned value against keys whose write
// records were touched by `must_cleanup_with_gc_fence`.
// NOTE(review): presumably the fifth argument of `must_cleanup_with_gc_fence` is the GC fence
// timestamp stored on the write record — confirm against the helper.
#[test]
fn test_pessimistic_lock_check_gc_fence() {
use pessimistic_rollback::tests::must_success as must_pessimistic_rollback;
let engine = TestEngineBuilder::new().build().unwrap();
// k1: PUT committed at 30; cleanup with fence argument 40.
must_prewrite_put(&engine, b"k1", b"v1", b"k1", 10);
must_commit(&engine, b"k1", 10, 30);
must_cleanup_with_gc_fence(&engine, b"k1", 30, 0, 40, true);
// k2: PUT committed at 30; cleanup with fence argument 0.
must_prewrite_put(&engine, b"k2", b"v2", b"k2", 11);
must_commit(&engine, b"k2", 11, 30);
must_cleanup_with_gc_fence(&engine, b"k2", 30, 0, 0, true);
// k3: PUT at 30, LOCK at 38, cleanup with fence argument 40, then another LOCK at 43.
must_prewrite_put(&engine, b"k3", b"v3", b"k3", 12);
must_commit(&engine, b"k3", 12, 30);
must_prewrite_lock(&engine, b"k3", b"k3", 37);
must_commit(&engine, b"k3", 37, 38);
must_cleanup_with_gc_fence(&engine, b"k3", 30, 0, 40, true);
must_prewrite_lock(&engine, b"k3", b"k3", 42);
must_commit(&engine, b"k3", 42, 43);
// k4: PUT at 30, LOCKs at 38 and 43, cleanup with fence argument 0.
must_prewrite_put(&engine, b"k4", b"v4", b"k4", 13);
must_commit(&engine, b"k4", 13, 30);
must_prewrite_lock(&engine, b"k4", b"k4", 37);
must_commit(&engine, b"k4", 37, 38);
must_prewrite_lock(&engine, b"k4", b"k4", 42);
must_commit(&engine, b"k4", 42, 43);
must_cleanup_with_gc_fence(&engine, b"k4", 30, 0, 0, true);
// k5: PUTs at 20 and 30; cleanups with fence arguments 30 (on 20) and 40 (on 30).
must_prewrite_put(&engine, b"k5", b"v5", b"k5", 14);
must_commit(&engine, b"k5", 14, 20);
must_prewrite_put(&engine, b"k5", b"v5x", b"k5", 21);
must_commit(&engine, b"k5", 21, 30);
must_cleanup_with_gc_fence(&engine, b"k5", 20, 0, 30, false);
must_cleanup_with_gc_fence(&engine, b"k5", 30, 0, 40, true);
// k6: PUTs at 20 and 30; cleanups with fence arguments 30 (on 20) and 0 (on 30).
must_prewrite_put(&engine, b"k6", b"v6", b"k6", 15);
must_commit(&engine, b"k6", 15, 20);
must_prewrite_put(&engine, b"k6", b"v6x", b"k6", 22);
must_commit(&engine, b"k6", 22, 30);
must_cleanup_with_gc_fence(&engine, b"k6", 20, 0, 30, false);
must_cleanup_with_gc_fence(&engine, b"k6", 30, 0, 0, true);
// k7: PUT at 30, LOCK at 38 as the newest record, cleanup with fence argument 40.
must_prewrite_put(&engine, b"k7", b"v7", b"k7", 16);
must_commit(&engine, b"k7", 16, 30);
must_prewrite_lock(&engine, b"k7", b"k7", 37);
must_commit(&engine, b"k7", 37, 38);
must_cleanup_with_gc_fence(&engine, b"k7", 30, 0, 40, true);
// (key, value expected to be visible when locking at ts 50)
let cases = vec![
(b"k1" as &[u8], None),
(b"k2", Some(b"v2" as &[u8])),
(b"k3", None),
(b"k4", Some(b"v4")),
(b"k5", None),
(b"k6", Some(b"v6x")),
(b"k7", None),
];
for (key, expected_value) in cases {
// Constraint check: `should_not_exist = true` succeeds only when no value is visible.
if expected_value.is_none() {
assert!(must_succeed_impl(&engine, key, key, 50, true, 0, 50, false, 51).is_none());
must_pessimistic_rollback(&engine, key, 50, 51);
} else {
must_err_impl(&engine, key, key, 50, true, 50, false, 51);
}
must_unlocked(&engine, key);
// Returned value when the key is not locked yet.
let res = must_succeed_impl(&engine, key, key, 50, false, 0, 50, true, 51);
assert_eq!(res, expected_value.map(|v| v.to_vec()));
must_pessimistic_rollback(&engine, key, 50, 51);
// Returned value when the key is already locked by the same transaction.
must_succeed(&engine, key, key, 50, 51);
let res2 = must_succeed_impl(&engine, key, key, 50, false, 0, 50, true, 51);
assert_eq!(res2, expected_value.map(|v| v.to_vec()));
must_pessimistic_rollback(&engine, key, 50, 51);
}
}
/// After the fixture built by `old_value_put_delete_lock_insert`, acquiring the lock with
/// `need_old_value = true` must report `OldValue::None` for every combination of
/// `should_not_exist` and `need_value`.
#[test]
fn test_old_value_put_delete_lock_insert() {
    let engine = crate::storage::TestEngineBuilder::new().build().unwrap();
    let start_ts = old_value_put_delete_lock_insert(&engine, b"k1");
    let key = Key::from_raw(b"k1");

    for &should_not_exist in &[true, false] {
        for &need_value in &[true, false] {
            let snapshot = engine.snapshot(Default::default()).unwrap();
            let mut txn = MvccTxn::new(start_ts, ConcurrencyManager::new(start_ts));
            let mut reader = SnapshotReader::new(start_ts, snapshot, true);
            let min_commit_ts: TimeStamp = 0.into();

            // lock_ttl = 0, for_update_ts = start_ts, need_old_value = true.
            let (_, old_value) = acquire_pessimistic_lock(
                &mut txn,
                &mut reader,
                key.clone(),
                key.as_encoded(),
                should_not_exist,
                0,
                start_ts,
                need_value,
                min_commit_ts,
                true,
            )
            .unwrap();
            assert_eq!(old_value, OldValue::None);
        }
    }
}
/// With `start_ts = 9` and `for_update_ts = 12`, the old value must be the one committed at
/// 11 (`v2`), both when the key is not locked yet and when it is already locked by the same
/// transaction.
#[test]
fn test_old_value_for_update_ts() {
    let engine = TestEngineBuilder::new().build().unwrap();
    let k = b"k1";

    // Two committed versions: v1 at commit_ts 2 and v2 at commit_ts 11.
    let v1 = b"v1";
    must_succeed(&engine, k, k, 1, 1);
    must_pessimistic_prewrite_put(&engine, k, v1, k, 1, 1, true);
    must_commit(&engine, k, 1, 2);
    let v2 = b"v2";
    must_succeed(&engine, k, k, 10, 10);
    must_pessimistic_prewrite_put(&engine, k, v2, k, 10, 10, true);
    must_commit(&engine, k, 10, 11);

    let snapshot = engine.snapshot(Default::default()).unwrap();
    let min_commit_ts = TimeStamp::zero();
    let cm = ConcurrencyManager::new(min_commit_ts);
    let start_ts = TimeStamp::new(9);
    let for_update_ts = TimeStamp::new(12);
    let expected = OldValue::Value {
        value: b"v2".to_vec(),
    };

    // First acquisition: the key is not locked.
    let mut txn = MvccTxn::new(start_ts, cm.clone());
    let mut reader = SnapshotReader::new(start_ts, snapshot, true);
    let (_, old_value) = acquire_pessimistic_lock(
        &mut txn,
        &mut reader,
        Key::from_raw(k),
        k,
        false,
        0,
        for_update_ts,
        false,
        min_commit_ts,
        true,
    )
    .unwrap();
    assert_eq!(old_value, expected);

    // Apply the lock to the engine so the second acquisition finds it.
    let modifies = txn.into_modifies();
    if !modifies.is_empty() {
        engine
            .write(&Default::default(), WriteData::from_modifies(modifies))
            .unwrap();
    }

    // Second acquisition: same request on a fresh snapshot.
    let mut txn = MvccTxn::new(start_ts, cm);
    let snapshot = engine.snapshot(Default::default()).unwrap();
    let mut reader = SnapshotReader::new(start_ts, snapshot, true);
    let (_, old_value) = acquire_pessimistic_lock(
        &mut txn,
        &mut reader,
        Key::from_raw(k),
        k,
        false,
        0,
        for_update_ts,
        false,
        min_commit_ts,
        true,
    )
    .unwrap();
    assert_eq!(old_value, expected);
}
// Feeds `old_value_random` with closures that call `acquire_pessimistic_lock` with
// `need_old_value = true`, one closure per (`should_not_exist`, `need_value`) combination.
// Closures built with `should_not_exist = true` are run with `require_old_value_none = true`,
// the others with `false`.
#[test]
fn test_old_value_random() {
let key = b"k1";
let mut tests: Vec<OldValueRandomTest> = vec![];
let mut tests_require_old_value_none: Vec<OldValueRandomTest> = vec![];
for should_not_exist in &[true, false] {
for need_value in &[true, false] {
// Copy the flags so the `move` closure owns plain `bool`s.
let should_not_exist = *should_not_exist;
let need_value = *need_value;
let t = Box::new(move |snapshot, start_ts| {
let key = Key::from_raw(key);
let cm = ConcurrencyManager::new(start_ts);
let mut txn = MvccTxn::new(start_ts, cm);
let mut reader = SnapshotReader::new(start_ts, snapshot, true);
let need_old_value = true;
let lock_ttl = 0;
let for_update_ts = start_ts;
let min_commit_ts = 0.into();
let (_, old_value) = acquire_pessimistic_lock(
&mut txn,
&mut reader,
key.clone(),
key.as_encoded(),
should_not_exist,
lock_ttl,
for_update_ts,
need_value,
min_commit_ts,
need_old_value,
)?;
Ok(old_value)
});
if should_not_exist {
tests_require_old_value_none.push(t);
} else {
tests.push(t);
}
}
}
let require_old_value_none = false;
old_value_random(key, require_old_value_none, tests);
let require_old_value_none = true;
old_value_random(key, require_old_value_none, tests_require_old_value_none);
}
}