use txn_types::{Key, TimeStamp};
use crate::storage::kv::WriteData;
use crate::storage::lock_manager::LockManager;
use crate::storage::mvcc::{MvccTxn, SnapshotReader};
use crate::storage::txn::actions::check_txn_status::*;
use crate::storage::txn::commands::{
Command, CommandExt, ReleasedLocks, ResponsePolicy, TypedCommand, WriteCommand, WriteContext,
WriteResult,
};
use crate::storage::txn::Result;
use crate::storage::{ProcessResult, Snapshot, TxnStatus};
// Declares the `CheckTxnStatus` command via the crate's `command!` macro.
//
// The command inspects the lock stored on `primary_key` and reports the status of
// the transaction identified by `lock_ts` (see `process_write` in the `WriteCommand`
// impl in this file).
//
// NOTE(review): plain `//` comments are used on purpose — it is not visible from this
// file whether `command!` accepts `///` doc attributes; confirm before converting.
command! {
CheckTxnStatus:
// Result type associated with the command — presumably what the caller receives on
// success; confirm against the `command!` definition.
cmd_ty => TxnStatus,
// Format string plus the fields fed into it — presumably used for the command's
// `Display` output; confirm against the `command!` definition.
display => "kv::command::check_txn_status {} @ {} curr({}, {}) | {:?}", (primary_key, lock_ts, caller_start_ts, current_ts, ctx),
content => {
// Key whose lock is loaded and inspected by `process_write`.
primary_key: Key,
// Timestamp a lock on `primary_key` must carry (`lock.ts == lock_ts`) to be treated
// as belonging to the checked transaction; also used to build the `MvccTxn` and
// `SnapshotReader`.
lock_ts: TimeStamp,
// Forwarded to `check_txn_status_lock_exists`; also feeds the max_ts update unless it
// is the max sentinel. Presumably the start_ts of the transaction issuing the check.
caller_start_ts: TimeStamp,
// Forwarded to `check_txn_status_lock_exists`; also feeds the max_ts update unless it
// is the max sentinel. Presumably the caller's notion of "now".
current_ts: TimeStamp,
// Selects the `MissingLockAction` used when no matching lock is found.
rollback_if_not_exist: bool,
// Forwarded unchanged to `check_txn_status_lock_exists`.
force_sync_commit: bool,
// Forwarded unchanged to both `check_txn_status_lock_exists` and
// `check_txn_status_missing_lock`.
resolving_pessimistic_lock: bool,
}
}
// Implements `CommandExt` for `CheckTxnStatus` entirely through the crate's helper macros.
//
// NOTE(review): the macro expansions are not visible in this file; the per-line notes
// below are inferred from the macro names and arguments — confirm against their
// definitions.
impl CommandExt for CheckTxnStatus {
// Presumably generates the accessors for the command's request context (`ctx`).
ctx!();
// Presumably labels the command as `check_txn_status`.
tag!(check_txn_status);
// Presumably reports `lock_ts` as the command's timestamp.
ts!(lock_ts);
// Presumably derives the command's write size from `primary_key`.
write_bytes!(primary_key);
// Presumably generates the lock/latch covering `primary_key` only.
gen_lock!(primary_key);
}
impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for CheckTxnStatus {
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
let mut new_max_ts = self.lock_ts;
if !self.current_ts.is_max() && self.current_ts > new_max_ts {
new_max_ts = self.current_ts;
}
if !self.caller_start_ts.is_max() && self.caller_start_ts > new_max_ts {
new_max_ts = self.caller_start_ts;
}
context.concurrency_manager.update_max_ts(new_max_ts);
let mut txn = MvccTxn::new(self.lock_ts, context.concurrency_manager);
let mut reader =
SnapshotReader::new(self.lock_ts, snapshot, !self.ctx.get_not_fill_cache());
fail_point!("check_txn_status", |err| Err(
crate::storage::mvcc::Error::from(crate::storage::mvcc::txn::make_txn_error(
err,
&self.primary_key,
self.lock_ts
))
.into()
));
let (txn_status, released) = match reader.load_lock(&self.primary_key)? {
Some(lock) if lock.ts == self.lock_ts => check_txn_status_lock_exists(
&mut txn,
&mut reader,
self.primary_key,
lock,
self.current_ts,
self.caller_start_ts,
self.force_sync_commit,
self.resolving_pessimistic_lock,
)?,
l => (
check_txn_status_missing_lock(
&mut txn,
&mut reader,
self.primary_key,
l,
MissingLockAction::rollback(self.rollback_if_not_exist),
self.resolving_pessimistic_lock,
)?,
None,
),
};
let mut released_locks = ReleasedLocks::new(self.lock_ts, TimeStamp::zero());
released_locks.push(released);
if let TxnStatus::TtlExpire = txn_status {
released_locks.wake_up(context.lock_mgr);
}
context.statistics.add(&reader.take_statistics());
let pr = ProcessResult::TxnStatus { txn_status };
let write_data = WriteData::from_modifies(txn.into_modifies());
Ok(WriteResult {
ctx: self.ctx,
to_be_write: write_data,
rows: 1,
pr,
lock_info: None,
lock_guards: vec![],
response_policy: ResponsePolicy::OnApplied,
})
}
}
#[cfg(test)]
pub mod tests {
use super::TxnStatus::*;
use super::*;
use crate::storage::kv::Engine;
use crate::storage::lock_manager::DummyLockManager;
use crate::storage::mvcc::tests::*;
use crate::storage::txn::commands::{pessimistic_rollback, WriteCommand, WriteContext};
use crate::storage::txn::tests::*;
use crate::storage::{types::TxnStatus, ProcessResult, TestEngineBuilder};
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::Context;
use txn_types::Key;
use txn_types::WriteType;
/// Test helper: runs a `CheckTxnStatus` command against `engine` and requires it to succeed.
///
/// The command is executed on a fresh snapshot with a `ConcurrencyManager` seeded from
/// `current_ts`. The call panics if `process_write` fails, if the result is not a
/// `ProcessResult::TxnStatus`, or if `status_pred` rejects the reported status. On success
/// the modifications produced by the command are written back to `engine`.
pub fn must_success<E: Engine>(
    engine: &E,
    primary_key: &[u8],
    lock_ts: impl Into<TimeStamp>,
    caller_start_ts: impl Into<TimeStamp>,
    current_ts: impl Into<TimeStamp>,
    rollback_if_not_exist: bool,
    force_sync_commit: bool,
    resolving_pessimistic_lock: bool,
    status_pred: impl FnOnce(TxnStatus) -> bool,
) {
    let write_ctx = Context::default();
    let snapshot = engine.snapshot(Default::default()).unwrap();
    let current_ts: TimeStamp = current_ts.into();
    let concurrency_manager = ConcurrencyManager::new(current_ts);

    let cmd = crate::storage::txn::commands::CheckTxnStatus {
        ctx: Context::default(),
        primary_key: Key::from_raw(primary_key),
        lock_ts: lock_ts.into(),
        caller_start_ts: caller_start_ts.into(),
        current_ts,
        rollback_if_not_exist,
        force_sync_commit,
        resolving_pessimistic_lock,
    };

    // Throw-away statistics sink; the helper does not inspect it.
    let mut statistics = Default::default();
    let outcome = cmd
        .process_write(
            snapshot,
            WriteContext {
                lock_mgr: &DummyLockManager,
                concurrency_manager,
                extra_op: Default::default(),
                statistics: &mut statistics,
                async_apply_prewrite: false,
            },
        )
        .unwrap();

    match outcome.pr {
        ProcessResult::TxnStatus { txn_status } => assert!(status_pred(txn_status)),
        _ => unreachable!(),
    }

    // Persist whatever the command decided to write so later steps observe it.
    write(engine, &write_ctx, outcome.to_be_write.modifies);
}
/// Test helper: runs a `CheckTxnStatus` command against `engine` and requires
/// `process_write` to return an error. Nothing is written back to the engine.
pub fn must_err<E: Engine>(
    engine: &E,
    primary_key: &[u8],
    lock_ts: impl Into<TimeStamp>,
    caller_start_ts: impl Into<TimeStamp>,
    current_ts: impl Into<TimeStamp>,
    rollback_if_not_exist: bool,
    force_sync_commit: bool,
    resolving_pessimistic_lock: bool,
) {
    let snapshot = engine.snapshot(Default::default()).unwrap();
    // The same `current_ts` seeds the concurrency manager and goes into the command.
    let current_ts: TimeStamp = current_ts.into();
    let cm = ConcurrencyManager::new(current_ts);
    let command = crate::storage::txn::commands::CheckTxnStatus {
        ctx: Context::default(),
        primary_key: Key::from_raw(primary_key),
        lock_ts: lock_ts.into(),
        caller_start_ts: caller_start_ts.into(),
        current_ts,
        rollback_if_not_exist,
        force_sync_commit,
        resolving_pessimistic_lock,
    };
    assert!(
        command
            .process_write(
                snapshot,
                WriteContext {
                    lock_mgr: &DummyLockManager,
                    concurrency_manager: cm,
                    extra_op: Default::default(),
                    statistics: &mut Default::default(),
                    async_apply_prewrite: false,
                },
            )
            .is_err()
    );
}
/// Builds a predicate that accepts exactly `TxnStatus::Committed` with the given `commit_ts`.
fn committed(commit_ts: impl Into<TimeStamp>) -> impl FnOnce(TxnStatus) -> bool {
    move |status| {
        let expected = TxnStatus::Committed {
            commit_ts: commit_ts.into(),
        };
        status == expected
    }
}
/// Builds a predicate that accepts only `TxnStatus::Uncommitted` whose lock has the given
/// `ttl` and `min_commit_ts` and whose `min_commit_ts_pushed` flag equals `should_be_pushed`.
/// Any other status variant is rejected.
fn uncommitted(
    ttl: u64,
    min_commit_ts: impl Into<TimeStamp>,
    should_be_pushed: bool,
) -> impl FnOnce(TxnStatus) -> bool {
    move |status| match status {
        TxnStatus::Uncommitted {
            lock,
            min_commit_ts_pushed,
        } => {
            if lock.ttl != ttl {
                return false;
            }
            if lock.min_commit_ts != min_commit_ts.into() {
                return false;
            }
            min_commit_ts_pushed == should_be_pushed
        }
        _ => false,
    }
}
#[test]
fn test_check_async_commit_txn_status() {
    /// Checks `key` with `force_sync_commit` and `resolving_pessimistic_lock` both off and
    /// requires the status to be `Uncommitted` with TTL 100, the given `min_commit_ts`, and
    /// the "pushed" flag unset.
    fn must_stay_locked<E: Engine>(
        engine: &E,
        key: &[u8],
        lock_ts: impl Into<TimeStamp>,
        caller_start_ts: impl Into<TimeStamp>,
        current_ts: impl Into<TimeStamp>,
        rollback_if_not_exist: bool,
        min_commit_ts: impl Into<TimeStamp>,
    ) {
        must_success(
            engine,
            key,
            lock_ts,
            caller_start_ts,
            current_ts,
            rollback_if_not_exist,
            false,
            false,
            uncommitted(100, min_commit_ts, false),
        );
    }

    /// Runs the whole scenario on a fresh engine for one value of `rollback_if_not_exist`.
    fn run_scenario(r: bool) {
        let engine = TestEngineBuilder::new().build().unwrap();

        // k1: async-commit prewrite at ts 1. Whatever caller_start_ts / current_ts are
        // supplied (zero, ordinary values or the max sentinel), the lock must be reported
        // unchanged with min_commit_ts 2.
        must_prewrite_put_async_commit(&engine, b"k1", b"v", b"k1", &Some(vec![]), 1, 2);
        must_stay_locked(&engine, b"k1", 1, 0, 0, r, 2);
        must_stay_locked(&engine, b"k1", 1, 5, 0, r, 2);
        must_stay_locked(&engine, b"k1", 1, 0, 8, r, 2);
        must_stay_locked(&engine, b"k1", 1, 10, 12, r, 2);
        must_stay_locked(&engine, b"k1", 1, TimeStamp::max(), 12, r, 2);
        must_stay_locked(&engine, b"k1", 1, 12, TimeStamp::max(), r, 2);
        // With force_sync_commit set, the same request expires the lock instead.
        must_success(
            &engine,
            b"k1",
            1,
            12,
            TimeStamp::max(),
            r,
            true,
            false,
            |s| s == TtlExpire,
        );
        must_unlocked(&engine, b"k1");
        must_get_rollback_protected(&engine, b"k1", 1, false);

        // k2: pessimistic lock at ts 15 followed by an async-commit pessimistic prewrite.
        // Same expectations as above, with min_commit_ts 17.
        must_acquire_pessimistic_lock(&engine, b"k2", b"k2", 15, 15);
        must_pessimistic_prewrite_put_async_commit(
            &engine,
            b"k2",
            b"v",
            b"k2",
            &Some(vec![]),
            15,
            16,
            true,
            17,
        );
        must_stay_locked(&engine, b"k2", 15, 0, 0, r, 17);
        must_stay_locked(&engine, b"k2", 15, 18, 0, r, 17);
        must_stay_locked(&engine, b"k2", 15, 0, 18, r, 17);
        must_stay_locked(&engine, b"k2", 15, 19, 20, r, 17);
        must_stay_locked(&engine, b"k2", 15, TimeStamp::max(), 20, r, 17);
        must_stay_locked(&engine, b"k2", 15, 20, TimeStamp::max(), r, 17);
        must_success(
            &engine,
            b"k2",
            15,
            20,
            TimeStamp::max(),
            r,
            true,
            false,
            |s| s == TtlExpire,
        );
        must_unlocked(&engine, b"k2");
        must_get_rollback_protected(&engine, b"k2", 15, true);

        // k3 (primary) / k4: both pessimistically locked via the large-txn helper, and only
        // k4 is prewritten. Checking k3 with zero timestamps must report min_commit_ts 21.
        must_acquire_pessimistic_lock_for_large_txn(&engine, b"k3", b"k3", 20, 20, 100);
        must_acquire_pessimistic_lock_for_large_txn(&engine, b"k4", b"k3", 20, 25, 100);
        must_pessimistic_prewrite_put_async_commit(
            &engine,
            b"k4",
            b"v",
            b"k3",
            &Some(vec![]),
            20,
            25,
            true,
            28,
        );
        must_stay_locked(&engine, b"k3", 20, 0, 0, r, 21);

        // k5 (primary) / k6: same shape, but locked via the plain with-ttl helper.
        // Checking k5 must report min_commit_ts 0.
        must_acquire_pessimistic_lock_with_ttl(&engine, b"k5", b"k5", 30, 30, 100);
        must_acquire_pessimistic_lock_with_ttl(&engine, b"k6", b"k5", 30, 35, 100);
        must_pessimistic_prewrite_put_async_commit(
            &engine,
            b"k6",
            b"v",
            b"k5",
            &Some(vec![]),
            30,
            35,
            true,
            36,
        );
        must_stay_locked(&engine, b"k5", 30, 0, 0, r, 0);
    }

    run_scenario(true);
    run_scenario(false);
}
fn test_check_txn_status_impl(rollback_if_not_exist: bool) {
let engine = TestEngineBuilder::new().build().unwrap();
let (k, v) = (b"k1", b"v1");
let r = rollback_if_not_exist;
let ts = TimeStamp::compose;
if r {
must_success(
&engine,
k,
ts(3, 0),
ts(3, 1),
ts(3, 2),
r,
false,
false,
|s| s == LockNotExist,
);
must_get_rollback_protected(&engine, k, ts(3, 0), true);
} else {
must_err(&engine, k, ts(3, 0), ts(3, 1), ts(3, 2), r, false, false);
}
must_prewrite_put_for_large_txn(&engine, k, v, k, ts(5, 0), 100, 0);
must_large_txn_locked(&engine, k, ts(5, 0), 100, ts(5, 1), false);
must_success(
&engine,
k,
ts(5, 0),
0,
0,
r,
false,
false,
uncommitted(100, ts(5, 1), false),
);
must_success(
&engine,
k,
ts(5, 0),
ts(6, 0),
ts(7, 0),
r,
false,
false,
uncommitted(100, ts(7, 0), true),
);
must_large_txn_locked(&engine, k, ts(5, 0), 100, ts(7, 0), false);
must_success(
&engine,
k,
ts(5, 0),
ts(9, 0),
ts(8, 0),
r,
false,
false,
uncommitted(100, ts(9, 1), true),
);
must_large_txn_locked(&engine, k, ts(5, 0), 100, ts(9, 1), false);
must_success(
&engine,
k,
ts(5, 0),
ts(8, 0),
ts(10, 0),
r,
false,
false,
uncommitted(100, ts(9, 1), true),
);
must_large_txn_locked(&engine, k, ts(5, 0), 100, ts(9, 1), false);
must_success(
&engine,
k,
ts(5, 0),
ts(11, 0),
ts(9, 0),
r,
false,
false,
uncommitted(100, ts(11, 1), true),
);
must_large_txn_locked(&engine, k, ts(5, 0), 100, ts(11, 1), false);
must_success(
&engine,
k,
ts(5, 0),
ts(12, 0),
ts(12, 0),
r,
false,
false,
uncommitted(100, ts(12, 1), true),
);
must_large_txn_locked(&engine, k, ts(5, 0), 100, ts(12, 1), false);
must_success(
&engine,
k,
ts(5, 0),
ts(13, 1),
ts(13, 3),
r,
false,
false,
uncommitted(100, ts(13, 3), true),
);
must_large_txn_locked(&engine, k, ts(5, 0), 100, ts(13, 3), false);
must_commit(&engine, k, ts(5, 0), ts(15, 0));
must_unlocked(&engine, k);
must_success(
&engine,
k,
ts(5, 0),
ts(12, 0),
ts(12, 0),
r,
false,
false,
committed(ts(15, 0)),
);
must_unlocked(&engine, k);
must_prewrite_put_for_large_txn(&engine, k, v, k, ts(20, 0), 100, 0);
must_success(
&engine,
k,
ts(5, 0),
ts(12, 0),
ts(12, 0),
r,
false,
false,
committed(ts(15, 0)),
);
if r {
must_success(
&engine,
k,
ts(6, 0),
ts(12, 0),
ts(12, 0),
r,
false,
false,
|s| s == LockNotExist,
);
must_seek_write(
&engine,
k,
ts(6, 0),
ts(6, 0),
ts(6, 0),
WriteType::Rollback,
);
} else {
must_err(&engine, k, ts(6, 0), ts(12, 0), ts(12, 0), r, false, false);
}
must_success(
&engine,
k,
ts(20, 0),
ts(21, 105),
ts(21, 105),
r,
false,
false,
uncommitted(100, ts(21, 106), true),
);
must_large_txn_locked(&engine, k, ts(20, 0), 100, ts(21, 106), false);
must_success(
&engine,
k,
ts(20, 0),
ts(121, 0),
ts(121, 0),
r,
false,
false,
|s| s == TtlExpire,
);
must_unlocked(&engine, k);
must_seek_write(
&engine,
k,
TimeStamp::max(),
ts(20, 0),
ts(20, 0),
WriteType::Rollback,
);
must_acquire_pessimistic_lock_for_large_txn(&engine, k, k, ts(4, 0), ts(130, 0), 200);
must_large_txn_locked(&engine, k, ts(4, 0), 200, ts(130, 1), true);
must_success(
&engine,
k,
ts(4, 0),
ts(135, 0),
ts(135, 0),
r,
false,
false,
uncommitted(200, ts(135, 1), true),
);
must_large_txn_locked(&engine, k, ts(4, 0), 200, ts(135, 1), true);
must_pessimistic_prewrite_put(&engine, k, v, k, ts(4, 0), ts(130, 0), true);
must_commit(&engine, k, ts(4, 0), ts(140, 0));
must_unlocked(&engine, k);
must_get_commit_ts(&engine, k, ts(4, 0), ts(140, 0));
must_success(
&engine,
k,
ts(4, 0),
ts(10, 0),
ts(10, 0),
r,
false,
false,
committed(ts(140, 0)),
);
must_success(
&engine,
k,
ts(5, 0),
ts(10, 0),
ts(10, 0),
r,
false,
false,
committed(ts(15, 0)),
);
must_success(
&engine,
k,
ts(20, 0),
ts(10, 0),
ts(10, 0),
r,
false,
false,
|s| s == RolledBack,
);
must_acquire_pessimistic_lock_for_large_txn(&engine, k, k, ts(150, 0), ts(150, 0), 100);
must_success(
&engine,
k,
ts(150, 0),
ts(160, 0),
ts(160, 0),
r,
false,
false,
uncommitted(100, ts(160, 1), true),
);
must_large_txn_locked(&engine, k, ts(150, 0), 100, ts(160, 1), true);
must_success(
&engine,
k,
ts(150, 0),
ts(160, 0),
ts(260, 0),
r,
false,
false,
|s| s == TtlExpire,
);
must_unlocked(&engine, k);
must_seek_write(
&engine,
k,
TimeStamp::max(),
ts(150, 0),
ts(150, 0),
WriteType::Rollback,
);
must_prewrite_put_for_large_txn(&engine, k, v, k, ts(270, 0), 100, 0);
must_large_txn_locked(&engine, k, ts(270, 0), 100, ts(270, 1), false);
must_success(
&engine,
k,
ts(270, 0),
ts(271, 0),
TimeStamp::max(),
r,
false,
false,
|s| s == TtlExpire,
);
must_unlocked(&engine, k);
must_seek_write(
&engine,
k,
TimeStamp::max(),
ts(270, 0),
ts(270, 0),
WriteType::Rollback,
);
must_acquire_pessimistic_lock_for_large_txn(&engine, k, k, ts(280, 0), ts(280, 0), 100);
must_large_txn_locked(&engine, k, ts(280, 0), 100, ts(280, 1), true);
must_success(
&engine,
k,
ts(280, 0),
ts(281, 0),
TimeStamp::max(),
r,
false,
false,
|s| s == TtlExpire,
);
must_unlocked(&engine, k);
must_seek_write(
&engine,
k,
TimeStamp::max(),
ts(280, 0),
ts(280, 0),
WriteType::Rollback,
);
must_acquire_pessimistic_lock_with_ttl(&engine, k, k, ts(290, 0), ts(290, 0), 100);
must_success(
&engine,
k,
ts(290, 0),
ts(300, 0),
ts(300, 0),
r,
false,
false,
uncommitted(100, TimeStamp::zero(), false),
);
must_large_txn_locked(&engine, k, ts(290, 0), 100, TimeStamp::zero(), true);
pessimistic_rollback::tests::must_success(&engine, k, ts(290, 0), ts(290, 0));
must_prewrite_put_impl(
&engine,
k,
v,
k,
&None,
ts(300, 0),
false,
100,
TimeStamp::zero(),
1,
TimeStamp::zero(),
TimeStamp::zero(),
);
must_success(
&engine,
k,
ts(300, 0),
ts(310, 0),
ts(310, 0),
r,
false,
false,
uncommitted(100, TimeStamp::zero(), false),
);
must_large_txn_locked(&engine, k, ts(300, 0), 100, TimeStamp::zero(), false);
must_rollback(&engine, k, ts(300, 0), false);
must_prewrite_put_for_large_txn(&engine, k, v, k, ts(310, 0), 100, 0);
must_large_txn_locked(&engine, k, ts(310, 0), 100, ts(310, 1), false);
must_success(
&engine,
k,
ts(310, 0),
TimeStamp::max(),
ts(320, 0),
r,
false,
false,
uncommitted(100, ts(310, 1), true),
);
must_commit(&engine, k, ts(310, 0), ts(315, 0));
must_success(
&engine,
k,
ts(310, 0),
TimeStamp::max(),
ts(320, 0),
r,
false,
false,
committed(ts(315, 0)),
);
}
#[test]
fn test_check_txn_status() {
    // Run the shared scenario first without, then with `rollback_if_not_exist`.
    for &rollback_if_not_exist in &[false, true] {
        test_check_txn_status_impl(rollback_if_not_exist);
    }
}
#[test]
fn test_check_txn_status_resolving_pessimistic_lock() {
    /// `must_success` with `rollback_if_not_exist = true` and `force_sync_commit = false`;
    /// only `resolving_pessimistic_lock` varies between the checks in this test.
    fn expect_status<E: Engine>(
        engine: &E,
        key: &[u8],
        lock_ts: impl Into<TimeStamp>,
        caller_start_ts: impl Into<TimeStamp>,
        current_ts: impl Into<TimeStamp>,
        resolving_pessimistic_lock: bool,
        status_pred: impl FnOnce(TxnStatus) -> bool,
    ) {
        must_success(
            engine,
            key,
            lock_ts,
            caller_start_ts,
            current_ts,
            true,
            false,
            resolving_pessimistic_lock,
            status_pred,
        );
    }

    let engine = TestEngineBuilder::new().build().unwrap();
    let k = b"k1";
    let v = b"v1";
    let ts = TimeStamp::compose;

    // No lock at all, resolving a pessimistic lock: nothing is done and no rollback record
    // is written.
    expect_status(&engine, k, ts(3, 0), ts(3, 0), ts(4, 0), true, |s| {
        s == LockNotExistDoNothing
    });
    must_get_rollback_ts_none(&engine, k, ts(5, 0));

    // No lock and `rollback_if_not_exist` off: the check fails.
    must_err(&engine, k, ts(3, 0), ts(5, 0), ts(5, 0), false, false, true);

    // Pessimistic primary lock with TTL 10 that has not expired: reported unchanged.
    must_acquire_pessimistic_lock_with_ttl(&engine, k, k, ts(10, 0), ts(10, 0), 10);
    must_pessimistic_locked(&engine, k, ts(10, 0), ts(10, 0));
    expect_status(
        &engine,
        k,
        ts(10, 0),
        ts(11, 0),
        ts(11, 0),
        true,
        uncommitted(10, TimeStamp::zero(), false),
    );

    // Once expired it is pessimistically rolled back, without a rollback record.
    expect_status(&engine, k, ts(10, 0), ts(21, 0), ts(21, 0), true, |s| {
        s == PessimisticRollBack
    });
    must_unlocked(&engine, k);
    must_get_rollback_ts_none(&engine, k, ts(22, 0));

    // Prewrite lock with TTL 10 that has not expired: reported unchanged.
    must_prewrite_put_impl(
        &engine,
        k,
        v,
        k,
        &None,
        ts(30, 0),
        false,
        10,
        TimeStamp::zero(),
        1,
        TimeStamp::zero(),
        TimeStamp::zero(),
    );
    expect_status(
        &engine,
        k,
        ts(30, 0),
        ts(31, 0),
        ts(31, 0),
        true,
        uncommitted(10, TimeStamp::zero(), false),
    );

    // Once expired it is rolled back and a rollback record is written.
    expect_status(&engine, k, ts(30, 0), ts(41, 0), ts(41, 0), true, |s| s == TtlExpire);
    must_unlocked(&engine, k);
    must_get_rollback_ts(&engine, k, ts(30, 0));

    // Expired pessimistic primary lock checked with `resolving_pessimistic_lock` off:
    // reported as `TtlExpire` and a rollback record is written.
    must_acquire_pessimistic_lock_with_ttl(&engine, k, k, ts(50, 0), ts(50, 0), 10);
    must_pessimistic_locked(&engine, k, ts(50, 0), ts(50, 0));
    expect_status(&engine, k, ts(50, 0), ts(61, 0), ts(61, 0), false, |s| s == TtlExpire);
    must_unlocked(&engine, k);
    must_get_rollback_ts(&engine, k, ts(50, 0));
}
}