use std::borrow::Cow;
use std::fmt;
use std::time::Instant;
use bitflags::bitflags;
use engine_traits::{CompactedEvent, KvEngine, Snapshot};
use kvproto::import_sstpb::SstMeta;
use kvproto::kvrpcpb::{ExtraOp as TxnExtraOp, KeyRange, LeaderInfo};
use kvproto::metapb;
use kvproto::metapb::RegionEpoch;
use kvproto::pdpb::CheckPolicy;
use kvproto::raft_cmdpb::{RaftCmdRequest, RaftCmdResponse};
use kvproto::raft_serverpb::RaftMessage;
use kvproto::replication_modepb::ReplicationStatus;
use raft::SnapshotStatus;
use crate::store::fsm::apply::TaskRes as ApplyTaskRes;
use crate::store::fsm::apply::{CatchUpLogs, ChangeObserver};
use crate::store::metrics::RaftEventDurationType;
use crate::store::util::KeysInfoFormatter;
use crate::store::SnapKey;
use tikv_util::{deadline::Deadline, escape};
use super::{AbstractPeer, RegionSnapshot};
/// Payload handed to a `ReadCallback`: the command response plus an optional
/// region snapshot and the transaction extra-op.
#[derive(Debug)]
pub struct ReadResponse<S: Snapshot> {
/// Response of the raft command.
pub response: RaftCmdResponse,
/// Region snapshot, if one is attached. `Callback::invoke_with_response`
/// always fills this with `None`.
pub snapshot: Option<RegionSnapshot<S>>,
/// Transaction extra-op; `Callback::invoke_with_response` uses
/// `TxnExtraOp::Noop`.
pub txn_extra_op: TxnExtraOp,
}
/// Payload handed to a `WriteCallback`.
#[derive(Debug)]
pub struct WriteResponse {
/// Response of the raft command.
pub response: RaftCmdResponse,
}
/// Hand-written `Clone`: clones the response and the optional snapshot and
/// copies the extra-op, requiring only `S: Snapshot`.
impl<S: Snapshot> Clone for ReadResponse<S> {
    fn clone(&self) -> Self {
        let Self {
            response,
            snapshot,
            txn_extra_op,
        } = self;
        Self {
            response: response.clone(),
            snapshot: snapshot.clone(),
            // `TxnExtraOp` is copied, not cloned.
            txn_extra_op: *txn_extra_op,
        }
    }
}
/// Boxed, `Send`, one-shot closure that receives a `ReadResponse`.
pub type ReadCallback<S> = Box<dyn FnOnce(ReadResponse<S>) + Send>;
/// Boxed, `Send`, one-shot closure that receives a `WriteResponse`.
pub type WriteCallback = Box<dyn FnOnce(WriteResponse) + Send>;
/// Boxed, `Send`, one-shot closure taking no arguments; used for the
/// `proposed_cb` / `committed_cb` hooks of `Callback::Write`.
pub type ExtCallback = Box<dyn FnOnce() + Send>;
/// Callback carried by messages such as `RaftCommand`.
pub enum Callback<S: Snapshot> {
/// No callback; `invoke_with_response` does nothing for this variant.
None,
/// Read callback; receives a `ReadResponse`.
Read(ReadCallback<S>),
/// Write callback with two optional hooks.
Write {
/// Main callback; receives the `WriteResponse`.
cb: WriteCallback,
/// Hook taken and run by `invoke_proposed` (so it runs at most once).
proposed_cb: Option<ExtCallback>,
/// Hook taken and run by `invoke_committed` (so it runs at most once).
committed_cb: Option<ExtCallback>,
},
}
impl<S: Snapshot> Callback<S> {
    /// Creates a `Callback::Write` with neither a proposed nor a committed
    /// hook.
    pub fn write(cb: WriteCallback) -> Self {
        Callback::Write {
            cb,
            proposed_cb: None,
            committed_cb: None,
        }
    }

