1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use crate::storage::kv::WriteData;
use crate::storage::lock_manager::LockManager;
use crate::storage::mvcc::{MvccTxn, SnapshotReader, MAX_TXN_WRITE_SIZE};
use crate::storage::txn::commands::{
Command, CommandExt, ReleasedLocks, ResolveLockReadPhase, ResponsePolicy, TypedCommand,
WriteCommand, WriteContext, WriteResult,
};
use crate::storage::txn::{cleanup, commit, Error, ErrorInner, Result};
use crate::storage::{ProcessResult, Snapshot};
use collections::HashMap;
use txn_types::{Key, Lock, TimeStamp};
command! {
    /// Resolve locks according to `txn_status`.
    ///
    /// During the GC operation, this should be called to clean up stale locks whose timestamp is
    /// before the safe point. Locks belonging to committed transactions are committed; locks of
    /// rolled-back transactions are cleaned up.
    ResolveLock:
        cmd_ty => (),
        display => "kv::resolve_lock", (),
        content => {
            /// Maps lock_ts to commit_ts. If a transaction was rolled back, it is mapped to
            /// TimeStamp::zero().
            txn_status: HashMap<TimeStamp, TimeStamp>,
            // The key from which the next read phase should resume scanning, if any.
            scan_key: Option<Key>,
            // The batch of (key, lock) pairs collected by the read phase to be resolved here.
            key_locks: Vec<(Key, Lock)>,
        }
}
impl CommandExt for ResolveLock {
    ctx!();
    tag!(resolve_lock);
    // Resolving locks writes to the engine, so this is not a read-only command.
    command_method!(readonly, bool, false);
    // Marked as a system command: it is issued internally (e.g. by GC), not by user requests.
    command_method!(is_sys_cmd, bool, true);

    /// Estimates the write size as the total encoded length of all keys whose
    /// locks will be resolved in this batch.
    fn write_bytes(&self) -> usize {
        self.key_locks
            .iter()
            .map(|(key, _)| key.as_encoded().len())
            .sum()
    }

    // Latches are acquired on every key in the batch.
    gen_lock!(key_locks: multiple(|(key, _)| key));
}
impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for ResolveLock {
    /// Resolves the batched locks collected by the read phase.
    ///
    /// For each `(key, lock)` pair, looks up the owning transaction's status in
    /// `txn_status` and either commits the key (commit_ts > start_ts), rolls it
    /// back / cleans it up (commit_ts == 0), or fails with `InvalidTxnTso` when
    /// the recorded commit_ts is not greater than the lock's start_ts.
    ///
    /// If the accumulated write size exceeds `MAX_TXN_WRITE_SIZE`, processing
    /// stops early and a `ResolveLockReadPhase` follow-up command is scheduled
    /// to continue from the current key.
    fn process_write(mut self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
        let (ctx, txn_status, key_locks) = (self.ctx, self.txn_status, self.key_locks);

        // start_ts is a placeholder here; it is overwritten per-lock below,
        // since one ResolveLock batch may span many transactions.
        let mut txn = MvccTxn::new(TimeStamp::zero(), context.concurrency_manager);
        let mut reader =
            SnapshotReader::new(TimeStamp::zero(), snapshot, !ctx.get_not_fill_cache());

        let mut scan_key = self.scan_key.take();
        let rows = key_locks.len();
        // Map of start_ts -> ReleasedLocks, so locks are woken up grouped by transaction.
        let mut released_locks = HashMap::default();
        for (current_key, current_lock) in key_locks {
            // Retarget the txn/reader at the transaction owning this lock.
            txn.start_ts = current_lock.ts;
            reader.start_ts = current_lock.ts;
            // The read phase only collects locks whose status is known, so a
            // missing entry indicates a broken invariant.
            // BUG FIX: the lookup argument was mojibake (`¤t_lock.ts`, the
            // `&curren` of `&current_lock` mis-decoded as the `&curren;` HTML
            // entity); restored to `&current_lock.ts`.
            let commit_ts = *txn_status
                .get(&current_lock.ts)
                .expect("txn status not found");

            let released = if commit_ts.is_zero() {
                // Transaction was rolled back: clean up the lock. current_ts is
                // zero and `protect_rollback` is false for this path.
                cleanup(
                    &mut txn,
                    &mut reader,
                    current_key.clone(),
                    TimeStamp::zero(),
                    false,
                )?
            } else if commit_ts > current_lock.ts {
                // Transaction was committed: commit this key at commit_ts.
                commit(&mut txn, &mut reader, current_key.clone(), commit_ts)?
            } else {
                // A commit_ts not greater than start_ts is invalid.
                return Err(Error::from(ErrorInner::InvalidTxnTso {
                    start_ts: current_lock.ts,
                    commit_ts,
                }));
            };
            released_locks
                .entry(current_lock.ts)
                .or_insert_with(|| ReleasedLocks::new(current_lock.ts, commit_ts))
                .push(released);

            // Cap the size of a single write batch; remember where to resume.
            if txn.write_size() >= MAX_TXN_WRITE_SIZE {
                scan_key = Some(current_key);
                break;
            }
        }

        // Wake up waiters blocked on the locks we just released.
        let lock_mgr = context.lock_mgr;
        released_locks
            .into_iter()
            .for_each(|(_, released_locks)| released_locks.wake_up(lock_mgr));

        context.statistics.add(&reader.take_statistics());
        let pr = if scan_key.is_none() {
            // The whole range has been processed.
            ProcessResult::Res
        } else {
            // More locks remain: schedule another read phase starting at scan_key.
            ProcessResult::NextCommand {
                cmd: ResolveLockReadPhase::new(txn_status, scan_key.take(), ctx.clone()).into(),
            }
        };
        let write_data = WriteData::from_modifies(txn.into_modifies());
        Ok(WriteResult {
            ctx,
            to_be_write: write_data,
            rows,
            pr,
            lock_info: None,
            lock_guards: vec![],
            response_policy: ResponsePolicy::OnApplied,
        })
    }
}
pub const RESOLVE_LOCK_BATCH_SIZE: usize = 256;