use kvproto::kvrpcpb::{ExtraOp, LockInfo};
use txn_types::{Key, OldValues, TimeStamp, TxnExtra};
use crate::storage::kv::WriteData;
use crate::storage::lock_manager::{LockManager, WaitTimeout};
use crate::storage::mvcc::{
Error as MvccError, ErrorInner as MvccErrorInner, MvccTxn, SnapshotReader,
};
use crate::storage::txn::commands::{
Command, CommandExt, ResponsePolicy, TypedCommand, WriteCommand, WriteContext, WriteResult,
WriteResultLockInfo,
};
use crate::storage::txn::{acquire_pessimistic_lock, Error, ErrorInner, Result};
use crate::storage::{
Error as StorageError, ErrorInner as StorageErrorInner, PessimisticLockRes, ProcessResult,
Result as StorageResult, Snapshot,
};
command! {
AcquirePessimisticLock:
cmd_ty => StorageResult<PessimisticLockRes>,
display => "kv::command::acquirepessimisticlock keys({}) @ {} {} | {:?}", (keys.len, start_ts, for_update_ts, ctx),
content => {
keys: Vec<(Key, bool)>,
primary: Vec<u8>,
start_ts: TimeStamp,
lock_ttl: u64,
is_first_lock: bool,
for_update_ts: TimeStamp,
wait_timeout: Option<WaitTimeout>,
return_values: bool,
min_commit_ts: TimeStamp,
old_values: OldValues,
}
}
impl CommandExt for AcquirePessimisticLock {
ctx!();
tag!(acquire_pessimistic_lock);
ts!(start_ts);
command_method!(can_be_pipelined, bool, true);
fn write_bytes(&self) -> usize {
self.keys
.iter()
.map(|(key, _)| key.as_encoded().len())
.sum()
}
gen_lock!(keys: multiple(|x| &x.0));
}
fn extract_lock_info_from_result<T>(res: &StorageResult<T>) -> &LockInfo {
match res {
Err(StorageError(box StorageErrorInner::Txn(Error(box ErrorInner::Mvcc(MvccError(
box MvccErrorInner::KeyIsLocked(info),
)))))) => info,
_ => panic!("unexpected mvcc error"),
}
}
impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for AcquirePessimisticLock {
fn process_write(mut self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
let (start_ts, ctx, keys) = (self.start_ts, self.ctx, self.keys);
let mut txn = MvccTxn::new(start_ts, context.concurrency_manager);
let mut reader = SnapshotReader::new(start_ts, snapshot, !ctx.get_not_fill_cache());
let rows = keys.len();
let mut res = if self.return_values {
Ok(PessimisticLockRes::Values(vec![]))
} else {
Ok(PessimisticLockRes::Empty)
};
let need_old_value = context.extra_op == ExtraOp::ReadOldValue;
for (k, should_not_exist) in keys {
match acquire_pessimistic_lock(
&mut txn,
&mut reader,
k.clone(),
&self.primary,
should_not_exist,
self.lock_ttl,
self.for_update_ts,
self.return_values,
self.min_commit_ts,
need_old_value,
) {
Ok((val, old_value)) => {
if self.return_values {
res.as_mut().unwrap().push(val);
}
if old_value.valid() {
let key = k.append_ts(txn.start_ts);
let mutation_type = None;
self.old_values.insert(key, (old_value, mutation_type));
}
}
Err(e @ MvccError(box MvccErrorInner::KeyIsLocked { .. })) => {
res = Err(e).map_err(Error::from).map_err(StorageError::from);
break;
}
Err(e) => return Err(Error::from(e)),
}
}
if let Ok(PessimisticLockRes::Values(values)) = &res {
if !values.is_empty() {
txn.concurrency_manager.update_max_ts(self.for_update_ts);
}
}
context.statistics.add(&reader.take_statistics());
let (pr, to_be_write, rows, ctx, lock_info) = if res.is_ok() {
let pr = ProcessResult::PessimisticLockRes { res };
let extra = TxnExtra {
old_values: self.old_values,
one_pc: false,
};
let write_data = WriteData::new(txn.into_modifies(), extra);
(pr, write_data, rows, ctx, None)
} else {
let lock_info_pb = extract_lock_info_from_result(&res);
let lock_info = WriteResultLockInfo::from_lock_info_pb(
lock_info_pb,
self.is_first_lock,
self.wait_timeout,
);
let pr = ProcessResult::PessimisticLockRes { res };
(pr, WriteData::default(), 0, ctx, Some(lock_info))
};
Ok(WriteResult {
ctx,
to_be_write,
rows,
pr,
lock_info,
lock_guards: vec![],
response_policy: ResponsePolicy::OnProposed,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_gen_lock_info_from_result() {
let raw_key = b"key".to_vec();
let key = Key::from_raw(&raw_key);
let ts = 100;
let is_first_lock = true;
let wait_timeout = WaitTimeout::from_encoded(200);
let mut info = LockInfo::default();
info.set_key(raw_key.clone());
info.set_lock_version(ts);
info.set_lock_ttl(100);
let case = StorageError::from(StorageErrorInner::Txn(Error::from(ErrorInner::Mvcc(
MvccError::from(MvccErrorInner::KeyIsLocked(info)),
))));
let lock_info = WriteResultLockInfo::from_lock_info_pb(
extract_lock_info_from_result::<()>(&Err(case)),
is_first_lock,
wait_timeout,
);
assert_eq!(lock_info.lock.ts, ts.into());
assert_eq!(lock_info.lock.hash, key.gen_hash());
assert_eq!(lock_info.key, raw_key);
assert_eq!(lock_info.is_first_lock, is_first_lock);
assert_eq!(lock_info.wait_timeout, wait_timeout);
}
}