    /// Creates a `Callback::Write` from the main callback and the two
    /// optional hooks.
    pub fn write_ext(
        cb: WriteCallback,
        proposed_cb: Option<ExtCallback>,
        committed_cb: Option<ExtCallback>,
    ) -> Self {
        Self::Write {
            cb,
            proposed_cb,
            committed_cb,
        }
    }

    /// Consumes the callback and invokes it with `resp`.
    ///
    /// * `None` does nothing.
    /// * `Read` receives a `ReadResponse` with no snapshot and
    ///   `TxnExtraOp::Noop`.
    /// * `Write` passes a `WriteResponse` to `cb`; the hooks are not run.
    pub fn invoke_with_response(self, resp: RaftCmdResponse) {
        match self {
            Callback::None => {}
            Callback::Read(read_cb) => read_cb(ReadResponse {
                response: resp,
                snapshot: None,
                txn_extra_op: TxnExtraOp::Noop,
            }),
            Callback::Write { cb, .. } => cb(WriteResponse { response: resp }),
        }
    }

    /// Runs the `proposed_cb` hook of a `Write` callback if it is still
    /// present. The hook is taken out first, so later calls are no-ops.
    /// Does nothing for other variants.
    pub fn invoke_proposed(&mut self) {
        let hook = match self {
            Callback::Write { proposed_cb, .. } => proposed_cb.take(),
            _ => None,
        };
        if let Some(hook) = hook {
            hook();
        }
    }

    /// Runs the `committed_cb` hook of a `Write` callback if it is still
    /// present. The hook is taken out first, so later calls are no-ops.
    /// Does nothing for other variants.
    pub fn invoke_committed(&mut self) {
        let hook = match self {
            Callback::Write { committed_cb, .. } => committed_cb.take(),
            _ => None,
        };
        if let Some(hook) = hook {
            hook();
        }
    }

    /// Consumes a `Read` callback and invokes it with `args`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not `Callback::Read`.
    pub fn invoke_read(self, args: ReadResponse<S>) {
        let read_cb = match self {
            Callback::Read(read_cb) => read_cb,
            other => panic!("expect Callback::Read(..), got {:?}", other),
        };
        read_cb(args)
    }

