use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::vec::IntoIter;
use engine_traits::CfName;
use kvproto::metapb::Region;
use kvproto::pdpb::CheckPolicy;
use kvproto::raft_cmdpb::{AdminRequest, AdminResponse, RaftCmdRequest, RaftCmdResponse, Request};
use raft::{eraftpb, StateRole};
pub mod config;
mod consistency_check;
pub mod dispatcher;
mod error;
mod metrics;
pub mod region_info_accessor;
mod split_check;
pub mod split_observer;
pub use self::config::{Config, ConsistencyCheckMethod};
pub use self::consistency_check::{ConsistencyCheckObserver, Raw as RawConsistencyCheckObserver};
pub use self::dispatcher::{
BoxAdminObserver, BoxApplySnapshotObserver, BoxCmdObserver, BoxConsistencyCheckObserver,
BoxQueryObserver, BoxRegionChangeObserver, BoxRoleObserver, BoxSplitCheckObserver,
CoprocessorHost, Registry,
};
pub use self::error::{Error, Result};
pub use self::region_info_accessor::{
Callback as RegionInfoCallback, RegionCollector, RegionInfo, RegionInfoAccessor,
RegionInfoProvider, SeekRegionCallback,
};
pub use self::split_check::{
get_region_approximate_keys, get_region_approximate_middle, get_region_approximate_size,
HalfCheckObserver, Host as SplitCheckerHost, KeysCheckObserver, SizeCheckObserver,
TableCheckObserver,
};
pub use crate::store::KeyEntry;
/// Supertrait of the observer traits declared in this module.
///
/// Implementors must be `Send`. Both hooks have empty default bodies, so an
/// implementation only overrides the ones it needs.
pub trait Coprocessor: Send {
/// Start hook; does nothing by default.
/// NOTE(review): presumably invoked when the owning host starts — confirm against the caller.
fn start(&self) {}
/// Stop hook; does nothing by default.
/// NOTE(review): presumably invoked when the owning host shuts down — confirm against the caller.
fn stop(&self) {}
}
/// Context passed to observer hooks.
///
/// It borrows the region a hook is invoked for and carries a publicly
/// writable `bypass` flag.
pub struct ObserverContext<'a> {
    /// Region this context refers to; exposed read-only through
    /// [`ObserverContext::region`].
    region: &'a Region,
    /// Always `false` right after [`ObserverContext::new`].
    /// NOTE(review): presumably lets an observer tell the dispatcher to skip
    /// the remaining observers — confirm against the dispatcher.
    pub bypass: bool,
}

impl<'a> ObserverContext<'a> {
    /// Builds a context that borrows `region`, with `bypass` cleared.
    pub fn new(region: &Region) -> ObserverContext<'_> {
        let bypass = false;
        ObserverContext { region, bypass }
    }

    /// Returns the region this context was created with.
    pub fn region(&self) -> &Region {
        self.region
    }
}
/// Observer hooks for admin requests.
///
/// Every hook has a default implementation that leaves its arguments
/// untouched, so implementors override only the stages they care about.
/// The call sites of these hooks are outside this file.
pub trait AdminObserver: Coprocessor {
    /// Pre-propose hook. The request is handed over mutably, and returning an
    /// error is possible; the default accepts the request unchanged with
    /// `Ok(())`.
    fn pre_propose_admin(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _req: &mut AdminRequest,
    ) -> Result<()> {
        Ok(())
    }

