use crate::storage::kv::WriteData;
use crate::storage::lock_manager::LockManager;
use crate::storage::mvcc::{LockType, MvccTxn, SnapshotReader, TimeStamp, TxnCommitRecord};
use crate::storage::txn::{
actions::check_txn_status::{collapse_prev_rollback, make_rollback},
commands::{
Command, CommandExt, ReleasedLocks, ResponsePolicy, TypedCommand, WriteCommand,
WriteContext, WriteResult,
},
Result,
};
use crate::storage::types::SecondaryLocksStatus;
use crate::storage::{ProcessResult, Snapshot};
use txn_types::{Key, Lock, WriteType};
command! {
CheckSecondaryLocks:
cmd_ty => SecondaryLocksStatus,
display => "kv::command::CheckSecondaryLocks {} keys@{} | {:?}", (keys.len, start_ts, ctx),
content => {
keys: Vec<Key>,
start_ts: txn_types::TimeStamp,
}
}
impl CommandExt for CheckSecondaryLocks {
ctx!();
tag!(check_secondary_locks);
ts!(start_ts);
write_bytes!(keys: multiple);
gen_lock!(keys: multiple);
}
#[derive(Debug, PartialEq)]
enum SecondaryLockStatus {
Locked(Lock),
Committed(TimeStamp),
RolledBack,
}
impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for CheckSecondaryLocks {
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
context.concurrency_manager.update_max_ts(self.start_ts);
let mut txn = MvccTxn::new(self.start_ts, context.concurrency_manager);
let mut reader =
SnapshotReader::new(self.start_ts, snapshot, !self.ctx.get_not_fill_cache());
let mut released_locks = ReleasedLocks::new(self.start_ts, TimeStamp::zero());
let mut result = SecondaryLocksStatus::Locked(Vec::new());
for key in self.keys {
let mut released_lock = None;
let mut mismatch_lock = None;
let (status, need_rollback, rollback_overlapped_write) = match reader.load_lock(&key)? {
Some(lock) if lock.ts == self.start_ts => {
if lock.lock_type == LockType::Pessimistic {
released_lock = txn.unlock_key(key.clone(), true);
let overlapped_write = reader.get_txn_commit_record(&key)?.unwrap_none();
(SecondaryLockStatus::RolledBack, true, overlapped_write)
} else {
(SecondaryLockStatus::Locked(lock), false, None)
}
}
l => {
mismatch_lock = l;
match reader.get_txn_commit_record(&key)? {
TxnCommitRecord::SingleRecord { commit_ts, write } => {
let status = if write.write_type != WriteType::Rollback {
SecondaryLockStatus::Committed(commit_ts)
} else {
SecondaryLockStatus::RolledBack
};
(status, false, None)
}
TxnCommitRecord::OverlappedRollback { .. } => {
(SecondaryLockStatus::RolledBack, false, None)
}
TxnCommitRecord::None { overlapped_write } => {
(SecondaryLockStatus::RolledBack, true, overlapped_write)
}
}
}
};
if need_rollback {
if let Some(l) = mismatch_lock {
txn.mark_rollback_on_mismatching_lock(&key, l, true);
}
if let Some(write) = make_rollback(self.start_ts, true, rollback_overlapped_write) {
txn.put_write(key.clone(), self.start_ts, write.as_ref().to_bytes());
collapse_prev_rollback(&mut txn, &mut reader, &key)?;
}
}
released_locks.push(released_lock);
match status {
SecondaryLockStatus::Locked(lock) => {
result.push(lock.into_lock_info(key.to_raw()?));
}
SecondaryLockStatus::Committed(commit_ts) => {
result = SecondaryLocksStatus::Committed(commit_ts);
break;
}
SecondaryLockStatus::RolledBack => {
result = SecondaryLocksStatus::RolledBack;
break;
}
}
}
let mut rows = 0;
if let SecondaryLocksStatus::RolledBack = &result {
released_locks.wake_up(context.lock_mgr);
rows = 1;
}
context.statistics.add(&reader.take_statistics());
let pr = ProcessResult::SecondaryLocksStatus { status: result };
let write_data = WriteData::from_modifies(txn.into_modifies());
Ok(WriteResult {
ctx: self.ctx,
to_be_write: write_data,
rows,
pr,
lock_info: None,
lock_guards: vec![],
response_policy: ResponsePolicy::OnApplied,
})
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::storage::kv::TestEngineBuilder;
use crate::storage::lock_manager::DummyLockManager;
use crate::storage::mvcc::tests::*;
use crate::storage::txn::commands::WriteCommand;
use crate::storage::txn::tests::*;
use crate::storage::Engine;
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::Context;
pub fn must_success<E: Engine>(
engine: &E,
key: &[u8],
lock_ts: impl Into<TimeStamp>,
expect_status: SecondaryLocksStatus,
) {
let ctx = Context::default();
let snapshot = engine.snapshot(Default::default()).unwrap();
let lock_ts = lock_ts.into();
let cm = ConcurrencyManager::new(lock_ts);
let command = crate::storage::txn::commands::CheckSecondaryLocks {
ctx: ctx.clone(),
keys: vec![Key::from_raw(key)],
start_ts: lock_ts,
};
let result = command
.process_write(
snapshot,
WriteContext {
lock_mgr: &DummyLockManager,
concurrency_manager: cm,
extra_op: Default::default(),
statistics: &mut Default::default(),
async_apply_prewrite: false,
},
)
.unwrap();
if let ProcessResult::SecondaryLocksStatus { status } = result.pr {
assert_eq!(status, expect_status);
write(engine, &ctx, result.to_be_write.modifies);
} else {
unreachable!();
}
}
#[test]
fn test_check_async_commit_secondary_locks() {
let engine = TestEngineBuilder::new().build().unwrap();
let ctx = Context::default();
let cm = ConcurrencyManager::new(1.into());
let check_secondary = |key, ts| {
let snapshot = engine.snapshot(Default::default()).unwrap();
let key = Key::from_raw(key);
let ts = TimeStamp::new(ts);
let command = crate::storage::txn::commands::CheckSecondaryLocks {
ctx: Default::default(),
keys: vec![key],
start_ts: ts,
};
let result = command
.process_write(
snapshot,
WriteContext {
lock_mgr: &DummyLockManager,
concurrency_manager: cm.clone(),
extra_op: Default::default(),
statistics: &mut Default::default(),
async_apply_prewrite: false,
},
)
.unwrap();
if !result.to_be_write.modifies.is_empty() {
engine.write(&ctx, result.to_be_write).unwrap();
}
if let ProcessResult::SecondaryLocksStatus { status } = result.pr {
status
} else {
unreachable!();
}
};
must_prewrite_lock(&engine, b"k1", b"key", 1);
must_commit(&engine, b"k1", 1, 3);
must_rollback(&engine, b"k1", 5, false);
must_prewrite_lock(&engine, b"k1", b"key", 7);
must_commit(&engine, b"k1", 7, 9);
assert_eq!(
check_secondary(b"k1", 7),
SecondaryLocksStatus::Committed(9.into())
);
must_get_commit_ts(&engine, b"k1", 7, 9);
assert_eq!(check_secondary(b"k1", 5), SecondaryLocksStatus::RolledBack);
must_get_rollback_ts(&engine, b"k1", 5);
assert_eq!(
check_secondary(b"k1", 1),
SecondaryLocksStatus::Committed(3.into())
);
must_get_commit_ts(&engine, b"k1", 1, 3);
assert_eq!(check_secondary(b"k1", 6), SecondaryLocksStatus::RolledBack);
must_get_rollback_protected(&engine, b"k1", 6, true);
must_acquire_pessimistic_lock(&engine, b"k1", b"key", 11, 11);
let status = check_secondary(b"k1", 11);
assert_eq!(status, SecondaryLocksStatus::RolledBack);
must_get_rollback_protected(&engine, b"k1", 11, true);
must_prewrite_lock(&engine, b"k1", b"key", 13);
match check_secondary(b"k1", 13) {
SecondaryLocksStatus::Locked(_) => {}
res => panic!("unexpected lock status: {:?}", res),
}
must_locked(&engine, b"k1", 13);
must_commit(&engine, b"k1", 13, 15);
match check_secondary(b"k1", 14) {
SecondaryLocksStatus::RolledBack => {}
res => panic!("unexpected lock status: {:?}", res),
}
must_get_rollback_protected(&engine, b"k1", 14, true);
match check_secondary(b"k1", 15) {
SecondaryLocksStatus::RolledBack => {}
res => panic!("unexpected lock status: {:?}", res),
}
must_get_overlapped_rollback(&engine, b"k1", 15, 13, WriteType::Lock, Some(0));
}
}