use crate::storage::{
mvcc::{
metrics::MVCC_CHECK_TXN_STATUS_COUNTER_VEC, reader::OverlappedWrite, ErrorInner, LockType,
MvccTxn, ReleasedLock, Result, SnapshotReader, TxnCommitRecord,
},
Snapshot, TxnStatus,
};
use txn_types::{Key, Lock, TimeStamp, Write, WriteType};
/// Resolves the status of a transaction whose primary lock is still present.
///
/// `lock` is the lock found on `primary_key`. The returned tuple holds the
/// resulting [`TxnStatus`] and the lock released by this call, if any.
///
/// The decision is taken in this order:
/// 1. An async-commit lock is reported as uncommitted and left untouched,
///    unless `force_sync_commit` is set, in which case it is handled like any
///    other lock.
/// 2. A lock for which `lock.ts.physical() + lock.ttl` is below
///    `current_ts.physical()` is cleaned up. When `resolving_pessimistic_lock`
///    is set and the lock type is `Pessimistic`, the key is only unlocked
///    (`PessimisticRollBack`); otherwise the lock is rolled back through
///    [`rollback_lock`] with collapsing enabled (`TtlExpire`).
/// 3. Otherwise the lock stays. If its `min_commit_ts` is non-zero,
///    `caller_start_ts` is not the max timestamp and has reached
///    `min_commit_ts`, then `min_commit_ts` becomes `caller_start_ts.next()`
///    (or `current_ts` when that is larger) and the lock is written back.
///
/// # Errors
///
/// Propagates any error returned by [`rollback_lock`].
pub fn check_txn_status_lock_exists(
    txn: &mut MvccTxn,
    reader: &mut SnapshotReader<impl Snapshot>,
    primary_key: Key,
    mut lock: Lock,
    current_ts: TimeStamp,
    caller_start_ts: TimeStamp,
    force_sync_commit: bool,
    resolving_pessimistic_lock: bool,
) -> Result<(TxnStatus, Option<ReleasedLock>)> {
    if lock.use_async_commit {
        if !force_sync_commit {
            // Async-commit locks are neither rolled back nor pushed here.
            return Ok((TxnStatus::uncommitted(lock, false), None));
        }
        info!(
            "fallback is set, check_txn_status treats it as a non-async-commit txn";
            "start_ts" => reader.start_ts,
            "primary_key" => ?primary_key,
        );
    }

    let is_pessimistic_txn = !lock.for_update_ts.is_zero();
    let ttl_expired = lock.ts.physical() + lock.ttl < current_ts.physical();
    if ttl_expired {
        if resolving_pessimistic_lock && lock.lock_type == LockType::Pessimistic {
            // Only drop the lock; no rollback record is written on this path.
            let released = txn.unlock_key(primary_key, is_pessimistic_txn);
            MVCC_CHECK_TXN_STATUS_COUNTER_VEC.pessimistic_rollback.inc();
            return Ok((TxnStatus::PessimisticRollBack, released));
        }
        let released = rollback_lock(
            txn,
            reader,
            primary_key,
            &lock,
            is_pessimistic_txn,
            true,
        )?;
        MVCC_CHECK_TXN_STATUS_COUNTER_VEC.rollback.inc();
        return Ok((TxnStatus::TtlExpire, released));
    }

    // A zero `min_commit_ts` is never pushed, and neither is anything when the
    // caller reads at the max timestamp.
    let should_push = !lock.min_commit_ts.is_zero()
        && !caller_start_ts.is_max()
        && caller_start_ts >= lock.min_commit_ts;
    if should_push {
        let next_ts = caller_start_ts.next();
        lock.min_commit_ts = if next_ts < current_ts {
            current_ts
        } else {
            next_ts
        };
        txn.put_lock(primary_key, &lock);
        MVCC_CHECK_TXN_STATUS_COUNTER_VEC.update_ts.inc();
    }

    // Reported as pushed when `min_commit_ts` is above a non-zero caller
    // timestamp, or unconditionally when the caller timestamp is the max one.
    let min_commit_ts_pushed = (!caller_start_ts.is_zero()
        && lock.min_commit_ts > caller_start_ts)
        || caller_start_ts.is_max();
    Ok((TxnStatus::uncommitted(lock, min_commit_ts_pushed), None))
}
/// Resolves the status of a transaction whose lock on `primary_key` is missing.
///
/// `mismatch_lock` is presumably a lock on `primary_key` that belongs to a
/// different transaction (TODO confirm against callers); when present it is
/// handed to `mark_rollback_on_mismatching_lock`.
///
/// Outcomes, based on the commit record looked up for `primary_key`:
/// - a single record of type `Rollback`, or an overlapped rollback:
///   `TxnStatus::RolledBack`;
/// - any other single record: `TxnStatus::committed(commit_ts)`;
/// - no record: depends on `action` and `resolving_pessimistic_lock`, see
///   below.
///
/// With no record, `MissingLockAction::ReturnError` fails with `TxnNotFound`.
/// Otherwise, if `resolving_pessimistic_lock` is set nothing is written and
/// `LockNotExistDoNothing` is returned. In the remaining case a rollback is
/// recorded as dictated by `action` and `LockNotExist` is returned.
///
/// # Errors
///
/// Returns `ErrorInner::TxnNotFound` as described above, and propagates errors
/// from the reader, from `Key::into_raw` and from [`collapse_prev_rollback`].
pub fn check_txn_status_missing_lock(
    txn: &mut MvccTxn,
    reader: &mut SnapshotReader<impl Snapshot>,
    primary_key: Key,
    mismatch_lock: Option<Lock>,
    action: MissingLockAction,
    resolving_pessimistic_lock: bool,
) -> Result<TxnStatus> {
    MVCC_CHECK_TXN_STATUS_COUNTER_VEC.get_commit_info.inc();

    // Records that already settle the status return right away; only the
    // "no record" case continues below.
    let overlapped_write = match reader.get_txn_commit_record(&primary_key)? {
        TxnCommitRecord::SingleRecord { commit_ts, write } => {
            let status = if write.write_type == WriteType::Rollback {
                TxnStatus::RolledBack
            } else {
                TxnStatus::committed(commit_ts)
            };
            return Ok(status);
        }
        TxnCommitRecord::OverlappedRollback { .. } => return Ok(TxnStatus::RolledBack),
        TxnCommitRecord::None { overlapped_write } => overlapped_write,
    };

    if action == MissingLockAction::ReturnError {
        let not_found = ErrorInner::TxnNotFound {
            start_ts: reader.start_ts,
            key: primary_key.into_raw()?,
        };
        return Err(not_found.into());
    }
    if resolving_pessimistic_lock {
        return Ok(TxnStatus::LockNotExistDoNothing);
    }

    let ts = reader.start_ts;
    if action.collapse_rollback() {
        collapse_prev_rollback(txn, reader, &primary_key)?;
    }

    // The mismatching lock is only marked when there is no overlapped write.
    if overlapped_write.is_none() {
        if let Some(other_lock) = mismatch_lock {
            let protected = action == MissingLockAction::ProtectedRollback;
            txn.mark_rollback_on_mismatching_lock(&primary_key, other_lock, protected);
        }
    }

    let rollback_record = action.construct_write(ts, overlapped_write);
    if let Some(record) = rollback_record {
        txn.put_write(primary_key, ts, record.as_ref().to_bytes());
    }
    // Counted even when no record had to be written.
    MVCC_CHECK_TXN_STATUS_COUNTER_VEC.rollback.inc();
    Ok(TxnStatus::LockNotExist)
}
/// Rolls back `lock` on `key` and releases it.
///
/// If the commit-record lookup for `key` already yields a rollback (a single
/// `Rollback` record or an overlapped rollback), the key is simply unlocked.
/// Otherwise the function
/// 1. deletes the value stored at `lock.ts` when the lock is a `Put` without
///    a short value,
/// 2. writes the rollback record produced by [`make_rollback`]; the record is
///    protected only when `is_pessimistic_txn` is set and `key` encodes
///    `lock.primary`,
/// 3. collapses the previous rollback when `collapse_rollback` is set,
/// 4. unlocks the key.
///
/// Returns whatever `unlock_key` reports as released.
///
/// # Errors
///
/// Propagates errors from the reader and from [`collapse_prev_rollback`].
///
/// # Panics
///
/// Panics if the lookup returns a `SingleRecord` whose write type is not
/// `Rollback`.
pub fn rollback_lock(
    txn: &mut MvccTxn,
    reader: &mut SnapshotReader<impl Snapshot>,
    key: Key,
    lock: &Lock,
    is_pessimistic_txn: bool,
    collapse_rollback: bool,
) -> Result<Option<ReleasedLock>> {
    let overlapped_write = match reader.get_txn_commit_record(&key)? {
        TxnCommitRecord::SingleRecord { write, .. } if write.write_type != WriteType::Rollback => {
            panic!("txn record found but not expected: {:?}", txn)
        }
        // A rollback is already recorded: releasing the lock is all that is left.
        TxnCommitRecord::SingleRecord { .. } | TxnCommitRecord::OverlappedRollback { .. } => {
            return Ok(txn.unlock_key(key, is_pessimistic_txn));
        }
        TxnCommitRecord::None { overlapped_write } => overlapped_write,
    };

    // Only a `Put` lock without an inlined short value has a separate value
    // entry to remove.
    let has_separate_value = lock.lock_type == LockType::Put && lock.short_value.is_none();
    if has_separate_value {
        txn.delete_value(key.clone(), lock.ts);
    }

    let start_ts = reader.start_ts;
    let protected = is_pessimistic_txn && key.is_encoded_from(&lock.primary);
    if let Some(rollback) = make_rollback(start_ts, protected, overlapped_write) {
        txn.put_write(key.clone(), start_ts, rollback.as_ref().to_bytes());
    }

    if collapse_rollback {
        collapse_prev_rollback(txn, reader, &key)?;
    }

    let released = txn.unlock_key(key, is_pessimistic_txn);
    Ok(released)
}
/// Deletes the write record found by `reader.seek_write(key, reader.start_ts)`
/// when that record is a rollback that is not protected.
///
/// Nothing is deleted when no record is found, when the record is not of type
/// `Rollback`, or when the rollback is protected.
///
/// NOTE(review): `seek_write` presumably yields the newest write at or before
/// the given timestamp; confirm against its definition.
///
/// # Errors
///
/// Propagates any error returned by `seek_write`.
pub fn collapse_prev_rollback(
    txn: &mut MvccTxn,
    reader: &mut SnapshotReader<impl Snapshot>,
    key: &Key,
) -> Result<()> {
    let start_ts = reader.start_ts;
    let found = reader.seek_write(key, start_ts)?;

    if let Some((commit_ts, write)) = found {
        let is_rollback = write.write_type == WriteType::Rollback;
        if is_rollback && !write.as_ref().is_protected() {
            txn.delete_write(key.clone(), commit_ts);
        }
    }
    Ok(())
}
/// Builds the write record that stores a rollback at `start_ts`, or `None`
/// when nothing has to be written.
///
/// - Without an overlapped write, a fresh rollback record is created via
///   `Write::new_rollback(start_ts, protected)`.
/// - With an overlapped write and `protected` set, the overlapped write itself
///   is returned with its overlapped-rollback flag turned on and its
///   `gc_fence` attached.
/// - With an overlapped write and `protected` unset, the existing write is
///   left as is and `None` is returned.
///
/// # Panics
///
/// Panics if an overlapped write is given whose `start_ts` is not strictly
/// below `start_ts`.
pub fn make_rollback(
    start_ts: TimeStamp,
    protected: bool,
    overlapped_write: Option<OverlappedWrite>,
) -> Option<Write> {
    let OverlappedWrite { write, gc_fence } = match overlapped_write {
        Some(overlapped) => overlapped,
        None => return Some(Write::new_rollback(start_ts, protected)),
    };

    assert!(start_ts > write.start_ts);
    if !protected {
        // An unprotected rollback does not need to touch the existing write.
        return None;
    }
    Some(write.set_overlapped_rollback(true, Some(gc_fence)))
}
/// What `check_txn_status_missing_lock` does when neither the lock nor a
/// commit record is found for the transaction.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum MissingLockAction {
/// Record a non-protected rollback and collapse the previous rollback
/// (`collapse_rollback()` is `true` for this variant).
Rollback,
/// Record a protected rollback and leave the previous rollback alone
/// (`collapse_rollback()` is `false` for this variant).
ProtectedRollback,
/// Write nothing; `check_txn_status_missing_lock` fails with `TxnNotFound`.
/// `collapse_rollback()` and `construct_write()` must not be called on it.
ReturnError,
}
impl MissingLockAction {
    /// Picks between the two rollback flavours: `ProtectedRollback` when
    /// `protect_rollback` is set, plain `Rollback` otherwise.
    pub fn rollback_protect(protect_rollback: bool) -> MissingLockAction {
        if protect_rollback {
            Self::ProtectedRollback
        } else {
            Self::Rollback
        }
    }

    /// Returns `ProtectedRollback` when `rollback_if_not_exist` is set and
    /// `ReturnError` otherwise.
    pub fn rollback(rollback_if_not_exist: bool) -> MissingLockAction {
        if rollback_if_not_exist {
            Self::ProtectedRollback
        } else {
            Self::ReturnError
        }
    }

    /// Whether the previous rollback record should be collapsed.
    ///
    /// # Panics
    ///
    /// Panics when called on `ReturnError`, which never writes a rollback.
    fn collapse_rollback(&self) -> bool {
        match *self {
            Self::Rollback => true,
            Self::ProtectedRollback => false,
            Self::ReturnError => unreachable!(),
        }
    }

    /// Builds the rollback record for `ts` through `make_rollback`; the record
    /// is protected exactly when the previous rollback is not collapsed.
    ///
    /// # Panics
    ///
    /// Panics when called on `ReturnError` (see `collapse_rollback`).
    pub fn construct_write(
        &self,
        ts: TimeStamp,
        overlapped_write: Option<OverlappedWrite>,
    ) -> Option<Write> {
        let protected = !self.collapse_rollback();
        make_rollback(ts, protected, overlapped_write)
    }
}