    /// Pre-apply hook; the request is read-only here. No-op by default.
    fn pre_apply_admin(&self, _ctx: &mut ObserverContext<'_>, _req: &AdminRequest) {}

    /// Post-apply hook; receives the admin response mutably. No-op by default.
    fn post_apply_admin(&self, _ctx: &mut ObserverContext<'_>, _resp: &mut AdminResponse) {}
}
/// Observer hooks for (non-admin) query requests.
///
/// Every hook has a default implementation that leaves its arguments
/// untouched, so implementors override only the stages they care about.
/// The call sites of these hooks are outside this file.
pub trait QueryObserver: Coprocessor {
    /// Pre-propose hook. The request list is handed over mutably, and
    /// returning an error is possible; the default accepts the requests
    /// unchanged with `Ok(())`.
    fn pre_propose_query(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _reqs: &mut Vec<Request>,
    ) -> Result<()> {
        Ok(())
    }

    /// Pre-apply hook; the requests are read-only here. No-op by default.
    fn pre_apply_query(&self, _ctx: &mut ObserverContext<'_>, _reqs: &[Request]) {}

    /// Post-apply hook; receives the whole [`Cmd`] (index, request and
    /// response) mutably. No-op by default.
    fn post_apply_query(&self, _ctx: &mut ObserverContext<'_>, _cmd: &mut Cmd) {}
}
/// Observer hooks for data arriving through snapshot application.
///
/// Both hooks are no-ops by default. Their call sites are outside this file.
pub trait ApplySnapshotObserver: Coprocessor {
    /// Receives a slice of `(key, value)` byte pairs together with the column
    /// family name they belong to.
    fn apply_plain_kvs(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _cf: CfName,
        _kvs: &[(Vec<u8>, Vec<u8>)],
    ) {
    }

    /// Receives a column family name and a path string.
    /// NOTE(review): `_path` presumably points at the SST file being applied
    /// — confirm against the caller.
    fn apply_sst(&self, _ctx: &mut ObserverContext<'_>, _cf: CfName, _path: &str) {}
}
/// Strategy that decides where a region should be split.
///
/// `E` is an opaque type parameter that is only passed through to
/// [`SplitChecker::approximate_split_keys`].
pub trait SplitChecker<E> {
    /// Feeds one key entry to the checker. The default ignores the entry and
    /// returns `false`.
    /// NOTE(review): a `true` return presumably asks the caller to stop
    /// scanning early — confirm against the split-check runner.
    fn on_kv(&mut self, _ctx: &mut ObserverContext<'_>, _entry: &KeyEntry) -> bool {
        false
    }

    /// Returns the split keys computed by this checker.
    fn split_keys(&mut self) -> Vec<Vec<u8>>;

    /// Returns split keys obtained without being fed entries through
    /// [`SplitChecker::on_kv`]. The default yields an empty list.
    fn approximate_split_keys(&mut self, _region: &Region, _: &E) -> Result<Vec<Vec<u8>>> {
        Ok(Vec::new())
    }

    /// Returns the check policy this checker works under.
    fn policy(&self) -> CheckPolicy;
}
/// Observer that contributes split checkers to a [`SplitCheckerHost`].
pub trait SplitCheckObserver<E>: Coprocessor {
    /// Gives the observer a chance to register a checker on `host`.
    ///
    /// The host is handed over mutably; the `&E` argument and `policy` are
    /// provided by the caller.
    /// NOTE(review): implementations presumably add a checker only when
    /// `policy` and the region warrant it — confirm against the implementors.
    fn add_checker(
        &self,
        ctx: &mut ObserverContext<'_>,
        host: &mut SplitCheckerHost<'_, E>,
        _: &E,
        policy: CheckPolicy,
    );
}
/// Observer hook for raft role changes.
pub trait RoleObserver: Coprocessor {
    /// Receives the context and a [`StateRole`]. No-op by default.
    /// NOTE(review): `_role` is presumably the role the peer has just
    /// switched to — confirm against the caller.
    fn on_role_change(&self, _ctx: &mut ObserverContext<'_>, _role: StateRole) {}
}
/// Kind of change reported to `RegionChangeObserver::on_region_changed`.
///
/// NOTE(review): the variants presumably mean that a region was created, had
/// its metadata updated, or was destroyed — confirm against the code that
/// emits these events.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RegionChangeEvent {
Create,
Update,
Destroy,
}
/// Observer hook for region change events.
pub trait RegionChangeObserver: Coprocessor {
    /// Receives the context, the kind of change and a [`StateRole`]. No-op by
    /// default.
    /// NOTE(review): `_role` is presumably the peer's raft role at the time of
    /// the event — confirm against the caller.
    fn on_region_changed(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _event: RegionChangeEvent,
        _role: StateRole,
    ) {
    }
}
/// A raft command bundled with its response and an index.
#[derive(Clone, Debug)]
pub struct Cmd {
    /// NOTE(review): presumably the raft log index of the command — confirm
    /// against the code that constructs `Cmd`.
    pub index: u64,
    /// The request part of the command.
    pub request: RaftCmdRequest,
    /// The response paired with `request`.
    pub response: RaftCmdResponse,
}

impl Cmd {
    /// Bundles `index`, `request` and `response` into a `Cmd` without
    /// modifying any of them.
    pub fn new(index: u64, request: RaftCmdRequest, response: RaftCmdResponse) -> Cmd {
        Self {
            index,
            request,
            response,
        }
    }
}
/// Process-wide counter backing [`ObserveID::new`]; starts at zero.
static OBSERVE_ID_ALLOC: AtomicUsize = AtomicUsize::new(0);

/// Identifier handed out by a process-wide counter.
///
/// Note that the derived `Default` yields the same value as the very first ID
/// allocated by [`ObserveID::new`], because the counter starts at zero.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct ObserveID(usize);

impl ObserveID {
    /// Allocates the next ID by atomically incrementing the shared counter,
    /// so successive calls return distinct, increasing values until the
    /// counter wraps.
    pub fn new() -> ObserveID {
        let next = OBSERVE_ID_ALLOC.fetch_add(1, Ordering::SeqCst);
        Self(next)
    }
}
/// An [`ObserveID`] paired with a shared "still observing" flag.
///
/// Cloning a handle clones the `Arc`, so every clone reads and writes the
/// same flag. The derived `Default` differs from [`ObserveHandle::new`]: it
/// uses the default ID and a flag that is initially `false`.
#[derive(Clone, Default, Debug)]
pub struct ObserveHandle {
    /// Identifier of this observation.
    pub id: ObserveID,
    /// Shared flag; `true` until [`ObserveHandle::stop_observing`] is called
    /// on this handle or any of its clones.
    observing: Arc<AtomicBool>,
}

impl ObserveHandle {
    /// Creates a handle with a freshly allocated ID and the flag set.
    pub fn new() -> ObserveHandle {
        Self::observing_with(ObserveID::new())
    }

    /// Creates a handle with the caller-chosen raw `id` and the flag set.
    /// No ID is allocated from the shared counter.
    pub fn with_id(id: usize) -> ObserveHandle {
        Self::observing_with(ObserveID(id))
    }

    /// Returns whether the shared flag is still set.
    pub fn is_observing(&self) -> bool {
        self.observing.load(Ordering::Acquire)
    }

    /// Clears the shared flag; visible through every clone of this handle.
    pub fn stop_observing(&self) {
        self.observing.store(false, Ordering::Release);
    }

    /// Shared constructor: wraps `id` together with a new flag set to `true`.
    fn observing_with(id: ObserveID) -> Self {
        let observing = Arc::new(AtomicBool::new(true));
        Self { id, observing }
    }
}
#[derive(Clone, Debug)]
pub struct CmdBatch {
pub cdc_id: ObserveID,
pub rts_id: ObserveID,
pub region_id: u64,
pub cmds: Vec<Cmd>,
}
impl CmdBatch {
pub fn new(cdc_id: ObserveID, rts_id: ObserveID, region_id: u64) -> CmdBatch {
CmdBatch {
cdc_id,
rts_id,
region_id,
cmds: Vec::new(),
}
}
pub fn push(&mut self, cdc_id: ObserveID, rts_id: ObserveID, region_id: u64, cmd: Cmd) {
assert_eq!(region_id, self.region_id);
assert_eq!(cdc_id, self.cdc_id);
assert_eq!(rts_id, self.rts_id);
self.cmds.push(cmd)
}
pub fn into_iter(self, region_id: u64) -> IntoIter<Cmd> {
assert_eq!(self.region_id, region_id);
self.cmds.into_iter()
}
pub fn len(&self) -> usize {
self.cmds.len()
}
pub fn is_empty(&self) -> bool {
self.cmds.is_empty()
}
pub fn size(&self) -> usize {
let mut cmd_bytes = 0;
for cmd in self.cmds.iter() {
let Cmd {
ref request,
ref response,
..
} = cmd;
if !response.get_header().has_error() && !request.has_admin_request() {
for req in request.requests.iter() {
let put = req.get_put();
cmd_bytes += put.get_key().len();
cmd_bytes += put.get_value().len();
}
}
}
cmd_bytes
}
}
/// Observer of commands around the apply stage, generic over the type `E`
/// handed to `on_flush_apply`.
///
/// All three methods are required; there are no default implementations.
pub trait CmdObserver<E>: Coprocessor {
/// Receives two observe handles and the region ID before commands for that
/// region are reported.
/// NOTE(review): `cdc_id` / `rts_id` presumably belong to two independent
/// consumers of applied commands — confirm against the implementors.
fn on_prepare_for_apply(&self, cdc_id: &ObserveHandle, rts_id: &ObserveHandle, region_id: u64);
/// Receives one command together with the observe IDs and region ID it
/// belongs to. The command is borrowed, not consumed.
fn on_apply_cmd(&self, cdc_id: ObserveID, rts_id: ObserveID, region_id: u64, cmd: &Cmd);
/// Receives an `E` by value.
/// NOTE(review): presumably called once buffered commands should be flushed
/// to their consumers — confirm against the caller.
fn on_flush_apply(&self, engine: E);
}
/// Observer hook that is handed raft messages mutably.
pub trait ReadIndexObserver: Coprocessor {
/// Receives a raft message and may modify it in place. No-op by default.
/// NOTE(review): presumably invoked for read-index related messages before
/// they are stepped into raft — confirm against the caller.
fn on_step(&self, _msg: &mut eraftpb::Message) {}
}