use std::cell::RefCell;
use std::sync::{Arc, RwLock};
use collections::HashMap;
use engine_rocks::RocksEngine;
use engine_traits::KvEngine;
use fail::fail_point;
use kvproto::metapb::{Peer, Region};
use raft::StateRole;
use raftstore::coprocessor::*;
use raftstore::store::RegionSnapshot;
use raftstore::Error as RaftStoreError;
use tikv_util::worker::Scheduler;
use tikv_util::{error, warn};
use crate::endpoint::{Deregister, Task};
use crate::old_value::{self, OldValueCache, OldValueReader};
use crate::Error as CdcError;
/// Coprocessor observer for CDC.
///
/// It accumulates applied commands into `CmdBatch`es while the CDC observe
/// handle is active and hands them to the scheduler on flush. It also
/// schedules a `Deregister` task for a subscribed region when the peer stops
/// being leader or the region is destroyed.
#[derive(Clone)]
pub struct CdcObserver {
// Scheduler through which `Task`s are sent.
sched: Scheduler<Task>,
// Region id -> `ObserveID` of the current subscription. Held behind an
// `Arc`, so every clone of the observer shares the same map.
observe_regions: Arc<RwLock<HashMap<u64, ObserveID>>>,
// Command batches accumulated since the last `on_flush_apply`. Not behind
// an `Arc`: the derived `Clone` gives each clone its own copy.
cmd_batches: RefCell<Vec<CmdBatch>>,
// Whether the most recent `on_prepare_for_apply` saw an observing CDC
// handle; `on_apply_cmd` drops commands while this is `false`.
last_batch_observing: RefCell<bool>,
}
impl CdcObserver {
    /// Creates an observer that sends its tasks through `sched`.
    ///
    /// The observer starts with no subscribed regions, no pending command
    /// batches, and `last_batch_observing` set to `false`.
    pub fn new(sched: Scheduler<Task>) -> CdcObserver {
        CdcObserver {
            sched,
            observe_regions: Arc::default(),
            cmd_batches: RefCell::default(),
            last_batch_observing: RefCell::from(false),
        }
    }

    /// Registers clones of this observer as command, role and region-change
    /// observer on `coprocessor_host`.
    ///
    /// The clones share `observe_regions` with `self` because that map lives
    /// behind an `Arc`.
    pub fn register_to(&self, coprocessor_host: &mut CoprocessorHost<RocksEngine>) {
        // NOTE(review): 100 is presumably the observer priority; confirm
        // against the registry API.
        coprocessor_host
            .registry
            .register_cmd_observer(100, BoxCmdObserver::new(self.clone()));
        coprocessor_host
            .registry
            .register_role_observer(100, BoxRoleObserver::new(self.clone()));
        coprocessor_host
            .registry
            .register_region_change_observer(100, BoxRegionChangeObserver::new(self.clone()));
    }

    /// Records `observe_id` as the subscription for `region_id`.
    ///
    /// Returns whatever the map's `insert` returns, i.e. the previously stored
    /// id when the region was already subscribed (assuming std-like
    /// `HashMap::insert` semantics).
    ///
    /// # Panics
    ///
    /// Panics if the `observe_regions` lock is poisoned.
    pub fn subscribe_region(&self, region_id: u64, observe_id: ObserveID) -> Option<ObserveID> {
        self.observe_regions
            .write()
            .unwrap()
            .insert(region_id, observe_id)
    }

    /// Removes the subscription for `region_id`, but only if it is still the
    /// one identified by `observe_id`.
    ///
    /// Returns the removed id, or `None` when the region is not subscribed or
    /// is subscribed under a different id (in which case nothing is removed).
    ///
    /// # Panics
    ///
    /// Panics if the `observe_regions` lock is poisoned.
    pub fn unsubscribe_region(&self, region_id: u64, observe_id: ObserveID) -> Option<ObserveID> {
        // Hold the write lock across the check and the removal so the two
        // steps are atomic with respect to other subscribers.
        let mut regions = self.observe_regions.write().unwrap();
        let is_current = regions
            .get(&region_id)
            .map_or(false, |oid| *oid == observe_id);
        if is_current {
            regions.remove(&region_id)
        } else {
            None
        }
    }

