use super::client::{self, Client};
use super::config::Config;
use super::metrics::*;
use super::waiter_manager::Scheduler as WaiterMgrScheduler;
use super::{Error, Result};
use crate::server::resolve::StoreAddrResolver;
use crate::storage::lock_manager::{DiagnosticContext, Lock};
use collections::HashMap;
use engine_traits::KvEngine;
use futures::future::{self, FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::TryStreamExt;
use grpcio::{
self, DuplexSink, Environment, RequestStream, RpcContext, RpcStatus, RpcStatusCode, UnarySink,
WriteFlags,
};
use kvproto::deadlock::*;
use kvproto::metapb::Region;
use pd_client::{PdClient, INVALID_ID};
use raft::StateRole;
use raftstore::coprocessor::{
BoxRegionChangeObserver, BoxRoleObserver, Coprocessor, CoprocessorHost, ObserverContext,
RegionChangeEvent, RegionChangeObserver, RoleObserver,
};
use raftstore::store::util::is_region_initialized;
use security::SecurityManager;
use std::cell::RefCell;
use std::fmt::{self, Display, Formatter};
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use tikv_util::future::paired_future_callback;
use tikv_util::time::{Duration, Instant};
use tikv_util::worker::{FutureRunnable, FutureScheduler, Stopped};
use tokio::task::spawn_local;
use txn_types::TimeStamp;
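/// The locks of one lock-holder transaction (`ts`) that a single waiter is blocked on,
/// stored as `(hash, key)` pairs. `last_detect_time` is refreshed by detect requests and
/// used for TTL-based expiration of the wait-for edge.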
struct Locks {
ts: TimeStamp,
keys: Vec<(u64, Vec<u8>)>,
resource_group_tag: Vec<u8>,
last_detect_time: Instant,
}
impl Locks {
fn new(
ts: TimeStamp,
hash: u64,
key: Vec<u8>,
resource_group_tag: Vec<u8>,
last_detect_time: Instant,
) -> Self {
Self {
ts,
keys: vec![(hash, key)],
resource_group_tag,
last_detect_time,
}
}
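/// Records another lock `(hash, key)` if it is not present yet and refreshes the last
/// detect time.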
fn push(&mut self, lock_hash: u64, key: Vec<u8>, now: Instant) {
if !self.keys.iter().any(|(hash, _)| *hash == lock_hash) {
self.keys.push((lock_hash, key))
}
self.last_detect_time = now
}
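/// Removes the lock with `lock_hash` and returns true if no locks are left.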
fn remove(&mut self, lock_hash: u64) -> bool {
if let Some(idx) = self.keys.iter().position(|(hash, _)| *hash == lock_hash) {
self.keys.remove(idx);
}
self.keys.is_empty()
}
fn is_expired(&self, now: Instant, ttl: Duration) -> bool {
now.duration_since(self.last_detect_time) >= ttl
}
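/// Builds a `WaitForEntry` describing `waiter_ts` waiting for this transaction. Only the
/// first recorded lock is carried in the entry.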
fn to_wait_for_entry(&self, waiter_ts: TimeStamp) -> WaitForEntry {
let mut entry = WaitForEntry::default();
entry.set_txn(waiter_ts.into_inner());
entry.set_wait_for_txn(self.ts.into_inner());
entry.set_key_hash(self.keys[0].0);
entry.set_key(self.keys[0].1.clone());
entry.set_resource_group_tag(self.resource_group_tag.clone());
entry
}
}
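/// Detects deadlocks among the wait-for relations reported by the cluster.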
pub struct DetectTable {
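/// The wait-for graph: `txn_ts => (lock_ts => Locks)`. Each edge has a TTL; expired
/// edges are removed lazily during detection or by `active_expire`.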
wait_for_map: HashMap<TimeStamp, HashMap<TimeStamp, Locks>>,
ttl: Duration,
last_active_expire: Instant,
now: Instant,
}
impl DetectTable {
pub fn new(ttl: Duration) -> Self {
Self {
wait_for_map: HashMap::default(),
ttl,
last_active_expire: Instant::now_coarse(),
now: Instant::now_coarse(),
}
}
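/// Checks whether `txn_ts` waiting for `lock_ts` would form a cycle. If a deadlock is
/// detected, returns the key hash which causes the deadlock and the wait chain; the edge
/// currently being detected is not included in the chain (callers append it themselves).
/// Otherwise the edge is registered and `None` is returned.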
pub fn detect(
&mut self,
txn_ts: TimeStamp,
lock_ts: TimeStamp,
lock_hash: u64,
lock_key: &[u8],
resource_group_tag: &[u8],
) -> Option<(u64, Vec<WaitForEntry>)> {
let _timer = DETECT_DURATION_HISTOGRAM.start_coarse_timer();
TASK_COUNTER_METRICS.detect.inc();
self.now = Instant::now_coarse();
self.active_expire();
if self.register_if_existed(txn_ts, lock_ts, lock_hash, lock_key, resource_group_tag) {
return None;
}
if let Some((deadlock_key_hash, wait_chain)) = self.do_detect(txn_ts, lock_ts) {
ERROR_COUNTER_METRICS.deadlock.inc();
return Some((deadlock_key_hash, wait_chain));
}
self.register(txn_ts, lock_ts, lock_hash, lock_key, resource_group_tag);
None
}
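/// Depth-first search from `wait_for_ts` to check whether `txn_ts` is reachable in the
/// wait-for graph, removing expired edges along the way. On success, returns the key hash
/// of the closing edge and the wait chain from `wait_for_ts` up to (and including) the
/// edge that reaches `txn_ts`.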
fn do_detect(
&mut self,
txn_ts: TimeStamp,
wait_for_ts: TimeStamp,
) -> Option<(u64, Vec<WaitForEntry>)> {
let now = self.now;
let ttl = self.ttl;
let mut stack = vec![wait_for_ts];
let mut pushed: HashMap<TimeStamp, TimeStamp> = HashMap::default();
pushed.insert(wait_for_ts, TimeStamp::zero());
while let Some(curr_ts) = stack.pop() {
if let Some(wait_for) = self.wait_for_map.get_mut(&curr_ts) {
wait_for.retain(|_, locks| !locks.is_expired(now, ttl));
if wait_for.is_empty() {
self.wait_for_map.remove(&curr_ts);
} else {
for (lock_ts, locks) in wait_for {
let lock_ts = *lock_ts;
if lock_ts == txn_ts {
let hash = locks.keys[0].0;
let last_entry = locks.to_wait_for_entry(curr_ts);
let mut wait_chain =
self.generate_wait_chain(wait_for_ts, curr_ts, pushed);
wait_chain.push(last_entry);
return Some((hash, wait_chain));
}
#[allow(clippy::map_entry)]
if !pushed.contains_key(&lock_ts) {
stack.push(lock_ts);
pushed.insert(lock_ts, curr_ts);
}
}
}
}
}
None
}
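/// Reconstructs the wait chain from `start` to `end` by walking the predecessor map
/// collected during the search; a zero predecessor marks `start`. The returned entries
/// are ordered from `start` to `end`.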
fn generate_wait_chain(
&self,
start: TimeStamp,
end: TimeStamp,
vertex_predecessors_map: HashMap<TimeStamp, TimeStamp>,
) -> Vec<WaitForEntry> {
let mut wait_chain = Vec::with_capacity(3);
let mut lock_ts = end;
loop {
let waiter_ts = *vertex_predecessors_map.get(&lock_ts).unwrap();
if waiter_ts.is_zero() {
assert_eq!(lock_ts, start);
break;
}
let locks = self
.wait_for_map
.get(&waiter_ts)
.unwrap()
.get(&lock_ts)
.unwrap();
let entry = locks.to_wait_for_entry(waiter_ts);
wait_chain.push(entry);
lock_ts = waiter_ts;
}
wait_chain.reverse();
wait_chain
}
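/// If the edge `txn_ts -> lock_ts` already exists, records the lock hash and key on it,
/// refreshes its detect time and resource group tag, and returns true.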
fn register_if_existed(
&mut self,
txn_ts: TimeStamp,
lock_ts: TimeStamp,
lock_hash: u64,
key: &[u8],
resource_group_tag: &[u8],
) -> bool {
if let Some(wait_for) = self.wait_for_map.get_mut(&txn_ts) {
if let Some(locks) = wait_for.get_mut(&lock_ts) {
locks.push(lock_hash, key.to_vec(), self.now);
locks.resource_group_tag = resource_group_tag.to_vec();
return true;
}
}
false
}
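/// Registers the wait-for edge `txn_ts -> lock_ts`. The edge must not already exist
/// (guaranteed by the preceding `register_if_existed` check).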
fn register(
&mut self,
txn_ts: TimeStamp,
lock_ts: TimeStamp,
lock_hash: u64,
key: &[u8],
resource_group_tag: &[u8],
) {
let wait_for = self.wait_for_map.entry(txn_ts).or_default();
assert!(!wait_for.contains_key(&lock_ts));
let locks = Locks::new(
lock_ts,
lock_hash,
key.to_vec(),
resource_group_tag.to_vec(),
self.now,
);
wait_for.insert(locks.ts, locks);
}
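/// Removes the wait-for entry `txn_ts -> lock_ts` for the given lock hash, dropping
/// inner maps once they become empty.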
fn clean_up_wait_for(&mut self, txn_ts: TimeStamp, lock_ts: TimeStamp, lock_hash: u64) {
if let Some(wait_for) = self.wait_for_map.get_mut(&txn_ts) {
if let Some(locks) = wait_for.get_mut(&lock_ts) {
if locks.remove(lock_hash) {
wait_for.remove(&lock_ts);
if wait_for.is_empty() {
self.wait_for_map.remove(&txn_ts);
}
}
}
}
TASK_COUNTER_METRICS.clean_up_wait_for.inc();
}
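/// Removes all wait-for entries of the transaction `txn_ts`.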
fn clean_up(&mut self, txn_ts: TimeStamp) {
self.wait_for_map.remove(&txn_ts);
TASK_COUNTER_METRICS.clean_up.inc();
}
fn clear(&mut self) {
self.wait_for_map.clear();
}
fn reset_ttl(&mut self, ttl: Duration) {
self.ttl = ttl;
}
const ACTIVE_EXPIRE_THRESHOLD: usize = 100000;
const ACTIVE_EXPIRE_INTERVAL: Duration = Duration::from_secs(3600);
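/// Sweeps expired wait-for entries. To keep the cost low, it only runs when the table
/// holds at least `ACTIVE_EXPIRE_THRESHOLD` waiters and at most once every
/// `ACTIVE_EXPIRE_INTERVAL`.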
fn active_expire(&mut self) {
if self.wait_for_map.len() >= Self::ACTIVE_EXPIRE_THRESHOLD
&& self.now.duration_since(self.last_active_expire) >= Self::ACTIVE_EXPIRE_INTERVAL
{
let now = self.now;
let ttl = self.ttl;
for (_, wait_for) in self.wait_for_map.iter_mut() {
wait_for.retain(|_, locks| !locks.is_expired(now, ttl));
}
self.wait_for_map.retain(|_, wait_for| !wait_for.is_empty());
self.last_active_expire = self.now;
}
}
}
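/// The role of this node in the deadlock detector. Only the leader (the node holding the
/// leader peer of the leader region) maintains the detect table; followers forward their
/// requests to it.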
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Role {
Leader,
Follower,
}
impl Default for Role {
fn default() -> Role {
Role::Follower
}
}
impl From<StateRole> for Role {
fn from(role: StateRole) -> Role {
match role {
StateRole::Leader => Role::Leader,
_ => Role::Follower,
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum DetectType {
Detect,
CleanUpWaitFor,
CleanUp,
}
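/// The tasks handled by the deadlock detector worker.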
pub enum Task {
Detect {
tp: DetectType,
txn_ts: TimeStamp,
lock: Lock,
diag_ctx: DiagnosticContext,
},
DetectRpc {
stream: RequestStream<DeadlockRequest>,
sink: DuplexSink<DeadlockResponse>,
},
ChangeRole(Role),
ChangeTTL(Duration),
#[cfg(any(test, feature = "testexport"))]
Validate(Box<dyn FnOnce(u64) + Send>),
#[cfg(test)]
GetRole(Box<dyn FnOnce(Role) + Send>),
}
impl Display for Task {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Task::Detect {
tp, txn_ts, lock, ..
} => write!(
f,
"Detect {{ tp: {:?}, txn_ts: {}, lock: {:?} }}",
tp, txn_ts, lock
),
Task::DetectRpc { .. } => write!(f, "Detect Rpc"),
Task::ChangeRole(role) => write!(f, "ChangeRole {{ role: {:?} }}", role),
Task::ChangeTTL(ttl) => write!(f, "ChangeTTL {{ ttl: {:?} }}", ttl),
#[cfg(any(test, feature = "testexport"))]
Task::Validate(_) => write!(f, "Validate dead lock config"),
#[cfg(test)]
Task::GetRole(_) => write!(f, "Get role of the deadlock detector"),
}
}
}
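/// `Scheduler` wraps the `FutureScheduler<Task>` of the detector worker and provides
/// typed methods for scheduling tasks to it.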
#[derive(Clone)]
pub struct Scheduler(FutureScheduler<Task>);
impl Scheduler {
pub fn new(scheduler: FutureScheduler<Task>) -> Self {
Self(scheduler)
}
fn notify_scheduler(&self, task: Task) {
if let Err(Stopped(task)) = self.0.schedule(task) {
error!("failed to send task to deadlock_detector"; "task" => %task);
}
}
pub fn detect(&self, txn_ts: TimeStamp, lock: Lock, diag_ctx: DiagnosticContext) {
self.notify_scheduler(Task::Detect {
tp: DetectType::Detect,
txn_ts,
lock,
diag_ctx,
});
}
pub fn clean_up_wait_for(&self, txn_ts: TimeStamp, lock: Lock) {
self.notify_scheduler(Task::Detect {
tp: DetectType::CleanUpWaitFor,
txn_ts,
lock,
diag_ctx: DiagnosticContext::default(),
});
}
pub fn clean_up(&self, txn_ts: TimeStamp) {
self.notify_scheduler(Task::Detect {
tp: DetectType::CleanUp,
txn_ts,
lock: Lock::default(),
diag_ctx: DiagnosticContext::default(),
});
}
fn change_role(&self, role: Role) {
self.notify_scheduler(Task::ChangeRole(role));
}
pub fn change_ttl(&self, t: Duration) {
self.notify_scheduler(Task::ChangeTTL(t));
}
#[cfg(any(test, feature = "testexport"))]
pub fn validate(&self, f: Box<dyn FnOnce(u64) + Send>) {
self.notify_scheduler(Task::Validate(f));
}
#[cfg(test)]
pub fn get_role(&self, f: Box<dyn FnOnce(Role) + Send>) {
self.notify_scheduler(Task::GetRole(f));
}
}
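/// The deadlock detector leader is the leader of the region containing `LEADER_KEY`.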
const LEADER_KEY: &[u8] = b"";
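/// Observes raftstore region and role changes. Whenever the role of the leader region on
/// this node changes (or the region is created, updated, or destroyed), a `ChangeRole`
/// task is scheduled so the detector steps up or down accordingly.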
#[derive(Clone)]
pub(crate) struct RoleChangeNotifier {
leader_region_id: Arc<Mutex<u64>>,
scheduler: Scheduler,
}
impl RoleChangeNotifier {
fn is_leader_region(region: &Region) -> bool {
is_region_initialized(region)
&& region.get_start_key() <= LEADER_KEY
&& (region.get_end_key().is_empty() || LEADER_KEY < region.get_end_key())
}
pub(crate) fn new(scheduler: Scheduler) -> Self {
Self {
leader_region_id: Arc::new(Mutex::new(INVALID_ID)),
scheduler,
}
}
pub(crate) fn register(self, host: &mut CoprocessorHost<impl KvEngine>) {
host.registry
.register_role_observer(1, BoxRoleObserver::new(self.clone()));
host.registry
.register_region_change_observer(1, BoxRegionChangeObserver::new(self));
}
}
impl Coprocessor for RoleChangeNotifier {}
impl RoleObserver for RoleChangeNotifier {
fn on_role_change(&self, ctx: &mut ObserverContext<'_>, role: StateRole) {
let region = ctx.region();
if Self::is_leader_region(region)
&& *self.leader_region_id.lock().unwrap() == region.get_id()
{
self.scheduler.change_role(role.into());
}
}
}
impl RegionChangeObserver for RoleChangeNotifier {
fn on_region_changed(
&self,
ctx: &mut ObserverContext<'_>,
event: RegionChangeEvent,
role: StateRole,
) {
let region = ctx.region();
if Self::is_leader_region(region) {
match event {
RegionChangeEvent::Create | RegionChangeEvent::Update => {
*self.leader_region_id.lock().unwrap() = region.get_id();
self.scheduler.change_role(role.into());
}
RegionChangeEvent::Destroy => {
let mut leader_region_id = self.leader_region_id.lock().unwrap();
if *leader_region_id == region.get_id() {
*leader_region_id = INVALID_ID;
self.scheduler.change_role(Role::Follower);
}
}
}
}
}
}
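/// Detector state shared with the RPC handling tasks spawned on the local task set.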
struct Inner {
role: Role,
detect_table: DetectTable,
}
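/// The deadlock detector worker. The leader maintains the cluster-wide `DetectTable` and
/// serves detect RPCs; followers forward their detect requests to the leader.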
pub struct Detector<S, P>
where
S: StoreAddrResolver + 'static,
P: PdClient + 'static,
{
store_id: u64,
env: Arc<Environment>,
leader_info: Option<(u64, String)>,
leader_client: Option<Client>,
pd_client: Arc<P>,
resolver: S,
security_mgr: Arc<SecurityManager>,
waiter_mgr_scheduler: WaiterMgrScheduler,
inner: Rc<RefCell<Inner>>,
}
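// `Detector` contains `Rc<RefCell<Inner>>` and is therefore not `Send` automatically.
// It is moved to the worker thread exactly once, and every clone of the `Rc` is captured
// by tasks spawned with `spawn_local` on that same thread, so the manual `Send` impl is
// sound within this module.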
unsafe impl<S, P> Send for Detector<S, P>
where
S: StoreAddrResolver + 'static,
P: PdClient + 'static,
{
}
impl<S, P> Detector<S, P>
where
S: StoreAddrResolver + 'static,
P: PdClient + 'static,
{
pub fn new(
store_id: u64,
pd_client: Arc<P>,
resolver: S,
security_mgr: Arc<SecurityManager>,
waiter_mgr_scheduler: WaiterMgrScheduler,
cfg: &Config,
) -> Self {
assert!(store_id != INVALID_ID);
Self {
store_id,
env: client::env(),
leader_info: None,
leader_client: None,
pd_client,
resolver,
security_mgr,
waiter_mgr_scheduler,
inner: Rc::new(RefCell::new(Inner {
role: Role::Follower,
detect_table: DetectTable::new(cfg.wait_for_lock_timeout.into()),
})),
}
}
fn is_leader(&self) -> bool {
self.inner.borrow().role == Role::Leader
}
fn reset(&mut self, role: Role) {
let mut inner = self.inner.borrow_mut();
inner.detect_table.clear();
inner.role = role;
self.leader_client.take();
self.leader_info.take();
}
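/// Refreshes the cached leader info from PD. Returns whether leader info is available
/// afterwards.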
fn refresh_leader_info(&mut self) -> bool {
assert!(!self.is_leader());
match self.get_leader_info() {
Ok(Some((leader_id, leader_addr))) => {
self.update_leader_info(leader_id, leader_addr);
}
Ok(None) => {
info!("no leader");
self.reset(Role::Follower);
}
Err(e) => {
error!("get leader info failed"; "err" => ?e);
}
};
self.leader_info.is_some()
}
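/// Queries PD for the leader of the leader region and resolves the leader store's address.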
fn get_leader_info(&self) -> Result<Option<(u64, String)>> {
let leader = self.pd_client.get_region_info(LEADER_KEY)?.leader;
match leader {
Some(leader) => {
let leader_id = leader.get_store_id();
let leader_addr = self.resolve_store_address(leader_id)?;
Ok(Some((leader_id, leader_addr)))
}
None => {
ERROR_COUNTER_METRICS.leader_not_found.inc();
Ok(None)
}
}
}
fn resolve_store_address(&self, store_id: u64) -> Result<String> {
match wait_op!(|cb| self
.resolver
.resolve(store_id, cb)
.map_err(|e| Error::Other(box_err!(e))))
{
Some(Ok(addr)) => Ok(addr),
_ => Err(box_err!("failed to resolve store address")),
}
}
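/// Caches the leader's id and address. If PD still reports this store as the leader while
/// the local role is follower, the info is treated as stale and the cache is left unchanged.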
fn update_leader_info(&mut self, leader_id: u64, leader_addr: String) {
match self.leader_info {
Some((id, ref addr)) if id == leader_id && *addr == leader_addr => {
debug!("leader not change"; "leader_id" => leader_id, "leader_addr" => %leader_addr);
}
_ => {
if leader_id == self.store_id {
info!("stale leader info");
} else {
info!("leader changed"; "leader_id" => leader_id, "leader_addr" => %leader_addr);
self.leader_client.take();
self.leader_info.replace((leader_id, leader_addr));
}
}
}
}
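/// Switches the role, updating the leader gauge on an actual change. The detector state is
/// reset unconditionally because a follower also receives `ChangeRole(Follower)` when the
/// region leader moves to another node, and it must drop its stale leader info in that case.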
fn change_role(&mut self, role: Role) {
if self.inner.borrow().role != role {
match role {
Role::Leader => {
info!("became the leader of deadlock detector!"; "self_id" => self.store_id);
DETECTOR_LEADER_GAUGE.set(1);
}
Role::Follower => {
info!("changed from the leader of deadlock detector to follower!"; "self_id" => self.store_id);
DETECTOR_LEADER_GAUGE.set(0);
}
}
}
self.reset(role);
}
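/// Establishes a new streaming client to the leader and spawns its send/receive futures on
/// the local task set. Deadlock responses pushed by the leader are translated into
/// `deadlock` notifications for the waiter manager.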
fn reconnect_leader(&mut self) {
assert!(self.leader_client.is_none() && self.leader_info.is_some());
ERROR_COUNTER_METRICS.reconnect_leader.inc();
let (leader_id, leader_addr) = self.leader_info.as_ref().unwrap();
let mut leader_client = Client::new(
Arc::clone(&self.env),
Arc::clone(&self.security_mgr),
leader_addr,
);
let waiter_mgr_scheduler = self.waiter_mgr_scheduler.clone();
let (send, recv) = leader_client.register_detect_handler(Box::new(move |mut resp| {
let entry = resp.take_entry();
let txn = entry.txn.into();
let lock = Lock {
ts: entry.wait_for_txn.into(),
hash: entry.key_hash,
};
let mut wait_chain: Vec<_> = resp.take_wait_chain().into();
wait_chain.push(entry);
waiter_mgr_scheduler.deadlock(txn, lock, resp.get_deadlock_key_hash(), wait_chain)
}));
spawn_local(send.map_err(|e| error!("leader client failed"; "err" => ?e)));
spawn_local(recv.map_err(|_| ()));
self.leader_client = Some(leader_client);
info!("reconnect leader succeeded"; "leader_id" => leader_id);
}
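/// Forwards a detect request to the leader over the streaming client, reconnecting first if
/// necessary. Returns true on success; on failure the client is dropped so that the next
/// attempt reconnects.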
fn send_request_to_leader(
&mut self,
tp: DetectType,
txn_ts: TimeStamp,
lock: Lock,
diag_ctx: DiagnosticContext,
) -> bool {
assert!(!self.is_leader() && self.leader_info.is_some());
if self.leader_client.is_none() {
self.reconnect_leader();
}
if let Some(leader_client) = &self.leader_client {
let tp = match tp {
DetectType::Detect => DeadlockRequestType::Detect,
DetectType::CleanUpWaitFor => DeadlockRequestType::CleanUpWaitFor,
DetectType::CleanUp => DeadlockRequestType::CleanUp,
};
let mut entry = WaitForEntry::default();
entry.set_txn(txn_ts.into_inner());
entry.set_wait_for_txn(lock.ts.into_inner());
entry.set_key_hash(lock.hash);
entry.set_key(diag_ctx.key);
entry.set_resource_group_tag(diag_ctx.resource_group_tag);
let mut req = DeadlockRequest::default();
req.set_tp(tp);
req.set_entry(entry);
if leader_client.detect(req).is_ok() {
return true;
}
self.leader_client.take();
}
false
}
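/// Handles a detect task on the leader using the local detect table. If a deadlock is
/// found, the current detecting edge is appended to the wait chain and the waiter manager
/// is notified.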
fn handle_detect_locally(
&self,
tp: DetectType,
txn_ts: TimeStamp,
lock: Lock,
diag_ctx: DiagnosticContext,
) {
let detect_table = &mut self.inner.borrow_mut().detect_table;
match tp {
DetectType::Detect => {
if let Some((deadlock_key_hash, mut wait_chain)) = detect_table.detect(
txn_ts,
lock.ts,
lock.hash,
&diag_ctx.key,
&diag_ctx.resource_group_tag,
) {
let mut last_entry = WaitForEntry::default();
last_entry.set_txn(txn_ts.into_inner());
last_entry.set_wait_for_txn(lock.ts.into_inner());
last_entry.set_key_hash(lock.hash);
last_entry.set_key(diag_ctx.key);
last_entry.set_resource_group_tag(diag_ctx.resource_group_tag);
wait_chain.push(last_entry);
self.waiter_mgr_scheduler
.deadlock(txn_ts, lock, deadlock_key_hash, wait_chain);
}
}
DetectType::CleanUpWaitFor => {
detect_table.clean_up_wait_for(txn_ts, lock.ts, lock.hash)
}
DetectType::CleanUp => detect_table.clean_up(txn_ts),
}
}
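/// Dispatches a detect task: the leader handles it locally, a follower forwards it to the
/// leader with at most one retry after refreshing the leader info; otherwise the request is
/// dropped and counted.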
fn handle_detect(
&mut self,
tp: DetectType,
txn_ts: TimeStamp,
lock: Lock,
diag_ctx: DiagnosticContext,
) {
if self.is_leader() {
self.handle_detect_locally(tp, txn_ts, lock, diag_ctx);
} else {
for _ in 0..2 {
if self.leader_client.is_none() && !self.refresh_leader_info() {
break;
}
if self.send_request_to_leader(tp, txn_ts, lock, diag_ctx.clone()) {
return;
}
}
warn!("detect request dropped"; "tp" => ?tp, "txn_ts" => txn_ts, "lock" => ?lock);
ERROR_COUNTER_METRICS.dropped.inc();
}
}
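/// Handles the streaming detect RPC from followers. Non-leaders reject the stream; the
/// leader applies each request to the shared detect table and only sends a response when a
/// deadlock is detected.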
fn handle_detect_rpc(
&self,
stream: RequestStream<DeadlockRequest>,
sink: DuplexSink<DeadlockResponse>,
) {
if !self.is_leader() {
let status = RpcStatus::new(
RpcStatusCode::FAILED_PRECONDITION,
Some("I'm not the leader of deadlock detector".to_string()),
);
spawn_local(sink.fail(status).map_err(|_| ()));
ERROR_COUNTER_METRICS.not_leader.inc();
return;
}
let inner = Rc::clone(&self.inner);
let mut s = stream.map_err(Error::Grpc).try_filter_map(move |mut req| {
let mut inner = inner.borrow_mut();
if inner.role != Role::Leader {
ERROR_COUNTER_METRICS.not_leader.inc();
return future::ready(Err(Error::Other(box_err!("leader changed"))));
}
let WaitForEntry {
txn,
wait_for_txn,
key_hash,
key,
resource_group_tag,
..
} = req.get_entry();
let detect_table = &mut inner.detect_table;
let res = match req.get_tp() {
DeadlockRequestType::Detect => {
if let Some((deadlock_key_hash, wait_chain)) = detect_table.detect(
txn.into(),
wait_for_txn.into(),
*key_hash,
key,
resource_group_tag,
) {
let mut resp = DeadlockResponse::default();
resp.set_entry(req.take_entry());
resp.set_deadlock_key_hash(deadlock_key_hash);
resp.set_wait_chain(wait_chain.into());
Some((resp, WriteFlags::default()))
} else {
None
}
}
DeadlockRequestType::CleanUpWaitFor => {
detect_table.clean_up_wait_for(txn.into(), wait_for_txn.into(), *key_hash);
None
}
DeadlockRequestType::CleanUp => {
detect_table.clean_up(txn.into());
None
}
};
future::ok(res)
});
let send_task = async move {
let mut sink = sink.sink_map_err(Error::from);
sink.send_all(&mut s).await?;
sink.close().await?;
Result::Ok(())
}
.map_err(|e| warn!("deadlock detect rpc stream disconnected"; "error" => ?e));
spawn_local(send_task);
}
fn handle_change_role(&mut self, role: Role) {
debug!("handle change role"; "role" => ?role);
self.change_role(role);
}
fn handle_change_ttl(&mut self, ttl: Duration) {
let mut inner = self.inner.borrow_mut();
inner.detect_table.reset_ttl(ttl);
info!("Deadlock detector config changed"; "ttl" => ?ttl);
}
}
impl<S, P> FutureRunnable<Task> for Detector<S, P>
where
S: StoreAddrResolver + 'static,
P: PdClient + 'static,
{
fn run(&mut self, task: Task) {
match task {
Task::Detect {
tp,
txn_ts,
lock,
diag_ctx,
} => {
self.handle_detect(tp, txn_ts, lock, diag_ctx);
}
Task::DetectRpc { stream, sink } => {
self.handle_detect_rpc(stream, sink);
}
Task::ChangeRole(role) => self.handle_change_role(role),
Task::ChangeTTL(ttl) => self.handle_change_ttl(ttl),
#[cfg(any(test, feature = "testexport"))]
Task::Validate(f) => f(self.inner.borrow().detect_table.ttl.as_millis() as u64),
#[cfg(test)]
Task::GetRole(f) => f(self.inner.borrow().role),
}
}
}
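/// The gRPC `Deadlock` service. `get_wait_for_entries` dumps the wait table from the waiter
/// manager and `detect` forwards the duplex stream to the detector worker.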
#[derive(Clone)]
pub struct Service {
waiter_mgr_scheduler: WaiterMgrScheduler,
detector_scheduler: Scheduler,
}
impl Service {
pub fn new(waiter_mgr_scheduler: WaiterMgrScheduler, detector_scheduler: Scheduler) -> Self {
Self {
waiter_mgr_scheduler,
detector_scheduler,
}
}
}
impl Deadlock for Service {
fn get_wait_for_entries(
&mut self,
ctx: RpcContext<'_>,
_req: WaitForEntriesRequest,
sink: UnarySink<WaitForEntriesResponse>,
) {
let (cb, f) = paired_future_callback();
if !self.waiter_mgr_scheduler.dump_wait_table(cb) {
let status = RpcStatus::new(
RpcStatusCode::RESOURCE_EXHAUSTED,
Some("waiter manager has stopped".to_owned()),
);
ctx.spawn(sink.fail(status).map(|_| ()))
} else {
ctx.spawn(
f.map_err(Error::from)
.map_ok(|v| {
let mut resp = WaitForEntriesResponse::default();
resp.set_entries(v.into());
resp
})
.and_then(|resp| sink.success(resp).map_err(Error::Grpc))
.unwrap_or_else(|e| debug!("get_wait_for_entries failed"; "err" => ?e)),
);
}
}
fn detect(
&mut self,
ctx: RpcContext<'_>,
stream: RequestStream<DeadlockRequest>,
sink: DuplexSink<DeadlockResponse>,
) {
let task = Task::DetectRpc { stream, sink };
if let Err(Stopped(Task::DetectRpc { sink, .. })) = self.detector_scheduler.0.schedule(task)
{
let status = RpcStatus::new(
RpcStatusCode::RESOURCE_EXHAUSTED,
Some("deadlock detector has stopped".to_owned()),
);
ctx.spawn(sink.fail(status).map(|_| ()));
}
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::server::resolve::Callback;
use engine_test::kv::KvTestEngine;
use futures::executor::block_on;
use security::SecurityConfig;
use tikv_util::worker::FutureWorker;
#[test]
fn test_detect_table() {
let mut detect_table = DetectTable::new(Duration::from_secs(10));
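// Deadlock: 1 -> 2 -> 1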
assert_eq!(detect_table.detect(1.into(), 2.into(), 2, &[], &[]), None);
assert_eq!(
detect_table
.detect(2.into(), 1.into(), 1, &[], &[])
.unwrap()
.0,
2
);
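// Deadlock: 1 -> 2 -> 3 -> 1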
assert_eq!(detect_table.detect(2.into(), 3.into(), 3, &[], &[]), None);
assert_eq!(
detect_table
.detect(3.into(), 1.into(), 1, &[], &[])
.unwrap()
.0,
3
);
detect_table.clean_up(2.into());
assert!(!detect_table.wait_for_map.contains_key(&2.into()));
assert_eq!(detect_table.detect(3.into(), 1.into(), 1, &[], &[]), None);
assert_eq!(detect_table.wait_for_map.get(&3.into()).unwrap().len(), 1);
assert_eq!(
detect_table
.wait_for_map
.get(&3.into())
.unwrap()
.get(&1.into())
.unwrap()
.keys
.len(),
1
);
assert_eq!(detect_table.detect(3.into(), 1.into(), 2, &[], &[]), None);
assert_eq!(
detect_table
.wait_for_map
.get(&3.into())
.unwrap()
.get(&1.into())
.unwrap()
.keys
.len(),
2
);
assert_eq!(detect_table.detect(3.into(), 1.into(), 2, &[], &[]), None);
assert_eq!(
detect_table
.wait_for_map
.get(&3.into())
.unwrap()
.get(&1.into())
.unwrap()
.keys
.len(),
2
);
assert_eq!(detect_table.detect(3.into(), 2.into(), 2, &[], &[]), None);
assert_eq!(detect_table.wait_for_map.get(&3.into()).unwrap().len(), 2);
assert_eq!(
detect_table
.wait_for_map
.get(&3.into())
.unwrap()
.get(&2.into())
.unwrap()
.keys
.len(),
1
);
detect_table.clean_up_wait_for(3.into(), 1.into(), 1);
assert_eq!(
detect_table
.wait_for_map
.get(&3.into())
.unwrap()
.get(&1.into())
.unwrap()
.keys
.len(),
1
);
detect_table.clean_up_wait_for(3.into(), 1.into(), 2);
assert_eq!(detect_table.wait_for_map.get(&3.into()).unwrap().len(), 1);
detect_table.clean_up_wait_for(3.into(), 2.into(), 2);
assert!(!detect_table.wait_for_map.contains_key(&3.into()));
detect_table.clean_up(3.into());
detect_table.clean_up_wait_for(3.into(), 1.into(), 1);
}
#[test]
fn test_detect_table_expire() {
let mut detect_table = DetectTable::new(Duration::from_millis(100));
assert!(
detect_table
.detect(1.into(), 2.into(), 1, &[], &[])
.is_none()
);
assert!(
detect_table
.detect(2.into(), 1.into(), 2, &[], &[])
.is_some()
);
std::thread::sleep(Duration::from_millis(500));
assert_eq!(detect_table.wait_for_map.len(), 1);
assert!(
detect_table
.detect(2.into(), 1.into(), 2, &[], &[])
.is_none()
);
assert_eq!(detect_table.wait_for_map.len(), 1);
detect_table.clear();
assert!(
detect_table
.detect(1.into(), 2.into(), 1, &[], &[])
.is_none()
);
std::thread::sleep(Duration::from_millis(500));
assert!(
detect_table
.detect(1.into(), 2.into(), 1, &[], &[])
.is_none()
);
assert!(
detect_table
.detect(2.into(), 1.into(), 2, &[], &[])
.is_some()
);
detect_table.clear();
assert!(
detect_table
.detect(1.into(), 2.into(), 1, &[], &[])
.is_none()
);
assert!(
detect_table
.detect(1.into(), 3.into(), 1, &[], &[])
.is_none()
);
assert_eq!(detect_table.wait_for_map.len(), 1);
std::thread::sleep(Duration::from_millis(500));
assert!(
detect_table
.detect(1.into(), 3.into(), 2, &[], &[])
.is_none()
);
assert!(
detect_table
.detect(2.into(), 1.into(), 2, &[], &[])
.is_none()
);
assert_eq!(detect_table.wait_for_map.get(&1.into()).unwrap().len(), 1);
assert_eq!(
detect_table
.wait_for_map
.get(&1.into())
.unwrap()
.get(&3.into())
.unwrap()
.keys
.len(),
2
);
std::thread::sleep(Duration::from_millis(500));
assert!(
detect_table
.detect(3.into(), 2.into(), 3, &[], &[])
.is_none()
);
assert_eq!(detect_table.wait_for_map.len(), 2);
assert!(
detect_table
.detect(3.into(), 1.into(), 3, &[], &[])
.is_none()
);
assert_eq!(detect_table.wait_for_map.len(), 1);
}
#[test]
fn test_deadlock_generating_wait_chain() {
#[derive(Clone, Copy, Debug, PartialEq)]
struct Edge<'a> {
ts: u64,
lock_ts: u64,
hash: u64,
key: &'a [u8],
tag: &'a [u8],
}
let new_edge = |ts, lock_ts, hash, key, tag| Edge {
ts,
lock_ts,
hash,
key,
tag,
};
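// Registers all edges except the last one (no deadlock expected), then detects the last
// edge, which must close a cycle. The returned wait chain must match the registered edges
// and form a path from the last edge's lock_ts back to its ts.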
let test_once = |edges: &[Edge]| {
let mut detect_table = DetectTable::new(Duration::from_millis(100));
let mut edge_map = HashMap::default();
for e in &edges[0..edges.len() - 1] {
assert!(
detect_table
.detect(e.ts.into(), e.lock_ts.into(), e.hash, e.key, e.tag)
.is_none()
);
edge_map.insert((e.ts, e.lock_ts), *e);
}
let last = edges.last().unwrap();
let (_, wait_chain) = detect_table
.detect(
last.ts.into(),
last.lock_ts.into(),
last.hash,
last.key,
last.tag,
)
.unwrap();
let mut current_position = last.lock_ts;
for (i, entry) in wait_chain.iter().enumerate() {
let edge = Edge {
ts: entry.get_txn(),
lock_ts: entry.get_wait_for_txn(),
hash: entry.get_key_hash(),
key: entry.get_key(),
tag: entry.get_resource_group_tag(),
};
let expect_edge = edge_map.get(&(edge.ts, edge.lock_ts)).unwrap();
assert_eq!(
edge, *expect_edge,
"failed at item {}, full wait chain {:?}",
i, wait_chain
);
assert_eq!(
edge.ts, current_position,
"failed at item {}, full wait chain {:?}",
i, wait_chain
);
current_position = edge.lock_ts;
}
assert_eq!(
current_position, last.ts,
"incorrect wait chain {:?}",
wait_chain
);
};
test_once(&[
new_edge(1, 2, 11, b"k1", b"tag1"),
new_edge(2, 1, 12, b"k2", b"tag2"),
]);
test_once(&[
new_edge(1, 2, 11, b"k1", b"tag1"),
new_edge(2, 3, 12, b"k2", b"tag2"),
new_edge(3, 1, 13, b"k3", b"tag3"),
]);
test_once(&[
new_edge(1, 2, 11, b"k12", b"tag12"),
new_edge(2, 3, 12, b"k23", b"tag23"),
new_edge(2, 4, 13, b"k24", b"tag24"),
new_edge(4, 1, 14, b"k41", b"tag41"),
]);
test_once(&[
new_edge(1, 2, 11, b"k12", b"tag12"),
new_edge(1, 3, 12, b"k13", b"tag13"),
new_edge(2, 4, 13, b"k24", b"tag24"),
new_edge(3, 5, 14, b"k35", b"tag35"),
new_edge(2, 5, 15, b"k25", b"tag25"),
new_edge(5, 6, 16, b"k56", b"tag56"),
new_edge(6, 1, 17, b"k61", b"tag61"),
]);
use rand::seq::SliceRandom;
let mut case = vec![
new_edge(1, 2, 11, b"k12", b"tag12"),
new_edge(1, 3, 12, b"k13", b"tag13"),
new_edge(2, 4, 13, b"k24", b"tag24"),
new_edge(3, 5, 14, b"k35", b"tag35"),
new_edge(2, 5, 15, b"k25", b"tag25"),
new_edge(5, 6, 16, b"k56", b"tag56"),
];
case.shuffle(&mut rand::thread_rng());
case.push(new_edge(6, 1, 17, b"k61", b"tag61"));
test_once(&case);
}
pub(crate) struct MockPdClient;
impl PdClient for MockPdClient {}
#[derive(Clone)]
pub(crate) struct MockResolver;
impl StoreAddrResolver for MockResolver {
fn resolve(&self, _store_id: u64, _cb: Callback) -> Result<()> {
Err(Error::Other(box_err!("unimplemented")))
}
}
fn start_deadlock_detector(
host: &mut CoprocessorHost<KvTestEngine>,
) -> (FutureWorker<Task>, Scheduler) {
let waiter_mgr_worker = FutureWorker::new("dummy-waiter-mgr");
let waiter_mgr_scheduler = WaiterMgrScheduler::new(waiter_mgr_worker.scheduler());
let mut detector_worker = FutureWorker::new("test-deadlock-detector");
let detector_runner = Detector::new(
1,
Arc::new(MockPdClient {}),
MockResolver {},
Arc::new(SecurityManager::new(&SecurityConfig::default()).unwrap()),
waiter_mgr_scheduler,
&Config::default(),
);
let detector_scheduler = Scheduler::new(detector_worker.scheduler());
let role_change_notifier = RoleChangeNotifier::new(detector_scheduler.clone());
role_change_notifier.register(host);
detector_worker.start(detector_runner).unwrap();
(detector_worker, detector_scheduler)
}
fn new_region(id: u64, start_key: &[u8], end_key: &[u8], valid: bool) -> Region {
let mut region = Region::default();
region.set_id(id);
region.set_start_key(start_key.to_vec());
region.set_end_key(end_key.to_vec());
if valid {
region.set_peers(vec![kvproto::metapb::Peer::default()].into());
}
region
}
#[test]
fn test_role_change_notifier() {
let mut host = CoprocessorHost::default();
let (mut worker, scheduler) = start_deadlock_detector(&mut host);
let mut region = new_region(1, b"", b"", true);
let invalid = new_region(2, b"", b"", false);
let other = new_region(3, b"0", b"", true);
let follower_roles = [
StateRole::Follower,
StateRole::PreCandidate,
StateRole::Candidate,
];
let events = [
RegionChangeEvent::Create,
RegionChangeEvent::Update,
RegionChangeEvent::Destroy,
];
let check_role = |role| {
let (tx, f) = paired_future_callback();
scheduler.get_role(tx);
assert_eq!(block_on(f).unwrap(), role);
};
for &event in &events[..2] {
for &follower_role in &follower_roles {
host.on_region_changed(®ion, event, follower_role);
check_role(Role::Follower);
host.on_region_changed(&invalid, event, StateRole::Leader);
check_role(Role::Follower);
host.on_region_changed(&other, event, StateRole::Leader);
check_role(Role::Follower);
host.on_region_changed(®ion, event, StateRole::Leader);
check_role(Role::Leader);
host.on_region_changed(&invalid, event, follower_role);
check_role(Role::Leader);
host.on_region_changed(&other, event, follower_role);
check_role(Role::Leader);
host.on_region_changed(®ion, event, follower_role);
check_role(Role::Follower);
}
}
host.on_region_changed(®ion, RegionChangeEvent::Create, StateRole::Leader);
host.on_region_changed(&invalid, RegionChangeEvent::Destroy, StateRole::Leader);
host.on_region_changed(&other, RegionChangeEvent::Destroy, StateRole::Leader);
check_role(Role::Leader);
host.on_region_changed(®ion, RegionChangeEvent::Destroy, StateRole::Leader);
check_role(Role::Follower);
region.set_id(2);
host.on_region_changed(®ion, RegionChangeEvent::Update, StateRole::Leader);
region.set_id(1);
host.on_region_changed(®ion, RegionChangeEvent::Destroy, StateRole::Leader);
check_role(Role::Leader);
let region = new_region(1, b"", b"", true);
host.on_region_changed(®ion, RegionChangeEvent::Create, StateRole::Follower);
check_role(Role::Follower);
for &follower_role in &follower_roles {
host.on_role_change(®ion, follower_role);
check_role(Role::Follower);
host.on_role_change(&invalid, StateRole::Leader);
check_role(Role::Follower);
host.on_role_change(&other, StateRole::Leader);
check_role(Role::Follower);
host.on_role_change(®ion, StateRole::Leader);
check_role(Role::Leader);
host.on_role_change(&invalid, follower_role);
check_role(Role::Leader);
host.on_role_change(&other, follower_role);
check_role(Role::Leader);
host.on_role_change(®ion, follower_role);
check_role(Role::Follower);
}
worker.stop();
}
}