use crate::storage::{
mvcc::{
metrics::{
CONCURRENCY_MANAGER_LOCK_DURATION_HISTOGRAM, MVCC_CONFLICT_COUNTER,
MVCC_DUPLICATE_CMD_COUNTER_VEC,
},
Error, ErrorInner, Lock, LockType, MvccTxn, Result, SnapshotReader,
},
txn::actions::check_data_constraint::check_data_constraint,
txn::LockInfo,
Snapshot,
};
use fail::fail_point;
use std::cmp;
use txn_types::{
is_short_value, Key, Mutation, MutationType, OldValue, TimeStamp, Value, Write, WriteType,
};
pub fn prewrite<S: Snapshot>(
txn: &mut MvccTxn,
reader: &mut SnapshotReader<S>,
txn_props: &TransactionProperties,
mutation: Mutation,
secondary_keys: &Option<Vec<Vec<u8>>>,
is_pessimistic_lock: bool,
) -> Result<(TimeStamp, OldValue)> {
let mut mutation = PrewriteMutation::from_mutation(mutation, secondary_keys, txn_props)?;
fail_point!(
if txn_props.is_pessimistic() {
"pessimistic_prewrite"
} else {
"prewrite"
},
|err| Err(crate::storage::mvcc::txn::make_txn_error(
err,
&mutation.key,
mutation.txn_props.start_ts
)
.into())
);
let lock_status = match reader.load_lock(&mutation.key)? {
Some(lock) => mutation.check_lock(lock, is_pessimistic_lock)?,
None if is_pessimistic_lock => {
amend_pessimistic_lock(&mutation.key, reader)?;
LockStatus::None
}
None => LockStatus::None,
};
if let LockStatus::Locked(ts) = lock_status {
return Ok((ts, OldValue::Unspecified));
}
let prev_write = if !mutation.skip_constraint_check() {
mutation.check_for_newer_version(reader)?
} else {
None
};
if mutation.should_not_write {
txn.concurrency_manager.update_max_ts(txn_props.start_ts);
let min_commit_ts = if mutation.need_min_commit_ts() {
cmp::max(txn_props.min_commit_ts, txn_props.start_ts.next())
} else {
TimeStamp::zero()
};
return Ok((min_commit_ts, OldValue::Unspecified));
}
let old_value = if txn_props.need_old_value
&& matches!(
mutation.mutation_type,
MutationType::Put | MutationType::Delete | MutationType::Insert
) {
if mutation.mutation_type == MutationType::Insert {
OldValue::None
} else if mutation.skip_constraint_check() {
if mutation.txn_props.is_pessimistic() {
OldValue::Unspecified
} else {
OldValue::None
}
} else {
let prev_write_loaded = true;
let ts = match txn_props.kind {
TransactionKind::Optimistic(_) => txn_props.start_ts,
TransactionKind::Pessimistic(for_update_ts) => for_update_ts,
};
reader.get_old_value(&mutation.key, ts, prev_write_loaded, prev_write)?
}
} else {
OldValue::Unspecified
};
let final_min_commit_ts = mutation.write_lock(lock_status, txn)?;
fail_point!("after_prewrite_one_key");
Ok((final_min_commit_ts, old_value))
}
#[derive(Clone, Debug)]
pub struct TransactionProperties<'a> {
pub start_ts: TimeStamp,
pub kind: TransactionKind,
pub commit_kind: CommitKind,
pub primary: &'a [u8],
pub txn_size: u64,
pub lock_ttl: u64,
pub min_commit_ts: TimeStamp,
pub need_old_value: bool,
}
impl<'a> TransactionProperties<'a> {
fn max_commit_ts(&self) -> TimeStamp {
match &self.commit_kind {
CommitKind::TwoPc => unreachable!(),
CommitKind::OnePc(ts) => *ts,
CommitKind::Async(ts) => *ts,
}
}
fn is_pessimistic(&self) -> bool {
match &self.kind {
TransactionKind::Optimistic(_) => false,
TransactionKind::Pessimistic(_) => true,
}
}
fn for_update_ts(&self) -> TimeStamp {
match &self.kind {
TransactionKind::Optimistic(_) => TimeStamp::zero(),
TransactionKind::Pessimistic(ts) => *ts,
}
}
}
#[derive(Clone, Debug)]
pub enum CommitKind {
TwoPc,
OnePc(TimeStamp),
Async(TimeStamp),
}
#[derive(Clone, Debug)]
pub enum TransactionKind {
Optimistic(bool),
Pessimistic(TimeStamp),
}
enum LockStatus {
Locked(TimeStamp),
Pessimistic,
None,
}
impl LockStatus {
fn has_pessimistic_lock(&self) -> bool {
matches!(self, LockStatus::Pessimistic)
}
}
struct PrewriteMutation<'a> {
key: Key,
value: Option<Value>,
mutation_type: MutationType,
secondary_keys: &'a Option<Vec<Vec<u8>>>,
min_commit_ts: TimeStamp,
lock_type: Option<LockType>,
lock_ttl: u64,
should_not_exist: bool,
should_not_write: bool,
txn_props: &'a TransactionProperties<'a>,
}
impl<'a> PrewriteMutation<'a> {
fn from_mutation(
mutation: Mutation,
secondary_keys: &'a Option<Vec<Vec<u8>>>,
txn_props: &'a TransactionProperties<'a>,
) -> Result<PrewriteMutation<'a>> {
let should_not_write = mutation.should_not_write();
if txn_props.is_pessimistic() && should_not_write {
return Err(box_err!(
"cannot handle checkNotExists in pessimistic prewrite"
));
}
let should_not_exist = mutation.should_not_exists();
let mutation_type = mutation.mutation_type();
let lock_type = LockType::from_mutation(&mutation);
let (key, value) = mutation.into_key_value();
Ok(PrewriteMutation {
key,
value,
mutation_type,
secondary_keys,
min_commit_ts: txn_props.min_commit_ts,
lock_type,
lock_ttl: txn_props.lock_ttl,
should_not_exist,
should_not_write,
txn_props,
})
}
fn lock_info(&self, lock: Lock) -> Result<LockInfo> {
let mut info = lock.into_lock_info(self.key.to_raw()?);
if self.txn_props.is_pessimistic() {
info.set_lock_ttl(0);
}
Ok(info)
}
fn check_lock(&mut self, lock: Lock, is_pessimistic_lock: bool) -> Result<LockStatus> {
if lock.ts != self.txn_props.start_ts {
if is_pessimistic_lock {
warn!(
"prewrite failed (pessimistic lock not found)";
"start_ts" => self.txn_props.start_ts,
"key" => %self.key,
"lock_ts" => lock.ts
);
return Err(ErrorInner::PessimisticLockNotFound {
start_ts: self.txn_props.start_ts,
key: self.key.to_raw()?,
}
.into());
}
return Err(ErrorInner::KeyIsLocked(self.lock_info(lock)?).into());
}
if lock.lock_type == LockType::Pessimistic {
if !self.txn_props.is_pessimistic() {
return Err(ErrorInner::LockTypeNotMatch {
start_ts: self.txn_props.start_ts,
key: self.key.to_raw()?,
pessimistic: true,
}
.into());
}
self.lock_ttl = std::cmp::max(self.lock_ttl, lock.ttl);
self.min_commit_ts = std::cmp::max(self.min_commit_ts, lock.min_commit_ts);
return Ok(LockStatus::Pessimistic);
}
MVCC_DUPLICATE_CMD_COUNTER_VEC.prewrite.inc();
let min_commit_ts = if lock.use_async_commit {
lock.min_commit_ts
} else {
TimeStamp::zero()
};
Ok(LockStatus::Locked(min_commit_ts))
}
fn check_for_newer_version<S: Snapshot>(
&self,
reader: &mut SnapshotReader<S>,
) -> Result<Option<Write>> {
match reader.seek_write(&self.key, TimeStamp::max())? {
Some((commit_ts, write)) => {
if commit_ts > self.txn_props.start_ts {
MVCC_CONFLICT_COUNTER.prewrite_write_conflict.inc();
self.write_conflict_error(&write, commit_ts)?;
}
if commit_ts == self.txn_props.start_ts
&& (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)
{
MVCC_CONFLICT_COUNTER.rolled_back.inc();
self.write_conflict_error(&write, commit_ts)?;
}
check_data_constraint(reader, self.should_not_exist, &write, commit_ts, &self.key)?;
Ok(Some(write))
}
None => Ok(None),
}
}
fn write_lock(self, lock_status: LockStatus, txn: &mut MvccTxn) -> Result<TimeStamp> {
let mut try_one_pc = self.try_one_pc();
let mut lock = Lock::new(
self.lock_type.unwrap(),
self.txn_props.primary.to_vec(),
self.txn_props.start_ts,
self.lock_ttl,
None,
self.txn_props.for_update_ts(),
self.txn_props.txn_size,
self.min_commit_ts,
);
if let Some(value) = self.value {
if is_short_value(&value) {
lock.short_value = Some(value);
} else {
txn.put_value(self.key.clone(), self.txn_props.start_ts, value);
}
}
if let Some(secondary_keys) = self.secondary_keys {
lock.use_async_commit = true;
lock.secondaries = secondary_keys.to_owned();
}
let final_min_commit_ts = if lock.use_async_commit || try_one_pc {
let res = async_commit_timestamps(
&self.key,
&mut lock,
self.txn_props.start_ts,
self.txn_props.for_update_ts(),
self.txn_props.max_commit_ts(),
txn,
);
fail_point!("after_calculate_min_commit_ts");
if let Err(Error(box ErrorInner::CommitTsTooLarge { .. })) = &res {
try_one_pc = false;
lock.use_async_commit = false;
lock.secondaries = Vec::new();
}
res
} else {
Ok(TimeStamp::zero())
};
if try_one_pc {
txn.put_locks_for_1pc(self.key, lock, lock_status.has_pessimistic_lock());
} else {
txn.put_lock(self.key, &lock);
}
final_min_commit_ts
}
fn write_conflict_error(&self, write: &Write, commit_ts: TimeStamp) -> Result<()> {
Err(ErrorInner::WriteConflict {
start_ts: self.txn_props.start_ts,
conflict_start_ts: write.start_ts,
conflict_commit_ts: commit_ts,
key: self.key.to_raw()?,
primary: self.txn_props.primary.to_vec(),
}
.into())
}
fn skip_constraint_check(&self) -> bool {
match &self.txn_props.kind {
TransactionKind::Optimistic(s) => *s,
TransactionKind::Pessimistic(_) => true,
}
}
fn need_min_commit_ts(&self) -> bool {
matches!(
&self.txn_props.commit_kind,
CommitKind::Async(_) | CommitKind::OnePc(_)
)
}
fn try_one_pc(&self) -> bool {
matches!(&self.txn_props.commit_kind, CommitKind::OnePc(_))
}
}
fn async_commit_timestamps(
key: &Key,
lock: &mut Lock,
start_ts: TimeStamp,
for_update_ts: TimeStamp,
max_commit_ts: TimeStamp,
txn: &mut MvccTxn,
) -> Result<TimeStamp> {
let key_guard = CONCURRENCY_MANAGER_LOCK_DURATION_HISTOGRAM.observe_closure_duration(|| {
::futures_executor::block_on(txn.concurrency_manager.lock_key(key))
});
let final_min_commit_ts = key_guard.with_lock(|l| {
let max_ts = txn.concurrency_manager.max_ts();
fail_point!("before-set-lock-in-memory");
let min_commit_ts = cmp::max(cmp::max(max_ts, start_ts), for_update_ts).next();
let min_commit_ts = cmp::max(lock.min_commit_ts, min_commit_ts);
#[cfg(feature = "failpoints")]
let injected_fallback = (|| {
fail_point!("async_commit_1pc_force_fallback", |_| {
info!("[failpoint] injected fallback for async commit/1pc transaction"; "start_ts" => start_ts);
true
});
false
})();
#[cfg(not(feature = "failpoints"))]
let injected_fallback = false;
let max_commit_ts = max_commit_ts;
if (!max_commit_ts.is_zero() && min_commit_ts > max_commit_ts) || injected_fallback {
warn!("commit_ts is too large, fallback to normal 2PC";
"key" => log_wrappers::Value::key(key.as_encoded()),
"start_ts" => start_ts,
"min_commit_ts" => min_commit_ts,
"max_commit_ts" => max_commit_ts,
"lock" => ?lock);
return Err(ErrorInner::CommitTsTooLarge {
start_ts,
min_commit_ts,
max_commit_ts,
});
}
lock.min_commit_ts = min_commit_ts;
*l = Some(lock.clone());
Ok(min_commit_ts)
})?;
txn.guards.push(key_guard);
Ok(final_min_commit_ts)
}
fn amend_pessimistic_lock<S: Snapshot>(key: &Key, reader: &mut SnapshotReader<S>) -> Result<()> {
if let Some((commit_ts, _)) = reader.seek_write(key, TimeStamp::max())? {
if commit_ts >= reader.start_ts {
warn!(
"prewrite failed (pessimistic lock not found)";
"start_ts" => reader.start_ts,
"commit_ts" => commit_ts,
"key" => %key
);
MVCC_CONFLICT_COUNTER
.pipelined_acquire_pessimistic_lock_amend_fail
.inc();
return Err(ErrorInner::PessimisticLockNotFound {
start_ts: reader.start_ts,
key: key.clone().into_raw()?,
}
.into());
}
}
MVCC_CONFLICT_COUNTER
.pipelined_acquire_pessimistic_lock_amend_success
.inc();
Ok(())
}
pub mod tests {
use super::*;
#[cfg(test)]
use crate::storage::{
kv::RocksSnapshot,
txn::{
commands::prewrite::fallback_1pc_locks,
tests::{
must_acquire_pessimistic_lock, must_cleanup_with_gc_fence, must_commit,
must_prewrite_delete, must_prewrite_lock, must_prewrite_put, must_rollback,
},
},
};
use crate::storage::{mvcc::tests::*, Engine};
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::Context;
#[cfg(test)]
use rand::{Rng, SeedableRng};
#[cfg(test)]
use std::sync::Arc;
#[cfg(test)]
use txn_types::OldValue;
fn optimistic_txn_props(primary: &[u8], start_ts: TimeStamp) -> TransactionProperties<'_> {
TransactionProperties {
start_ts,
kind: TransactionKind::Optimistic(false),
commit_kind: CommitKind::TwoPc,
primary,
txn_size: 0,
lock_ttl: 0,
min_commit_ts: TimeStamp::default(),
need_old_value: false,
}
}
#[cfg(test)]
fn optimistic_async_props(
primary: &[u8],
start_ts: TimeStamp,
max_commit_ts: TimeStamp,
txn_size: u64,
one_pc: bool,
) -> TransactionProperties<'_> {
TransactionProperties {
start_ts,
kind: TransactionKind::Optimistic(false),
commit_kind: if one_pc {
CommitKind::OnePc(max_commit_ts)
} else {
CommitKind::Async(max_commit_ts)
},
primary,
txn_size,
lock_ttl: 2000,
min_commit_ts: 10.into(),
need_old_value: true,
}
}
pub fn try_prewrite_insert<E: Engine>(
engine: &E,
key: &[u8],
value: &[u8],
pk: &[u8],
ts: impl Into<TimeStamp>,
) -> Result<()> {
let ctx = Context::default();
let snapshot = engine.snapshot(Default::default()).unwrap();
let ts = ts.into();
let cm = ConcurrencyManager::new(ts);
let mut txn = MvccTxn::new(ts, cm);
let mut reader = SnapshotReader::new(ts, snapshot, true);
let mut props = optimistic_txn_props(pk, ts);
props.need_old_value = true;
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&props,
Mutation::Insert((Key::from_raw(key), value.to_vec())),
&None,
false,
)?;
assert!(
matches!(old_value, OldValue::None | OldValue::Unspecified),
"{:?}",
old_value
);
write(engine, &ctx, txn.into_modifies());
Ok(())
}
pub fn try_prewrite_check_not_exists<E: Engine>(
engine: &E,
key: &[u8],
pk: &[u8],
ts: impl Into<TimeStamp>,
) -> Result<()> {
let snapshot = engine.snapshot(Default::default()).unwrap();
let ts = ts.into();
let cm = ConcurrencyManager::new(ts);
let mut txn = MvccTxn::new(ts, cm);
let mut reader = SnapshotReader::new(ts, snapshot, true);
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&optimistic_txn_props(pk, ts),
Mutation::CheckNotExists(Key::from_raw(key)),
&None,
true,
)?;
assert_eq!(old_value, OldValue::Unspecified);
Ok(())
}
#[test]
fn test_async_commit_prewrite_check_max_commit_ts() {
let engine = crate::storage::TestEngineBuilder::new().build().unwrap();
let cm = ConcurrencyManager::new(42.into());
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut txn = MvccTxn::new(10.into(), cm.clone());
let mut reader = SnapshotReader::new(10.into(), snapshot, true);
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&optimistic_async_props(b"k1", 10.into(), 50.into(), 2, false),
Mutation::Put((Key::from_raw(b"k1"), b"v1".to_vec())),
&Some(vec![b"k2".to_vec()]),
false,
)
.unwrap();
assert_eq!(old_value, OldValue::None);
cm.update_max_ts(60.into());
let err = prewrite(
&mut txn,
&mut reader,
&optimistic_async_props(b"k1", 10.into(), 50.into(), 1, false),
Mutation::Put((Key::from_raw(b"k2"), b"v2".to_vec())),
&Some(vec![]),
false,
)
.unwrap_err();
assert!(matches!(
err,
Error(box ErrorInner::CommitTsTooLarge { .. })
));
let modifies = txn.into_modifies();
assert_eq!(modifies.len(), 2);
write(&engine, &Default::default(), modifies);
assert!(must_locked(&engine, b"k1", 10).use_async_commit);
assert!(!must_locked(&engine, b"k2", 10).use_async_commit);
}
#[test]
fn test_async_commit_prewrite_min_commit_ts() {
let engine = crate::storage::TestEngineBuilder::new().build().unwrap();
let cm = ConcurrencyManager::new(41.into());
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut props = optimistic_async_props(b"k0", 10.into(), 50.into(), 2, false);
props.min_commit_ts = 11.into();
let mut txn = MvccTxn::new(10.into(), cm.clone());
let mut reader = SnapshotReader::new(10.into(), snapshot.clone(), false);
let (min_ts, old_value) = prewrite(
&mut txn,
&mut reader,
&props,
Mutation::CheckNotExists(Key::from_raw(b"k0")),
&Some(vec![]),
false,
)
.unwrap();
assert!(min_ts > props.start_ts);
assert!(min_ts >= props.min_commit_ts);
assert!(min_ts < 41.into());
assert_eq!(old_value, OldValue::Unspecified);
let mut props = optimistic_txn_props(b"k0", 42.into());
props.min_commit_ts = 43.into();
let mut txn = MvccTxn::new(42.into(), cm.clone());
let mut reader = SnapshotReader::new(42.into(), snapshot.clone(), false);
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&props,
Mutation::CheckNotExists(Key::from_raw(b"k0")),
&Some(vec![]),
false,
)
.unwrap();
assert_eq!(cm.max_ts(), props.start_ts);
assert_eq!(old_value, OldValue::Unspecified);
let mut txn = MvccTxn::new(10.into(), cm.clone());
let mut reader = SnapshotReader::new(10.into(), snapshot.clone(), false);
let (min_ts, old_value) = prewrite(
&mut txn,
&mut reader,
&optimistic_async_props(b"k1", 10.into(), 50.into(), 2, false),
Mutation::Put((Key::from_raw(b"k1"), b"v1".to_vec())),
&Some(vec![b"k2".to_vec()]),
false,
)
.unwrap();
assert!(min_ts > 42.into());
assert!(min_ts < 50.into());
assert_eq!(old_value, OldValue::None);
for &should_not_write in &[false, true] {
let mutation = if should_not_write {
Mutation::CheckNotExists(Key::from_raw(b"k3"))
} else {
Mutation::Put((Key::from_raw(b"k3"), b"v1".to_vec()))
};
let mut txn = MvccTxn::new(44.into(), cm.clone());
let mut reader = SnapshotReader::new(44.into(), snapshot.clone(), false);
let (min_ts, old_value) = prewrite(
&mut txn,
&mut reader,
&optimistic_async_props(b"k3", 44.into(), 50.into(), 2, false),
mutation.clone(),
&Some(vec![b"k4".to_vec()]),
false,
)
.unwrap();
assert!(min_ts > 44.into());
assert!(min_ts < 50.into());
txn.take_guards();
if should_not_write {
assert_eq!(old_value, OldValue::Unspecified);
} else {
assert_eq!(old_value, OldValue::None);
}
if !should_not_write {
let mut props = optimistic_async_props(b"k5", 44.into(), 50.into(), 2, false);
props.kind = TransactionKind::Pessimistic(45.into());
let (min_ts, old_value) = prewrite(
&mut txn,
&mut reader,
&props,
mutation.clone(),
&Some(vec![b"k6".to_vec()]),
false,
)
.unwrap();
assert!(min_ts > 45.into());
assert!(min_ts < 50.into());
txn.take_guards();
assert_eq!(old_value, OldValue::Unspecified);
}
let mut props = optimistic_async_props(b"k7", 44.into(), 50.into(), 2, false);
props.min_commit_ts = 46.into();
let (min_ts, old_value) = prewrite(
&mut txn,
&mut reader,
&props,
mutation.clone(),
&Some(vec![b"k8".to_vec()]),
false,
)
.unwrap();
assert!(min_ts >= 46.into());
assert!(min_ts < 50.into());
txn.take_guards();
if should_not_write {
assert_eq!(old_value, OldValue::Unspecified);
} else {
assert_eq!(old_value, OldValue::None);
}
}
}
#[test]
fn test_1pc_check_max_commit_ts() {
let engine = crate::storage::TestEngineBuilder::new().build().unwrap();
let cm = ConcurrencyManager::new(42.into());
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut txn = MvccTxn::new(10.into(), cm.clone());
let mut reader = SnapshotReader::new(10.into(), snapshot, false);
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&optimistic_async_props(b"k1", 10.into(), 50.into(), 2, true),
Mutation::Put((Key::from_raw(b"k1"), b"v1".to_vec())),
&None,
false,
)
.unwrap();
assert_eq!(old_value, OldValue::None);
cm.update_max_ts(60.into());
let err = prewrite(
&mut txn,
&mut reader,
&optimistic_async_props(b"k1", 10.into(), 50.into(), 1, true),
Mutation::Put((Key::from_raw(b"k2"), b"v2".to_vec())),
&None,
false,
)
.unwrap_err();
assert!(matches!(
err,
Error(box ErrorInner::CommitTsTooLarge { .. })
));
fallback_1pc_locks(&mut txn);
let modifies = txn.into_modifies();
assert_eq!(modifies.len(), 2);
write(&engine, &Default::default(), modifies);
assert!(!must_locked(&engine, b"k1", 10).use_async_commit);
assert!(!must_locked(&engine, b"k2", 10).use_async_commit);
}
pub fn try_pessimistic_prewrite_check_not_exists<E: Engine>(
engine: &E,
key: &[u8],
pk: &[u8],
ts: impl Into<TimeStamp>,
) -> Result<()> {
let snapshot = engine.snapshot(Default::default()).unwrap();
let ts = ts.into();
let cm = ConcurrencyManager::new(ts);
let mut txn = MvccTxn::new(ts, cm);
let mut reader = SnapshotReader::new(ts, snapshot, false);
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&TransactionProperties {
start_ts: ts,
kind: TransactionKind::Pessimistic(TimeStamp::default()),
commit_kind: CommitKind::TwoPc,
primary: pk,
txn_size: 0,
lock_ttl: 0,
min_commit_ts: TimeStamp::default(),
need_old_value: true,
},
Mutation::CheckNotExists(Key::from_raw(key)),
&None,
false,
)?;
assert_eq!(old_value, OldValue::Unspecified);
Ok(())
}
#[test]
fn test_async_commit_pessimistic_prewrite_check_max_commit_ts() {
let engine = crate::storage::TestEngineBuilder::new().build().unwrap();
let cm = ConcurrencyManager::new(42.into());
must_acquire_pessimistic_lock(&engine, b"k1", b"k1", 10, 10);
must_acquire_pessimistic_lock(&engine, b"k2", b"k1", 10, 10);
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut txn = MvccTxn::new(10.into(), cm.clone());
let mut reader = SnapshotReader::new(10.into(), snapshot, false);
let txn_props = TransactionProperties {
start_ts: 10.into(),
kind: TransactionKind::Pessimistic(20.into()),
commit_kind: CommitKind::Async(50.into()),
primary: b"k1",
txn_size: 2,
lock_ttl: 2000,
min_commit_ts: 10.into(),
need_old_value: true,
};
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&txn_props,
Mutation::Put((Key::from_raw(b"k1"), b"v1".to_vec())),
&Some(vec![b"k2".to_vec()]),
true,
)
.unwrap();
assert_eq!(old_value, OldValue::Unspecified);
cm.update_max_ts(60.into());
prewrite(
&mut txn,
&mut reader,
&txn_props,
Mutation::Put((Key::from_raw(b"k2"), b"v2".to_vec())),
&Some(vec![]),
true,
)
.unwrap_err();
}
#[test]
fn test_1pc_pessimistic_prewrite_check_max_commit_ts() {
let engine = crate::storage::TestEngineBuilder::new().build().unwrap();
let cm = ConcurrencyManager::new(42.into());
must_acquire_pessimistic_lock(&engine, b"k1", b"k1", 10, 10);
must_acquire_pessimistic_lock(&engine, b"k2", b"k1", 10, 10);
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut txn = MvccTxn::new(10.into(), cm.clone());
let mut reader = SnapshotReader::new(10.into(), snapshot, false);
let txn_props = TransactionProperties {
start_ts: 10.into(),
kind: TransactionKind::Pessimistic(20.into()),
commit_kind: CommitKind::OnePc(50.into()),
primary: b"k1",
txn_size: 2,
lock_ttl: 2000,
min_commit_ts: 10.into(),
need_old_value: true,
};
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&txn_props,
Mutation::Put((Key::from_raw(b"k1"), b"v1".to_vec())),
&None,
true,
)
.unwrap();
assert_eq!(old_value, OldValue::Unspecified);
cm.update_max_ts(60.into());
prewrite(
&mut txn,
&mut reader,
&txn_props,
Mutation::Put((Key::from_raw(b"k2"), b"v2".to_vec())),
&None,
true,
)
.unwrap_err();
}
#[test]
fn test_prewrite_check_gc_fence() {
let engine = crate::storage::TestEngineBuilder::new().build().unwrap();
let cm = ConcurrencyManager::new(1.into());
must_prewrite_put(&engine, b"k1", b"v1", b"k1", 10);
must_commit(&engine, b"k1", 10, 30);
must_cleanup_with_gc_fence(&engine, b"k1", 30, 0, 40, true);
must_prewrite_put(&engine, b"k2", b"v2", b"k2", 11);
must_commit(&engine, b"k2", 11, 30);
must_cleanup_with_gc_fence(&engine, b"k2", 30, 0, 0, true);
must_prewrite_put(&engine, b"k3", b"v3", b"k3", 12);
must_commit(&engine, b"k3", 12, 30);
must_prewrite_lock(&engine, b"k3", b"k3", 37);
must_commit(&engine, b"k3", 37, 38);
must_cleanup_with_gc_fence(&engine, b"k3", 30, 0, 40, true);
must_prewrite_lock(&engine, b"k3", b"k3", 42);
must_commit(&engine, b"k3", 42, 43);
must_prewrite_put(&engine, b"k4", b"v4", b"k4", 13);
must_commit(&engine, b"k4", 13, 30);
must_prewrite_lock(&engine, b"k4", b"k4", 37);
must_commit(&engine, b"k4", 37, 38);
must_prewrite_lock(&engine, b"k4", b"k4", 42);
must_commit(&engine, b"k4", 42, 43);
must_cleanup_with_gc_fence(&engine, b"k4", 30, 0, 0, true);
must_prewrite_put(&engine, b"k5", b"v5", b"k5", 14);
must_commit(&engine, b"k5", 14, 20);
must_prewrite_put(&engine, b"k5", b"v5x", b"k5", 21);
must_commit(&engine, b"k5", 21, 30);
must_cleanup_with_gc_fence(&engine, b"k5", 20, 0, 30, false);
must_cleanup_with_gc_fence(&engine, b"k5", 30, 0, 40, true);
must_prewrite_put(&engine, b"k6", b"v6", b"k6", 15);
must_commit(&engine, b"k6", 15, 20);
must_prewrite_put(&engine, b"k6", b"v6x", b"k6", 22);
must_commit(&engine, b"k6", 22, 30);
must_cleanup_with_gc_fence(&engine, b"k6", 20, 0, 30, false);
must_cleanup_with_gc_fence(&engine, b"k6", 30, 0, 0, true);
must_prewrite_put(&engine, b"k7", b"v7", b"k7", 16);
must_commit(&engine, b"k7", 16, 30);
must_prewrite_lock(&engine, b"k7", b"k7", 37);
must_commit(&engine, b"k7", 37, 38);
must_cleanup_with_gc_fence(&engine, b"k7", 30, 0, 40, true);
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut txn = MvccTxn::new(50.into(), cm.clone());
let mut reader = SnapshotReader::new(50.into(), snapshot.clone(), false);
let txn_props = TransactionProperties {
start_ts: 50.into(),
kind: TransactionKind::Optimistic(false),
commit_kind: CommitKind::TwoPc,
primary: b"k1",
txn_size: 6,
lock_ttl: 2000,
min_commit_ts: 51.into(),
need_old_value: true,
};
let cases = vec![
(b"k1", true),
(b"k2", false),
(b"k3", true),
(b"k4", false),
(b"k5", true),
(b"k6", false),
(b"k7", true),
];
for (key, success) in cases {
let res = prewrite(
&mut txn,
&mut reader,
&txn_props,
Mutation::CheckNotExists(Key::from_raw(key)),
&None,
false,
);
if success {
let res = res.unwrap();
assert_eq!(res.1, OldValue::Unspecified);
} else {
res.unwrap_err();
}
let res = prewrite(
&mut txn,
&mut reader,
&txn_props,
Mutation::Insert((Key::from_raw(key), b"value".to_vec())),
&None,
false,
);
if success {
let res = res.unwrap();
assert_eq!(res.1, OldValue::None);
} else {
res.unwrap_err();
}
}
drop(txn);
let mut txn = MvccTxn::new(50.into(), cm);
let mut reader = SnapshotReader::new(50.into(), snapshot, false);
let txn_props = TransactionProperties {
start_ts: 50.into(),
kind: TransactionKind::Optimistic(false),
commit_kind: CommitKind::TwoPc,
primary: b"k1",
txn_size: 6,
lock_ttl: 2000,
min_commit_ts: 51.into(),
need_old_value: true,
};
let cases: Vec<_> = vec![
(b"k1" as &[u8], None),
(b"k2", Some(b"v2" as &[u8])),
(b"k3", None),
(b"k4", Some(b"v4")),
(b"k5", None),
(b"k6", Some(b"v6x")),
(b"k7", None),
]
.into_iter()
.map(|(k, v)| {
let old_value = v
.map(|value| OldValue::Value {
value: value.to_vec(),
})
.unwrap_or(OldValue::None);
(Key::from_raw(k), old_value)
})
.collect();
for (key, expected_value) in &cases {
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&txn_props,
Mutation::Put((key.clone(), b"value".to_vec())),
&None,
false,
)
.unwrap();
assert_eq!(&old_value, expected_value, "key: {}", key);
}
}
#[test]
fn test_old_value_rollback_and_lock() {
let engine_rollback = crate::storage::TestEngineBuilder::new().build().unwrap();
must_prewrite_put(&engine_rollback, b"k1", b"v1", b"k1", 10);
must_commit(&engine_rollback, b"k1", 10, 30);
must_prewrite_put(&engine_rollback, b"k1", b"v2", b"k1", 40);
must_rollback(&engine_rollback, b"k1", 40, false);
let engine_lock = crate::storage::TestEngineBuilder::new().build().unwrap();
must_prewrite_put(&engine_lock, b"k1", b"v1", b"k1", 10);
must_commit(&engine_lock, b"k1", 10, 30);
must_prewrite_lock(&engine_lock, b"k1", b"k1", 40);
must_commit(&engine_lock, b"k1", 40, 45);
for engine in &[engine_rollback, engine_lock] {
let start_ts = TimeStamp::from(50);
let txn_props = TransactionProperties {
start_ts,
kind: TransactionKind::Optimistic(false),
commit_kind: CommitKind::TwoPc,
primary: b"k1",
txn_size: 0,
lock_ttl: 0,
min_commit_ts: TimeStamp::default(),
need_old_value: true,
};
let snapshot = engine.snapshot(Default::default()).unwrap();
let cm = ConcurrencyManager::new(start_ts);
let mut txn = MvccTxn::new(start_ts, cm);
let mut reader = SnapshotReader::new(start_ts, snapshot, true);
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&txn_props,
Mutation::Put((Key::from_raw(b"k1"), b"value".to_vec())),
&None,
false,
)
.unwrap();
assert_eq!(
old_value,
OldValue::Value {
value: b"v1".to_vec(),
}
);
}
}
#[cfg(test)]
pub fn old_value_put_delete_lock_insert<E: Engine>(engine: &E, key: &[u8]) -> TimeStamp {
must_prewrite_put(engine, key, b"v1", key, 10);
must_commit(engine, key, 10, 20);
must_prewrite_delete(engine, key, key, 30);
must_commit(engine, key, 30, 40);
must_prewrite_lock(engine, key, key, 50);
must_commit(engine, key, 50, 60);
70.into()
}
#[test]
fn test_old_value_put_delete_lock_insert() {
let engine = crate::storage::TestEngineBuilder::new().build().unwrap();
let start_ts = old_value_put_delete_lock_insert(&engine, b"k1");
let txn_props = TransactionProperties {
start_ts,
kind: TransactionKind::Optimistic(false),
commit_kind: CommitKind::TwoPc,
primary: b"k1",
txn_size: 0,
lock_ttl: 0,
min_commit_ts: TimeStamp::default(),
need_old_value: true,
};
let snapshot = engine.snapshot(Default::default()).unwrap();
let cm = ConcurrencyManager::new(start_ts);
let mut txn = MvccTxn::new(start_ts, cm);
let mut reader = SnapshotReader::new(start_ts, snapshot, true);
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&txn_props,
Mutation::Insert((Key::from_raw(b"k1"), b"v2".to_vec())),
&None,
false,
)
.unwrap();
assert_eq!(old_value, OldValue::None);
}
#[cfg(test)]
pub type OldValueRandomTest = Box<dyn Fn(Arc<RocksSnapshot>, TimeStamp) -> Result<OldValue>>;
#[cfg(test)]
pub fn old_value_random(
key: &[u8],
require_old_value_none: bool,
tests: Vec<OldValueRandomTest>,
) {
let mut ts = 1u64;
let mut tso = || {
ts += 1;
ts
};
use std::time::SystemTime;
let seed = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let mut rg = rand::rngs::StdRng::seed_from_u64(seed);
let engine = crate::storage::TestEngineBuilder::new().build().unwrap();
let cases = 1000;
for _ in 0..cases {
let ops_count = rg.gen::<u8>() % 12;
let ops = (0..ops_count)
.into_iter()
.enumerate()
.map(|(i, _)| {
if i == 0 {
0
} else {
rg.gen::<u8>() % 4
}
})
.collect::<Vec<_>>();
for (i, op) in ops.iter().enumerate() {
let start_ts = tso();
let commit_ts = tso();
match op {
0 => {
must_prewrite_put(&engine, key, &[i as u8], key, start_ts);
must_commit(&engine, key, start_ts, commit_ts);
}
1 => {
must_prewrite_delete(&engine, key, key, start_ts);
must_commit(&engine, key, start_ts, commit_ts);
}
2 => {
must_prewrite_lock(&engine, key, key, start_ts);
must_commit(&engine, key, start_ts, commit_ts);
}
3 => {
must_prewrite_put(&engine, key, &[i as u8], key, start_ts);
must_rollback(&engine, key, start_ts, false);
}
_ => unreachable!(),
}
}
let start_ts = TimeStamp::from(tso());
let snapshot = engine.snapshot(Default::default()).unwrap();
let expect = {
let mut reader = SnapshotReader::new(start_ts, snapshot.clone(), true);
if let Some(write) = reader
.reader
.get_write(&Key::from_raw(key), start_ts, Some(start_ts))
.unwrap()
{
assert_eq!(write.write_type, WriteType::Put);
match write.short_value {
Some(value) => OldValue::Value { value },
None => OldValue::ValueTimeStamp {
start_ts: write.start_ts,
},
}
} else {
OldValue::None
}
};
if require_old_value_none && expect != OldValue::None {
continue;
}
for test in &tests {
match test(snapshot.clone(), start_ts) {
Ok(old_value) => {
assert_eq!(old_value, expect, "seed: {} ops: {:?}", seed, ops);
}
Err(e) => {
panic!("error: {:?} seed: {} ops: {:?}", e, seed, ops);
}
}
}
}
}
#[test]
fn test_old_value_random() {
let key = b"k1";
let require_old_value_none = false;
old_value_random(
key,
require_old_value_none,
vec![Box::new(move |snapshot, start_ts| {
let cm = ConcurrencyManager::new(start_ts);
let mut txn = MvccTxn::new(start_ts, cm);
let mut reader = SnapshotReader::new(start_ts, snapshot, true);
let txn_props = TransactionProperties {
start_ts,
kind: TransactionKind::Optimistic(false),
commit_kind: CommitKind::TwoPc,
primary: key,
txn_size: 0,
lock_ttl: 0,
min_commit_ts: TimeStamp::default(),
need_old_value: true,
};
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&txn_props,
Mutation::Put((Key::from_raw(key), b"v2".to_vec())),
&None,
false,
)?;
Ok(old_value)
})],
)
}
#[test]
fn test_old_value_random_none() {
let key = b"k1";
let require_old_value_none = true;
old_value_random(
key,
require_old_value_none,
vec![Box::new(move |snapshot, start_ts| {
let cm = ConcurrencyManager::new(start_ts);
let mut txn = MvccTxn::new(start_ts, cm);
let mut reader = SnapshotReader::new(start_ts, snapshot, true);
let txn_props = TransactionProperties {
start_ts,
kind: TransactionKind::Optimistic(false),
commit_kind: CommitKind::TwoPc,
primary: key,
txn_size: 0,
lock_ttl: 0,
min_commit_ts: TimeStamp::default(),
need_old_value: true,
};
let (_, old_value) = prewrite(
&mut txn,
&mut reader,
&txn_props,
Mutation::Insert((Key::from_raw(key), b"v2".to_vec())),
&None,
false,
)?;
Ok(old_value)
})],
)
}
}