    /// Returns the `ObserveID` currently subscribed for `region_id`, if any.
    ///
    /// # Panics
    ///
    /// Panics if the `observe_regions` lock is poisoned.
    pub fn is_subscribed(&self, region_id: u64) -> Option<ObserveID> {
        self.observe_regions
            .read()
            .unwrap()
            .get(&region_id)
            .cloned()
    }
}
// Empty impl: `CdcObserver` overrides none of `Coprocessor`'s methods.
// NOTE(review): presumably required because the observer traits implemented
// below have `Coprocessor` as a supertrait; confirm against the trait
// definitions.
impl Coprocessor for CdcObserver {}
impl<E: KvEngine> CmdObserver<E> for CdcObserver {
    /// Remembers whether the CDC handle is observing and, if it is, opens a
    /// fresh `CmdBatch` for `region_id`.
    fn on_prepare_for_apply(&self, cdc: &ObserveHandle, rts: &ObserveHandle, region_id: u64) {
        let observing = cdc.is_observing();
        *self.last_batch_observing.borrow_mut() = observing;
        if observing {
            let batch = CmdBatch::new(cdc.id, rts.id, region_id);
            self.cmd_batches.borrow_mut().push(batch);
        }
    }

    /// Appends a copy of `cmd` to the batch opened by the latest
    /// `on_prepare_for_apply`; does nothing when that call saw a handle that
    /// was not observing.
    ///
    /// # Panics
    ///
    /// Panics if observing is recorded but no batch exists.
    fn on_apply_cmd(&self, cdc_id: ObserveID, rts_id: ObserveID, region_id: u64, cmd: &Cmd) {
        let observing = *self.last_batch_observing.borrow();
        if observing {
            let mut pending = self.cmd_batches.borrow_mut();
            let current = pending.last_mut().expect("should exist some cmd batch");
            current.push(cdc_id, rts_id, region_id, cmd.clone());
        }
    }

