use collections::{HashMap, HashSet};
use raftstore::store::RegionReadProgress;
use std::cmp;
use std::collections::BTreeMap;
use std::sync::Arc;
use txn_types::TimeStamp;
pub struct Resolver {
region_id: u64,
locks_by_key: HashMap<Arc<[u8]>, TimeStamp>,
lock_ts_heap: BTreeMap<TimeStamp, HashSet<Arc<[u8]>>>,
resolved_ts: TimeStamp,
tracked_index: u64,
read_progress: Option<Arc<RegionReadProgress>>,
min_ts: TimeStamp,
}
impl Resolver {
pub fn new(region_id: u64) -> Resolver {
Resolver::with_read_progress(region_id, None)
}
pub fn with_read_progress(
region_id: u64,
read_progress: Option<Arc<RegionReadProgress>>,
) -> Resolver {
Resolver {
region_id,
resolved_ts: TimeStamp::zero(),
locks_by_key: HashMap::default(),
lock_ts_heap: BTreeMap::new(),
read_progress,
tracked_index: 0,
min_ts: TimeStamp::zero(),
}
}
pub fn resolved_ts(&self) -> TimeStamp {
self.resolved_ts
}
pub fn locks(&self) -> &BTreeMap<TimeStamp, HashSet<Arc<[u8]>>> {
&self.lock_ts_heap
}
pub fn update_tracked_index(&mut self, index: u64) {
assert!(
self.tracked_index <= index,
"region {}, tracked_index: {}, incoming index: {}",
self.region_id,
self.tracked_index,
index
);
self.tracked_index = index;
}
pub fn track_lock(&mut self, start_ts: TimeStamp, key: Vec<u8>, index: Option<u64>) {
if let Some(index) = index {
self.update_tracked_index(index);
}
debug!(
"track lock {}@{}, region {}",
&log_wrappers::Value::key(&key),
start_ts,
self.region_id
);
let key: Arc<[u8]> = key.into_boxed_slice().into();
self.locks_by_key.insert(key.clone(), start_ts);
self.lock_ts_heap.entry(start_ts).or_default().insert(key);
}
pub fn untrack_lock(&mut self, key: &[u8], index: Option<u64>) {
if let Some(index) = index {
self.update_tracked_index(index);
}
let start_ts = if let Some(start_ts) = self.locks_by_key.remove(key) {
start_ts
} else {
debug!("untrack a lock that was not tracked before"; "key" => &log_wrappers::Value::key(key));
return;
};
debug!(
"untrack lock {}@{}, region {}",
&log_wrappers::Value::key(key),
start_ts,
self.region_id,
);
let entry = self.lock_ts_heap.get_mut(&start_ts);
if let Some(locked_keys) = entry {
locked_keys.remove(key);
if locked_keys.is_empty() {
self.lock_ts_heap.remove(&start_ts);
}
}
}
pub fn resolve(&mut self, min_ts: TimeStamp) -> TimeStamp {
let min_lock = self.lock_ts_heap.keys().next().cloned();
let has_lock = min_lock.is_some();
let min_start_ts = min_lock.unwrap_or(min_ts);
let new_resolved_ts = cmp::min(min_start_ts, min_ts);
self.resolved_ts = cmp::max(self.resolved_ts, new_resolved_ts);
if let Some(rrp) = &self.read_progress {
rrp.update_safe_ts(self.tracked_index, self.resolved_ts.into_inner());
}
let new_min_ts = if has_lock {
new_resolved_ts
} else {
min_ts
};
self.min_ts = cmp::max(self.min_ts, new_min_ts);
self.resolved_ts
}
}
#[cfg(test)]
mod tests {
use super::*;
use txn_types::Key;
#[derive(Clone)]
enum Event {
Lock(u64, Key),
Unlock(Key),
Resolve(u64, u64),
}
#[test]
fn test_resolve() {
let cases = vec![
vec![Event::Lock(1, Key::from_raw(b"a")), Event::Resolve(2, 1)],
vec![
Event::Lock(1, Key::from_raw(b"a")),
Event::Unlock(Key::from_raw(b"a")),
Event::Resolve(2, 2),
],
vec![
Event::Lock(3, Key::from_raw(b"a")),
Event::Unlock(Key::from_raw(b"a")),
Event::Resolve(2, 2),
],
vec![
Event::Lock(1, Key::from_raw(b"a")),
Event::Unlock(Key::from_raw(b"a")),
Event::Lock(1, Key::from_raw(b"b")),
Event::Resolve(2, 1),
],
vec![
Event::Lock(2, Key::from_raw(b"a")),
Event::Unlock(Key::from_raw(b"a")),
Event::Resolve(2, 2),
Event::Lock(1, Key::from_raw(b"a")),
Event::Resolve(2, 2),
Event::Unlock(Key::from_raw(b"a")),
Event::Resolve(3, 3),
],
vec![
Event::Unlock(Key::from_raw(b"a")),
Event::Lock(2, Key::from_raw(b"a")),
Event::Unlock(Key::from_raw(b"a")),
Event::Unlock(Key::from_raw(b"a")),
Event::Resolve(3, 3),
],
vec![
Event::Lock(2, Key::from_raw(b"a")),
Event::Resolve(4, 2),
Event::Unlock(Key::from_raw(b"a")),
Event::Resolve(5, 5),
],
vec![
Event::Lock(1, Key::from_raw(b"a")),
Event::Unlock(Key::from_raw(b"b")),
Event::Unlock(Key::from_raw(b"a")),
],
];
for (i, case) in cases.into_iter().enumerate() {
let mut resolver = Resolver::new(1);
for e in case.clone() {
match e {
Event::Lock(start_ts, key) => {
resolver.track_lock(start_ts.into(), key.into_raw().unwrap(), None)
}
Event::Unlock(key) => resolver.untrack_lock(&key.into_raw().unwrap(), None),
Event::Resolve(min_ts, expect) => {
assert_eq!(resolver.resolve(min_ts.into()), expect.into(), "case {}", i)
}
}
}
}
}
}