    /// Returns `true` when this is `Callback::None`.
    pub fn is_none(&self) -> bool {
        matches!(*self, Callback::None)
    }
}
/// Prints only the variant name; the boxed closures are not printed.
impl<S: Snapshot> fmt::Debug for Callback<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Callback::None => "Callback::None",
            Callback::Read(_) => "Callback::Read(..)",
            Callback::Write { .. } => "Callback::Write(..)",
        };
        f.write_str(label)
    }
}
bitflags! {
/// Tick kinds carried by `PeerMsg::Tick`; each flag occupies one distinct
/// bit of a `u8`.
pub struct PeerTicks: u8 {
const RAFT = 0b00000001;
const RAFT_LOG_GC = 0b00000010;
const SPLIT_REGION_CHECK = 0b00000100;
const PD_HEARTBEAT = 0b00001000;
const CHECK_MERGE = 0b00010000;
const CHECK_PEER_STALE_STATE = 0b00100000;
}
}
impl PeerTicks {
    /// Returns the snake-case label of a single tick flag.
    ///
    /// # Panics
    ///
    /// Hits `unreachable!()` when `self` is not exactly one of the six
    /// declared flags (for example an empty or combined value).
    #[inline]
    pub fn tag(self) -> &'static str {
        match self {
            Self::RAFT => "raft",
            Self::RAFT_LOG_GC => "raft_log_gc",
            Self::SPLIT_REGION_CHECK => "split_region_check",
            Self::PD_HEARTBEAT => "pd_heartbeat",
            Self::CHECK_MERGE => "check_merge",
            Self::CHECK_PEER_STALE_STATE => "check_peer_stale_state",
            _ => unreachable!(),
        }
    }

    /// Returns every declared tick flag, one per slice element, in
    /// declaration order.
    pub fn get_all_ticks() -> &'static [PeerTicks] {
        const ALL_TICKS: &[PeerTicks] = &[
            PeerTicks::RAFT,
            PeerTicks::RAFT_LOG_GC,
            PeerTicks::SPLIT_REGION_CHECK,
            PeerTicks::PD_HEARTBEAT,
            PeerTicks::CHECK_MERGE,
            PeerTicks::CHECK_PEER_STALE_STATE,
        ];
        ALL_TICKS
    }
}
/// Tick kinds carried by `StoreMsg::Tick`. `StoreTick::tag` maps each one to
/// a `RaftEventDurationType`.
#[derive(Debug, Clone, Copy)]
pub enum StoreTick {
CompactCheck,
PdStoreHeartbeat,
SnapGc,
CompactLockCf,
ConsistencyCheck,
CleanupImportSST,
RaftEnginePurge,
}
impl StoreTick {
    /// Maps the tick to its `RaftEventDurationType` counterpart.
    #[inline]
    pub fn tag(self) -> RaftEventDurationType {
        match self {
            Self::CompactCheck => RaftEventDurationType::compact_check,
            Self::PdStoreHeartbeat => RaftEventDurationType::pd_store_heartbeat,
            Self::SnapGc => RaftEventDurationType::snap_gc,
            Self::CompactLockCf => RaftEventDurationType::compact_lock_cf,
            Self::ConsistencyCheck => RaftEventDurationType::consistency_check,
            Self::CleanupImportSST => RaftEventDurationType::cleanup_import_sst,
            Self::RaftEnginePurge => RaftEventDurationType::raft_engine_purge,
        }
    }
}
/// Kind of result carried in the `result` field of
/// `SignificantMsg::MergeResult`.
#[derive(Debug)]
pub enum MergeResultKind {
FromTargetLog,
FromTargetSnapshotStep1,
FromTargetSnapshotStep2,
Stale,
}
/// Messages wrapped by `PeerMsg::SignificantMsg`.
#[derive(Debug)]
pub enum SignificantMsg<SK>
where
SK: Snapshot,
{
/// Raft `SnapshotStatus` for the given region / target peer.
SnapshotStatus {
region_id: u64,
to_peer_id: u64,
status: SnapshotStatus,
},
/// Identifies a store by id.
StoreUnreachable {
store_id: u64,
},
/// Identifies a peer by region id and peer id.
Unreachable {
region_id: u64,
to_peer_id: u64,
},
/// Wraps the apply FSM's `CatchUpLogs`.
CatchUpLogs(CatchUpLogs),
/// Merge result together with the target region id and target peer.
MergeResult {
target_region_id: u64,
target: metapb::Peer,
result: MergeResultKind,
},
/// Identifies a store by id together with a group id.
StoreResolved {
store_id: u64,
group_id: u64,
},
/// Carries a `ChangeObserver`, a region epoch and a callback.
CaptureChange {
cmd: ChangeObserver,
region_epoch: RegionEpoch,
callback: Callback<SK>,
},
/// Carries only a callback.
LeaderCallback(Callback<SK>),
}
/// Messages wrapped by `PeerMsg::CasualMessage`.
pub enum CasualMessage<EK: KvEngine> {
/// Split request: region epoch, the split keys, a callback and a `source`
/// label (printed by the `Debug` impl).
SplitRegion {
region_epoch: RegionEpoch,
split_keys: Vec<Vec<u8>>,
callback: Callback<EK::Snapshot>,
source: Cow<'static, str>,
},
/// Hash result with its index and context bytes.
ComputeHashResult {
index: u64,
context: Vec<u8>,
hash: Vec<u8>,
},
/// Approximate size of the region.
RegionApproximateSize {
size: u64,
},
/// Approximate key count of the region.
RegionApproximateKeys {
keys: u64,
},
/// Byte count printed as "compaction declined bytes" by the `Debug` impl.
CompactionDeclinedBytes {
bytes: u64,
},
/// Half-split request: region epoch, check policy and a `source` label.
HalfSplitRegion {
region_epoch: RegionEpoch,
policy: CheckPolicy,
source: &'static str,
},
/// List of `(SnapKey, bool)` pairs.
GcSnap {
snaps: Vec<(SnapKey, bool)>,
},
ClearRegionSize,
RegionOverlapped,
SnapshotGenerated,
ForceCompactRaftLogs,
/// Boxed one-shot closure that is given `&mut dyn AbstractPeer`.
AccessPeer(Box<dyn FnOnce(&mut dyn AbstractPeer) + Send + 'static>),
/// Region together with its leader peer.
QueryRegionLeaderResp {
region: metapb::Region,
leader: metapb::Peer,
},
}
/// Human-readable summary of each message. Callbacks, closures and epochs
/// are omitted; arms follow the enum's declaration order.
impl<EK: KvEngine> fmt::Debug for CasualMessage<EK> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CasualMessage::SplitRegion {
                split_keys, source, ..
            } => write!(
                f,
                "Split region with {} from {}",
                KeysInfoFormatter(split_keys.iter()),
                source,
            ),
            CasualMessage::ComputeHashResult {
                index,
                context,
                hash,
            } => write!(
                f,
                "ComputeHashResult [index: {}, context: {}, hash: {}]",
                index,
                log_wrappers::Value::key(context),
                escape(hash)
            ),
            CasualMessage::RegionApproximateSize { size } => {
                write!(f, "Region's approximate size [size: {:?}]", size)
            }
            CasualMessage::RegionApproximateKeys { keys } => {
                write!(f, "Region's approximate keys [keys: {:?}]", keys)
            }
            CasualMessage::CompactionDeclinedBytes { bytes } => {
                write!(f, "compaction declined bytes {}", bytes)
            }
            CasualMessage::HalfSplitRegion { source, .. } => {
                write!(f, "Half Split from {}", source)
            }
            CasualMessage::GcSnap { snaps } => write!(f, "gc snaps {:?}", snaps),
            // The remaining variants print a fixed label only.
            CasualMessage::ClearRegionSize => f.write_str("clear region size"),
            CasualMessage::RegionOverlapped => f.write_str("RegionOverlapped"),
            CasualMessage::SnapshotGenerated => f.write_str("SnapshotGenerated"),
            CasualMessage::ForceCompactRaftLogs => f.write_str("ForceCompactRaftLogs"),
            CasualMessage::AccessPeer(_) => f.write_str("AccessPeer"),
            CasualMessage::QueryRegionLeaderResp { .. } => f.write_str("QueryRegionLeaderResp"),
        }
    }
}
/// A raft command request bundled with its callback; wrapped by
/// `PeerMsg::RaftCommand`.
#[derive(Debug)]
pub struct RaftCommand<S: Snapshot> {
/// Set to `Instant::now()` by `RaftCommand::new`.
pub send_time: Instant,
/// The request itself.
pub request: RaftCmdRequest,
/// Callback associated with the request.
pub callback: Callback<S>,
/// Optional deadline; `RaftCommand::new` leaves it as `None`.
pub deadline: Option<Deadline>,
}
impl<S: Snapshot> RaftCommand<S> {
    /// Builds a command from `request` and `callback`, stamping `send_time`
    /// with the current instant and leaving `deadline` unset.
    #[inline]
    pub fn new(request: RaftCmdRequest, callback: Callback<S>) -> Self {
        let send_time = Instant::now();
        Self {
            send_time,
            request,
            callback,
            deadline: None,
        }
    }
}
/// Peer-level message type; each variant wraps one of the message kinds
/// below.
pub enum PeerMsg<EK: KvEngine> {
/// A `RaftMessage`.
RaftMessage(RaftMessage),
/// A raft command with its callback.
RaftCommand(RaftCommand<EK::Snapshot>),
/// A `PeerTicks` value.
Tick(PeerTicks),
/// Result from the apply FSM (`ApplyTaskRes`).
ApplyRes { res: ApplyTaskRes<EK::Snapshot> },
/// A `SignificantMsg`.
SignificantMsg(SignificantMsg<EK::Snapshot>),
/// Printed as "Startup" by the `Debug` impl.
Start,
Noop,
/// A `CasualMessage`.
CasualMessage(CasualMessage<EK>),
HeartbeatPd,
UpdateReplicationMode,
}
/// Short textual form of each message. Raft messages and commands print a
/// fixed label; ticks, significant messages, apply results and casual
/// messages delegate to the payload's own `Debug` output.
impl<EK: KvEngine> fmt::Debug for PeerMsg<EK> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Variants with a payload worth printing.
            PeerMsg::Tick(tick) => write!(f, "{:?}", tick),
            PeerMsg::SignificantMsg(msg) => write!(f, "{:?}", msg),
            PeerMsg::ApplyRes { res } => write!(f, "ApplyRes {:?}", res),
            PeerMsg::CasualMessage(msg) => write!(f, "CasualMessage {:?}", msg),
            // Variants printed as a fixed label.
            PeerMsg::RaftMessage(_) => f.write_str("Raft Message"),
            PeerMsg::RaftCommand(_) => f.write_str("Raft Command"),
            PeerMsg::Start => f.write_str("Startup"),
            PeerMsg::Noop => f.write_str("Noop"),
            PeerMsg::HeartbeatPd => f.write_str("HeartbeatPd"),
            PeerMsg::UpdateReplicationMode => f.write_str("UpdateReplicationMode"),
        }
    }
}
/// Store-level message type.
pub enum StoreMsg<EK>
where
EK: KvEngine,
{
/// A `RaftMessage`.
RaftMessage(RaftMessage),
/// SST metadata entries reported as invalid.
ValidateSSTResult {
invalid_ssts: Vec<SstMeta>,
},
/// Key range given by `start_key` and `end_key`.
ClearRegionSizeInRange {
start_key: Vec<u8>,
end_key: Vec<u8>,
},
/// Identifies a store by id.
StoreUnreachable {
store_id: u64,
},
/// The engine's compacted-event type.
CompactedEvent(EK::CompactedEvent),
/// A `StoreTick` value.
Tick(StoreTick),
/// Carries the `metapb::Store` description.
Start {
store: metapb::Store,
},
/// Leader infos plus a one-shot callback receiving a `Vec<u64>`.
CheckLeader {
leaders: Vec<LeaderInfo>,
cb: Box<dyn FnOnce(Vec<u64>) + Send>,
},
/// Key range plus a one-shot callback receiving a `u64`.
GetStoreSafeTS {
key_range: KeyRange,
cb: Box<dyn FnOnce(u64) + Send>,
},
/// One-shot closure given the store `Config`; only compiled under `test`
/// or the `testexport` feature.
#[cfg(any(test, feature = "testexport"))]
Validate(Box<dyn FnOnce(&crate::store::Config) + Send>),
/// Carries a `ReplicationStatus`.
UpdateReplicationMode(ReplicationStatus),
}
/// Short textual form of each message. Boxed closures, raft messages and SST
/// lists are not printed.
impl<EK: KvEngine> fmt::Debug for StoreMsg<EK> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StoreMsg::RaftMessage(_) => f.write_str("Raft Message"),
            StoreMsg::ValidateSSTResult { .. } => f.write_str("Validate SST Result"),
            StoreMsg::ClearRegionSizeInRange { start_key, end_key } => write!(
                f,
                "Clear Region size in range {:?} to {:?}",
                start_key, end_key
            ),
            StoreMsg::StoreUnreachable { store_id } => {
                write!(f, "Store {} is unreachable", store_id)
            }
            StoreMsg::CompactedEvent(event) => write!(f, "CompactedEvent cf {}", event.cf()),
            StoreMsg::Tick(tick) => write!(f, "StoreTick {:?}", tick),
            StoreMsg::Start { store } => write!(f, "Start store {:?}", store),
            StoreMsg::CheckLeader { leaders, .. } => write!(f, "CheckLeader {:?}", leaders),
            StoreMsg::GetStoreSafeTS { key_range, .. } => {
                write!(f, "GetStoreSafeTS {:?}", key_range)
            }
            // This variant only exists in test / testexport builds.
            #[cfg(any(test, feature = "testexport"))]
            StoreMsg::Validate(_) => f.write_str("Validate config"),
            StoreMsg::UpdateReplicationMode(_) => f.write_str("UpdateReplicationMode"),
        }
    }
}