    /// Drops empty batches and, if any remain, schedules them as one
    /// `Task::MultiBatch` together with an old-value lookup callback.
    fn on_flush_apply(&self, engine: E) {
        fail_point!("before_cdc_flush_apply");

        // Take ownership of the non-empty batches, leaving an empty list
        // behind. Bail out early when there is nothing to send.
        let batches = {
            let mut pending = self.cmd_batches.borrow_mut();
            pending.retain(|batch| !batch.is_empty());
            if pending.is_empty() {
                return;
            }
            std::mem::take(&mut *pending)
        };

        // The snapshot is wrapped with a placeholder region that has a single
        // default peer.
        // NOTE(review): presumably the snapshot pins a consistent view for the
        // old-value lookups; confirm.
        let mut placeholder = Region::default();
        placeholder.mut_peers().push(Peer::default());
        let engine_snapshot = Arc::new(engine.snapshot());
        let snapshot = RegionSnapshot::from_snapshot(engine_snapshot, Arc::new(placeholder));
        let reader = OldValueReader::new(snapshot);
        let old_value_cb = move |key, query_ts, old_value_cache: &mut OldValueCache| {
            old_value::get_old_value(&reader, key, query_ts, old_value_cache)
        };

        let task = Task::MultiBatch {
            multi: batches,
            old_value_cb: Box::new(old_value_cb),
        };
        if let Err(e) = self.sched.schedule(task) {
            warn!("cdc schedule task failed"; "error" => ?e);
        }
    }
}
impl RoleObserver for CdcObserver {
    /// When the peer's role becomes anything other than leader and its region
    /// is subscribed, schedules a `Deregister::Region` task carrying a
    /// `NotLeader` error.
    fn on_role_change(&self, ctx: &mut ObserverContext<'_>, role: StateRole) {
        // Becoming leader never triggers a deregistration.
        if role == StateRole::Leader {
            return;
        }

        let region_id = ctx.region().get_id();
        let observe_id = match self.is_subscribed(region_id) {
            Some(id) => id,
            // Nothing to tear down for regions nobody subscribed to.
            None => return,
        };

        let store_err = RaftStoreError::NotLeader(region_id, None);
        let task = Task::Deregister(Deregister::Region {
            region_id,
            observe_id,
            err: CdcError::request(store_err.into()),
        });
        if let Err(e) = self.sched.schedule(task) {
            error!("cdc schedule cdc task failed"; "error" => ?e);
        }
    }
}
impl RegionChangeObserver for CdcObserver {
    /// When a subscribed region is destroyed, schedules a
    /// `Deregister::Region` task carrying a `RegionNotFound` error. All other
    /// region change events are ignored.
    fn on_region_changed(
        &self,
        ctx: &mut ObserverContext<'_>,
        event: RegionChangeEvent,
        _: StateRole,
    ) {
        if !matches!(event, RegionChangeEvent::Destroy) {
            return;
        }

        let region_id = ctx.region().get_id();
        let observe_id = match self.is_subscribed(region_id) {
            Some(id) => id,
            // Nothing to tear down for regions nobody subscribed to.
            None => return,
        };

        let store_err = RaftStoreError::RegionNotFound(region_id);
        let task = Task::Deregister(Deregister::Region {
            region_id,
            observe_id,
            err: CdcError::request(store_err.into()),
        });
        if let Err(e) = self.sched.schedule(task) {
            error!("cdc schedule cdc task failed"; "error" => ?e);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use engine_rocks::RocksEngine;
    use kvproto::metapb::Region;
    use kvproto::raft_cmdpb::*;
    use std::time::Duration;
    use tikv::storage::kv::TestEngineBuilder;

    /// Exercises command batching while observing / not observing, and the
    /// deregistration tasks emitted on role change for subscribed regions.
    #[test]
    fn test_register_and_deregister() {
        let (scheduler, mut rx) = tikv_util::worker::dummy_scheduler();
        let observer = CdcObserver::new(scheduler);
        let observe_handle = ObserveHandle::new();
        let engine = TestEngineBuilder::new().build().unwrap().get_rocksdb();

        // While the handle is observing, an applied command is batched and
        // flushed as a single `MultiBatch` holding one batch with one command.
        <CdcObserver as CmdObserver<RocksEngine>>::on_prepare_for_apply(
            &observer,
            &observe_handle,
            &observe_handle,
            0,
        );
        <CdcObserver as CmdObserver<RocksEngine>>::on_apply_cmd(
            &observer,
            observe_handle.id,
            observe_handle.id,
            0,
            &Cmd::new(0, RaftCmdRequest::default(), RaftCmdResponse::default()),
        );
        <CdcObserver as CmdObserver<RocksEngine>>::on_flush_apply(&observer, engine.clone());
        match rx.recv_timeout(Duration::from_millis(10)).unwrap().unwrap() {
            Task::MultiBatch { multi, .. } => {
                assert_eq!(multi.len(), 1);
                assert_eq!(multi[0].len(), 1);
            }
            _ => panic!("unexpected task"),
        };

        // After the handle stops observing, nothing is batched or scheduled.
        observe_handle.stop_observing();
        <CdcObserver as CmdObserver<RocksEngine>>::on_prepare_for_apply(
            &observer,
            &observe_handle,
            &observe_handle,
            0,
        );
        <CdcObserver as CmdObserver<RocksEngine>>::on_apply_cmd(
            &observer,
            observe_handle.id,
            observe_handle.id,
            0,
            &Cmd::new(0, RaftCmdRequest::default(), RaftCmdResponse::default()),
        );
        <CdcObserver as CmdObserver<RocksEngine>>::on_flush_apply(&observer, engine);
        match rx.recv_timeout(Duration::from_millis(10)) {
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
            _ => panic!("unexpected result"),
        };

        // A role change on a region that is not subscribed schedules nothing.
        let mut region = Region::default();
        region.set_id(1);
        let mut ctx = ObserverContext::new(&region);
        observer.on_role_change(&mut ctx, StateRole::Follower);
        rx.recv_timeout(Duration::from_millis(10)).unwrap_err();

        // Once subscribed, stepping down to follower schedules a deregister
        // task for that region and observe id.
        let oid = ObserveID::new();
        observer.subscribe_region(1, oid);
        let mut ctx = ObserverContext::new(&region);
        observer.on_role_change(&mut ctx, StateRole::Follower);
        match rx.recv_timeout(Duration::from_millis(10)).unwrap().unwrap() {
            Task::Deregister(Deregister::Region {
                region_id,
                observe_id,
                ..
            }) => {
                assert_eq!(region_id, 1);
                assert_eq!(observe_id, oid);
            }
            _ => panic!("unexpected task"),
        };

        // Becoming leader never schedules a deregister task.
        observer.on_role_change(&mut ctx, StateRole::Leader);
        rx.recv_timeout(Duration::from_millis(10)).unwrap_err();

        // Unsubscribing with a stale observe id is a no-op; the matching id
        // removes the subscription and returns it.
        assert_eq!(observer.unsubscribe_region(1, ObserveID::new()), None);
        let oid_ = observer.unsubscribe_region(1, oid).unwrap();
        assert_eq!(oid_, oid);

        // No subscription left, so no deregister task.
        observer.on_role_change(&mut ctx, StateRole::Follower);
        rx.recv_timeout(Duration::from_millis(10)).unwrap_err();

        // A region that was never subscribed schedules nothing either.
        region.set_id(999);
        let mut ctx = ObserverContext::new(&region);
        observer.on_role_change(&mut ctx, StateRole::Follower);
        rx.recv_timeout(Duration::from_millis(10)).unwrap_err();
    }
}