use std::collections::VecDeque;
use std::fmt::Display;
use std::option::Option;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex};
use std::{fmt, u64};
use kvproto::kvrpcpb::KeyRange;
use kvproto::metapb::{self, PeerRole};
use kvproto::raft_cmdpb::{AdminCmdType, ChangePeerRequest, ChangePeerV2Request, RaftCmdRequest};
use kvproto::raft_serverpb::RaftMessage;
use protobuf::{self, Message};
use raft::eraftpb::{self, ConfChangeType, ConfState, MessageType};
use raft::INVALID_INDEX;
use raft_proto::ConfChangeI;
use tikv_util::time::monotonic_raw_now;
use tikv_util::{box_err, debug};
use time::{Duration, Timespec};
use super::peer_storage;
use crate::{Error, Result};
use tikv_util::Either;
/// Returns the first peer of `region` located on store `store_id`, or `None`
/// when the region has no peer on that store.
pub fn find_peer(region: &metapb::Region, store_id: u64) -> Option<&metapb::Peer> {
    let peers = region.get_peers();
    peers.iter().find(|peer| peer.get_store_id() == store_id)
}
/// Mutable counterpart of `find_peer`: returns a mutable reference to the
/// first peer of `region` located on store `store_id`, if any.
pub fn find_peer_mut(region: &mut metapb::Region, store_id: u64) -> Option<&mut metapb::Peer> {
    for peer in region.mut_peers().iter_mut() {
        if peer.get_store_id() == store_id {
            return Some(peer);
        }
    }
    None
}
/// Removes the first peer located on store `store_id` from `region` and
/// returns it. Returns `None` (leaving `region` untouched) when no peer is
/// on that store.
pub fn remove_peer(region: &mut metapb::Region, store_id: u64) -> Option<metapb::Peer> {
    let idx = region
        .get_peers()
        .iter()
        .position(|peer| peer.get_store_id() == store_id)?;
    Some(region.mut_peers().remove(idx))
}
/// Builds a peer with role `PeerRole::Voter` for the given store and peer ids.
pub fn new_peer(store_id: u64, peer_id: u64) -> metapb::Peer {
    let mut voter = metapb::Peer::default();
    voter.set_role(PeerRole::Voter);
    voter.set_id(peer_id);
    voter.set_store_id(store_id);
    voter
}
/// Builds a peer with role `PeerRole::Learner` for the given store and peer ids.
pub fn new_learner_peer(store_id: u64, peer_id: u64) -> metapb::Peer {
    let mut learner = metapb::Peer::default();
    learner.set_role(PeerRole::Learner);
    learner.set_id(peer_id);
    learner.set_store_id(store_id);
    learner
}
pub fn check_key_in_region_exclusive(key: &[u8], region: &metapb::Region) -> Result<()> {
let end_key = region.get_end_key();
let start_key = region.get_start_key();
if start_key < key && (key < end_key || end_key.is_empty()) {
Ok(())
} else {
Err(Error::KeyNotInRegion(key.to_vec(), region.clone()))
}
}
pub fn check_key_in_region_inclusive(key: &[u8], region: &metapb::Region) -> Result<()> {
let end_key = region.get_end_key();
let start_key = region.get_start_key();
if key >= start_key && (end_key.is_empty() || key <= end_key) {
Ok(())
} else {
Err(Error::KeyNotInRegion(key.to_vec(), region.clone()))
}
}
pub fn check_key_in_region(key: &[u8], region: &metapb::Region) -> Result<()> {
let end_key = region.get_end_key();
let start_key = region.get_start_key();
if key >= start_key && (end_key.is_empty() || key < end_key) {
Ok(())
} else {
Err(Error::KeyNotInRegion(key.to_vec(), region.clone()))
}
}
/// Returns `true` for a `MsgRequestVote` / `MsgRequestPreVote` message whose
/// term is exactly `RAFT_INIT_LOG_TERM + 1`.
#[inline]
fn is_first_vote_msg(msg: &eraftpb::Message) -> bool {
    let msg_type = msg.get_msg_type();
    let is_vote_request = msg_type == MessageType::MsgRequestVote
        || msg_type == MessageType::MsgRequestPreVote;
    is_vote_request && msg.get_term() == peer_storage::RAFT_INIT_LOG_TERM + 1
}
/// Returns `true` for a `MsgAppend` message that carries exactly one entry,
/// where that entry has empty data and index `RAFT_INIT_LOG_INDEX + 1`.
#[inline]
fn is_first_append_entry(msg: &eraftpb::Message) -> bool {
    if msg.get_msg_type() != MessageType::MsgAppend {
        return false;
    }
    let entries = msg.get_entries();
    if entries.len() != 1 {
        return false;
    }
    let only = &entries[0];
    only.data.is_empty() && only.index == peer_storage::RAFT_INIT_LOG_INDEX + 1
}
/// Returns `true` when `msg` is either a first vote message or a first
/// append-entry message, as decided by `is_first_vote_msg` and
/// `is_first_append_entry`.
pub fn is_first_message(msg: &eraftpb::Message) -> bool {
    if is_first_vote_msg(msg) {
        return true;
    }
    is_first_append_entry(msg)
}
/// Returns `true` when `msg` is a `MsgRequestVote` or `MsgRequestPreVote`.
#[inline]
pub fn is_vote_msg(msg: &eraftpb::Message) -> bool {
    match msg.get_msg_type() {
        MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => true,
        _ => false,
    }
}
/// Returns `true` for vote / pre-vote requests, and for heartbeats whose
/// commit index equals `INVALID_INDEX`.
#[inline]
pub fn is_initial_msg(msg: &eraftpb::Message) -> bool {
    match msg.get_msg_type() {
        MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => true,
        MessageType::MsgHeartbeat => msg.get_commit() == INVALID_INDEX,
        _ => false,
    }
}
// Human-readable labels returned by `conf_change_type_str`.
const STR_CONF_CHANGE_ADD_NODE: &str = "AddNode";
const STR_CONF_CHANGE_REMOVE_NODE: &str = "RemoveNode";
const STR_CONF_CHANGE_ADDLEARNER_NODE: &str = "AddLearner";

/// Maps a raft conf change type to its static string label.
pub fn conf_change_type_str(conf_type: eraftpb::ConfChangeType) -> &'static str {
    let label = match conf_type {
        ConfChangeType::AddLearnerNode => STR_CONF_CHANGE_ADDLEARNER_NODE,
        ConfChangeType::RemoveNode => STR_CONF_CHANGE_REMOVE_NODE,
        ConfChangeType::AddNode => STR_CONF_CHANGE_ADD_NODE,
    };
    label
}
/// Returns `true` when `epoch` is behind `check_epoch` in either its
/// `version` or its `conf_ver`.
pub fn is_epoch_stale(epoch: &metapb::RegionEpoch, check_epoch: &metapb::RegionEpoch) -> bool {
    let version_behind = epoch.get_version() < check_epoch.get_version();
    let conf_ver_behind = epoch.get_conf_ver() < check_epoch.get_conf_ver();
    version_behind || conf_ver_behind
}
/// Describes how an admin command interacts with the region epoch: which
/// epoch components are checked before the command is accepted, and which
/// components the command changes.
#[derive(Debug, Copy, Clone)]
pub struct AdminCmdEpochState {
    /// Whether the region `version` is checked.
    pub check_ver: bool,
    /// Whether the region `conf_ver` is checked.
    pub check_conf_ver: bool,
    /// Whether the command changes the region `version`.
    pub change_ver: bool,
    /// Whether the command changes the region `conf_ver`.
    pub change_conf_ver: bool,
}

impl AdminCmdEpochState {
    /// Positional constructor; arguments map one-to-one to the fields in
    /// declaration order.
    fn new(
        check_ver: bool,
        check_conf_ver: bool,
        change_ver: bool,
        change_conf_ver: bool,
    ) -> AdminCmdEpochState {
        Self {
            check_ver,
            check_conf_ver,
            change_ver,
            change_conf_ver,
        }
    }
}
/// Returns the epoch check/change flags associated with an admin command type.
pub fn admin_cmd_epoch_lookup(admin_cmp_type: AdminCmdType) -> AdminCmdEpochState {
    // Tuple order: (check_ver, check_conf_ver, change_ver, change_conf_ver).
    let (check_ver, check_conf_ver, change_ver, change_conf_ver) = match admin_cmp_type {
        AdminCmdType::InvalidAdmin
        | AdminCmdType::CompactLog
        | AdminCmdType::ComputeHash
        | AdminCmdType::VerifyHash => (false, false, false, false),
        AdminCmdType::ChangePeer | AdminCmdType::ChangePeerV2 => (false, true, false, true),
        AdminCmdType::Split
        | AdminCmdType::BatchSplit
        | AdminCmdType::CommitMerge
        | AdminCmdType::RollbackMerge => (true, true, true, false),
        AdminCmdType::PrepareMerge => (true, true, true, true),
        AdminCmdType::TransferLeader => (true, true, false, false),
    };
    AdminCmdEpochState::new(check_ver, check_conf_ver, change_ver, change_conf_ver)
}
/// Whether the region `version` is checked for a request that carries no
/// admin request.
pub static NORMAL_REQ_CHECK_VER: bool = true;
/// Whether the region `conf_ver` is checked for a request that carries no
/// admin request.
pub static NORMAL_REQ_CHECK_CONF_VER: bool = false;
/// Validates the region epoch carried in `req`'s header against `region`.
///
/// Which epoch components are compared depends on the request: non-admin
/// requests use `NORMAL_REQ_CHECK_VER` / `NORMAL_REQ_CHECK_CONF_VER`, admin
/// requests use `admin_cmd_epoch_lookup`. When nothing needs checking the
/// call succeeds even if the header has no epoch; otherwise a missing epoch
/// is an error. `include_region` is forwarded to `compare_region_epoch`.
pub fn check_region_epoch(
    req: &RaftCmdRequest,
    region: &metapb::Region,
    include_region: bool,
) -> Result<()> {
    let (check_ver, check_conf_ver) = if req.has_admin_request() {
        let state = admin_cmd_epoch_lookup(req.get_admin_request().get_cmd_type());
        (state.check_ver, state.check_conf_ver)
    } else {
        (NORMAL_REQ_CHECK_VER, NORMAL_REQ_CHECK_CONF_VER)
    };

    if !(check_ver || check_conf_ver) {
        return Ok(());
    }

    let header = req.get_header();
    if !header.has_region_epoch() {
        return Err(box_err!("missing epoch!"));
    }

    compare_region_epoch(
        header.get_region_epoch(),
        region,
        check_conf_ver,
        check_ver,
        include_region,
    )
}
pub fn compare_region_epoch(
from_epoch: &metapb::RegionEpoch,
region: &metapb::Region,
check_conf_ver: bool,
check_ver: bool,
include_region: bool,
) -> Result<()> {
let current_epoch = region.get_region_epoch();
if (check_conf_ver && from_epoch.get_conf_ver() != current_epoch.get_conf_ver())
|| (check_ver && from_epoch.get_version() != current_epoch.get_version())
{
debug!(
"epoch not match";
"region_id" => region.get_id(),
"from_epoch" => ?from_epoch,
"current_epoch" => ?current_epoch,
);
let regions = if include_region {
vec![region.to_owned()]
} else {
vec![]
};
return Err(Error::EpochNotMatch(
format!(
"current epoch of region {} is {:?}, but you \
sent {:?}",
region.get_id(),
current_epoch,
from_epoch
),
regions,
));
}
Ok(())
}
#[inline]
pub fn check_store_id(req: &RaftCmdRequest, store_id: u64) -> Result<()> {
let peer = req.get_header().get_peer();
if peer.get_store_id() == store_id {
Ok(())
} else {
Err(Error::StoreNotMatch {
to_store_id: peer.get_store_id(),
my_store_id: store_id,
})
}
}
/// Checks the term carried in the request header against `term`.
///
/// A header term of 0 always passes. Otherwise the request is rejected with
/// `Error::StaleCommand` when `term` is more than one ahead of the header
/// term.
#[inline]
pub fn check_term(req: &RaftCmdRequest, term: u64) -> Result<()> {
    let req_term = req.get_header().get_term();
    let stale = req_term != 0 && term > req_term + 1;
    if stale {
        Err(Error::StaleCommand)
    } else {
        Ok(())
    }
}
/// Checks that the peer id in the request header equals `peer_id`; returns a
/// boxed error naming both ids otherwise.
#[inline]
pub fn check_peer_id(req: &RaftCmdRequest, peer_id: u64) -> Result<()> {
    let req_peer_id = req.get_header().get_peer().get_id();
    if req_peer_id == peer_id {
        return Ok(());
    }
    Err(box_err!("mismatch peer id {} != {}", req_peer_id, peer_id))
}
/// Builds a `KeyRange` from the two keys. When `reverse_scan` is true the
/// keys are swapped: `end_key` becomes the range start and `start_key` the
/// range end.
#[inline]
pub fn build_key_range(start_key: &[u8], end_key: &[u8], reverse_scan: bool) -> KeyRange {
    let (range_start, range_end) = if reverse_scan {
        (end_key, start_key)
    } else {
        (start_key, end_key)
    };
    let mut range = KeyRange::default();
    range.set_start_key(range_start.to_vec());
    range.set_end_key(range_end.to_vec());
    range
}
/// Returns `true` when both regions have the same number of peers and every
/// peer of `lhs` has a peer in `rhs` with the same store id and role.
pub fn region_on_same_stores(lhs: &metapb::Region, rhs: &metapb::Region) -> bool {
    let (lhs_peers, rhs_peers) = (lhs.get_peers(), rhs.get_peers());
    if lhs_peers.len() != rhs_peers.len() {
        return false;
    }
    for lp in lhs_peers.iter() {
        let matched = rhs_peers
            .iter()
            .any(|rp| rp.get_store_id() == lp.get_store_id() && rp.get_role() == lp.get_role());
        if !matched {
            return false;
        }
    }
    true
}
/// A region counts as initialized when it has at least one peer.
#[inline]
pub fn is_region_initialized(r: &metapb::Region) -> bool {
    let peers = r.get_peers();
    !peers.is_empty()
}
/// Time-bounded lease with three observable states (see `LeaseState`).
///
/// The lease can additionally publish its valid bound to a shared
/// `RemoteLease` handle.
pub struct Lease {
// `None`: no lease (inspected as expired). `Either::Left(ts)`: suspect
// bound. `Either::Right(ts)`: valid bound.
bound: Option<Either<Timespec, Timespec>>,
// Length added to a send timestamp to obtain the next bound.
max_lease: Duration,
// Set to `max_lease / 3` by `new`; `renew` only records a new bound in
// `last_update` (and pushes it to the remote lease) when it is more than
// this far beyond `last_update`.
max_drift: Duration,
// The valid bound most recently recorded by `renew`.
last_update: Timespec,
// Remote handle created by `maybe_new_remote_lease`, if any.
remote: Option<RemoteLease>,
}
/// Result of inspecting a `Lease` or `RemoteLease`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum LeaseState {
/// The lease was marked via `Lease::suspect` and has not been renewed or
/// expired since.
Suspect,
/// The inspected timestamp is before the valid bound.
Valid,
/// There is no bound, or the inspected timestamp is at or past it.
Expired,
}
impl Lease {
pub fn new(max_lease: Duration) -> Lease {
Lease {
bound: None,
max_lease,
max_drift: max_lease / 3,
last_update: Timespec::new(0, 0),
remote: None,
}
}
fn next_expired_time(&self, send_ts: Timespec) -> Timespec {
send_ts + self.max_lease
}
pub fn renew(&mut self, send_ts: Timespec) {
let bound = self.next_expired_time(send_ts);
match self.bound {
Some(Either::Left(ts)) | Some(Either::Right(ts)) => {
if ts <= bound {
self.bound = Some(Either::Right(bound));
}
}
None => {
self.bound = Some(Either::Right(bound));
}
}
if let Some(Either::Right(bound)) = self.bound {
if bound - self.last_update > self.max_drift {
self.last_update = bound;
if let Some(ref r) = self.remote {
r.renew(bound);
}
}
}
}
pub fn suspect(&mut self, send_ts: Timespec) {
self.expire_remote_lease();
let bound = self.next_expired_time(send_ts);
self.bound = Some(Either::Left(bound));
}
pub fn inspect(&self, ts: Option<Timespec>) -> LeaseState {
match self.bound {
Some(Either::Left(_)) => LeaseState::Suspect,
Some(Either::Right(bound)) => {
if ts.unwrap_or_else(monotonic_raw_now) < bound {
LeaseState::Valid
} else {
LeaseState::Expired
}
}
None => LeaseState::Expired,
}
}
pub fn expire(&mut self) {
self.expire_remote_lease();
self.bound = None;
}
pub fn expire_remote_lease(&mut self) {
if let Some(r) = self.remote.take() {
r.expire();
}
}
pub fn maybe_new_remote_lease(&mut self, term: u64) -> Option<RemoteLease> {
if let Some(ref remote) = self.remote {
if remote.term() == term {
return None;
} else {
unreachable!("Must expire the old remote lease first!");
}
}
let expired_time = match self.bound {
Some(Either::Right(ts)) => timespec_to_u64(ts),
_ => 0,
};
let remote = RemoteLease {
expired_time: Arc::new(AtomicU64::new(expired_time)),
term,
};
let remote_clone = remote.clone();
self.remote = Some(remote);
Some(remote_clone)
}
}
/// Debug output is a `Lease` struct with a single `suspect` or `valid`
/// field holding the bound, or no field at all when the lease is empty.
impl fmt::Debug for Lease {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = fmt.debug_struct("Lease");
        match self.bound {
            Some(Either::Left(ts)) => {
                out.field("suspect", &ts);
            }
            Some(Either::Right(ts)) => {
                out.field("valid", &ts);
            }
            None => {}
        }
        out.finish()
    }
}
/// Cloneable handle onto a lease bound stored in a shared atomic.
///
/// Clones share the same `expired_time` through the `Arc`.
#[derive(Clone)]
pub struct RemoteLease {
// Bound encoded with `timespec_to_u64`; 0 after `expire`.
expired_time: Arc<AtomicU64>,
// Term this remote lease was created for.
term: u64,
}
impl RemoteLease {
    /// Reports whether the shared bound is still ahead of `ts` (or of
    /// `monotonic_raw_now()` when `ts` is `None`). Only `Valid` or `Expired`
    /// is ever returned.
    pub fn inspect(&self, ts: Option<Timespec>) -> LeaseState {
        let encoded_bound = self.expired_time.load(AtomicOrdering::Acquire);
        let now = ts.unwrap_or_else(monotonic_raw_now);
        let bound = u64_to_timespec(encoded_bound);
        if now < bound {
            LeaseState::Valid
        } else {
            LeaseState::Expired
        }
    }

    /// Publishes `bound` (encoded by `timespec_to_u64`) to all handles.
    fn renew(&self, bound: Timespec) {
        let encoded = timespec_to_u64(bound);
        self.expired_time.store(encoded, AtomicOrdering::Release);
    }

    /// Resets the shared bound to 0.
    fn expire(&self) {
        self.expired_time.store(0, AtomicOrdering::Release);
    }

    /// Term this remote lease was created for.
    pub fn term(&self) -> u64 {
        self.term
    }
}
/// Debug output shows the decoded `expired_time` (read with relaxed
/// ordering) and the `term`.
impl fmt::Debug for RemoteLease {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let encoded = self.expired_time.load(AtomicOrdering::Relaxed);
        let expired_time = u64_to_timespec(encoded);
        let mut out = fmt.debug_struct("RemoteLease");
        out.field("expired_time", &expired_time);
        out.field("term", &self.term);
        out.finish()
    }
}
// Constants for packing a `Timespec` into a `u64`: whole seconds in the
// high bits, the sub-second part in the low `TIMESPEC_SEC_SHIFT` bits.
const NSEC_PER_MSEC: i32 = 1_000_000;
// Number of bits needed to represent `NSEC_PER_MSEC` (20); shifting
// nanoseconds right by this many bits yields units of 2^20 ns, roughly 1 ms.
const TIMESPEC_NSEC_SHIFT: usize = 32 - NSEC_PER_MSEC.leading_zeros() as usize;
const MSEC_PER_SEC: i64 = 1_000;
// Number of bits needed to represent `MSEC_PER_SEC` (10); seconds are
// shifted left by this many bits.
const TIMESPEC_SEC_SHIFT: usize = 64 - MSEC_PER_SEC.leading_zeros() as usize;
// Selects the low `TIMESPEC_SEC_SHIFT` bits, i.e. the sub-second part.
const TIMESPEC_NSEC_MASK: u64 = (1 << TIMESPEC_SEC_SHIFT) - 1;
/// Packs `ts` into a `u64`: seconds shifted left by `TIMESPEC_SEC_SHIFT`,
/// OR-ed with nanoseconds shifted right by `TIMESPEC_NSEC_SHIFT` (rounding
/// the sub-second part down).
///
/// # Panics
///
/// Panics when `ts.sec` is negative or does not fit in the bits left above
/// `TIMESPEC_SEC_SHIFT`, or when `ts.nsec` is negative.
#[inline]
fn timespec_to_u64(ts: Timespec) -> u64 {
    assert!(ts.sec >= 0 && ts.sec < (1i64 << (64 - TIMESPEC_SEC_SHIFT)));
    assert!(ts.nsec >= 0);
    let sec_bits = (ts.sec << TIMESPEC_SEC_SHIFT) as u64;
    let subsec_bits = (ts.nsec >> TIMESPEC_NSEC_SHIFT) as u64;
    sec_bits | subsec_bits
}
/// Unpacks a value produced by `timespec_to_u64`: the bits above
/// `TIMESPEC_SEC_SHIFT` become seconds, the masked low bits are shifted left
/// by `TIMESPEC_NSEC_SHIFT` to become nanoseconds.
#[inline]
pub(crate) fn u64_to_timespec(u: u64) -> Timespec {
    let sec = (u >> TIMESPEC_SEC_SHIFT) as i64;
    let nsec = ((u & TIMESPEC_NSEC_MASK) << TIMESPEC_NSEC_SHIFT) as i32;
    Timespec::new(sec, nsec)
}
/// Decodes `data` into a default-constructed `T`.
///
/// # Panics
///
/// Panics with a message containing `tag` and `index` when the bytes cannot
/// be merged into `T`.
#[inline]
pub fn parse_data_at<T: Message + Default>(data: &[u8], index: u64, tag: &str) -> T {
    let mut parsed = T::default();
    if let Err(e) = parsed.merge_from_bytes(data) {
        panic!("{} data is corrupted at {}: {:?}", tag, index, e);
    }
    parsed
}
/// Returns `true` when the two regions have different ids and one region's
/// non-empty end key equals the other's start key.
pub fn is_sibling_regions(lhs: &metapb::Region, rhs: &metapb::Region) -> bool {
    if lhs.get_id() == rhs.get_id() {
        return false;
    }
    // An empty end key never counts as a shared boundary.
    let shares_boundary =
        |left_end: &[u8], right_start: &[u8]| !left_end.is_empty() && left_end == right_start;
    shares_boundary(rhs.get_end_key(), lhs.get_start_key())
        || shares_boundary(lhs.get_end_key(), rhs.get_start_key())
}
/// Derives a raft `ConfState` from the peer roles of `region`.
///
/// * `Voter` goes to `voters` (and to `voters_outgoing`, which is kept only
///   if the region is in a joint state).
/// * `Learner` goes to `learners`.
/// * `IncomingVoter` goes to `voters`.
/// * `DemotingVoter` goes to `voters_outgoing` and `learners_next`.
///
/// The region is considered in a joint state when at least one peer has a
/// role other than `Voter` or `Learner`; otherwise `voters_outgoing` is
/// cleared before returning.
pub fn conf_state_from_region(region: &metapb::Region) -> ConfState {
    let mut state = ConfState::default();
    let mut joint = false;
    for peer in region.get_peers() {
        let id = peer.get_id();
        match peer.get_role() {
            PeerRole::Voter => {
                state.mut_voters().push(id);
                state.mut_voters_outgoing().push(id);
            }
            PeerRole::Learner => state.mut_learners().push(id),
            joint_role => {
                joint = true;
                match joint_role {
                    PeerRole::IncomingVoter => state.mut_voters().push(id),
                    PeerRole::DemotingVoter => {
                        state.mut_voters_outgoing().push(id);
                        state.mut_learners_next().push(id);
                    }
                    _ => unreachable!(),
                }
            }
        }
    }
    if !joint {
        state.mut_voters_outgoing().clear();
    }
    state
}
/// Returns `true` when the peer's role is `PeerRole::Learner`.
pub fn is_learner(peer: &metapb::Peer) -> bool {
    match peer.get_role() {
        PeerRole::Learner => true,
        _ => false,
    }
}
/// Displays a short summary of a sequence of keys: `(no key)` for an empty
/// sequence, the single key for one element, or the count plus the first
/// and last keys otherwise. The wrapped iterator is cloned on every `fmt`
/// call, so formatting does not consume it.
pub struct KeysInfoFormatter<'a, I>(pub I)
where
    I: std::iter::DoubleEndedIterator<Item = &'a Vec<u8>>
        + std::iter::ExactSizeIterator<Item = &'a Vec<u8>>
        + Clone;

impl<'a, I> fmt::Display for KeysInfoFormatter<'a, I>
where
    I: std::iter::DoubleEndedIterator<Item = &'a Vec<u8>>
        + std::iter::ExactSizeIterator<Item = &'a Vec<u8>>
        + Clone,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut keys = self.0.clone();
        let total = keys.len();
        if total == 0 {
            return write!(f, "(no key)");
        }
        let first = keys.next().unwrap();
        if total == 1 {
            return write!(f, "key {}", log_wrappers::Value::key(first));
        }
        let last = keys.next_back().unwrap();
        write!(
            f,
            "{} keys range from {} to {}",
            total,
            log_wrappers::Value::key(first),
            log_wrappers::Value::key(last)
        )
    }
}
/// Returns `(voters + 1) / 2 + 1`, i.e. half of `voters` rounded up, plus one.
pub fn integration_on_half_fail_quorum_fn(voters: usize) -> usize {
    let half_rounded_up = (voters + 1) / 2;
    half_rounded_up + 1
}
/// Kind of configuration change, classified by the number of peer changes
/// it carries (see `confchange_kind`).
#[derive(PartialEq, Eq, Debug)]
pub enum ConfChangeKind {
    /// Exactly one change.
    Simple,
    /// Two or more changes.
    EnterJoint,
    /// No change.
    LeaveJoint,
}

impl ConfChangeKind {
    /// Classifies a conf change by its number of changes: 0 is
    /// `LeaveJoint`, 1 is `Simple`, anything larger is `EnterJoint`.
    pub fn confchange_kind(change_num: usize) -> ConfChangeKind {
        if change_num == 0 {
            ConfChangeKind::LeaveJoint
        } else if change_num == 1 {
            ConfChangeKind::Simple
        } else {
            ConfChangeKind::EnterJoint
        }
    }
}
/// Common view over change-peer requests that can be turned into a raft
/// conf change.
pub trait ChangePeerI {
/// Conf change type produced by `to_confchange`.
type CC: ConfChangeI;
/// Container that can be viewed as a slice of `ChangePeerRequest`.
type CP: AsRef<[ChangePeerRequest]>;
/// Returns the individual peer changes carried by this request.
fn get_change_peers(&self) -> Self::CP;
/// Builds the conf change; the byte vector is supplied by the caller (the
/// implementations in this file store it as the conf change context).
fn to_confchange(&self, _: Vec<u8>) -> Self::CC;
}
/// A single `ChangePeerRequest` maps to a raft `ConfChange`.
impl<'a> ChangePeerI for &'a ChangePeerRequest {
    type CC = eraftpb::ConfChange;
    type CP = Vec<ChangePeerRequest>;

    /// Returns a one-element vector holding a clone of this request.
    fn get_change_peers(&self) -> Vec<ChangePeerRequest> {
        let request: &ChangePeerRequest = self;
        vec![request.clone()]
    }

    /// Copies the change type and the peer id into a `ConfChange` and
    /// stores `ctx` as its context.
    fn to_confchange(&self, ctx: Vec<u8>) -> eraftpb::ConfChange {
        let change_type = self.get_change_type();
        let node_id = self.get_peer().get_id();
        let mut conf_change = eraftpb::ConfChange::default();
        conf_change.set_change_type(change_type);
        conf_change.set_node_id(node_id);
        conf_change.set_context(ctx.into());
        conf_change
    }
}
/// A `ChangePeerV2Request` maps to a raft `ConfChangeV2`.
impl<'a> ChangePeerI for &'a ChangePeerV2Request {
    type CC = eraftpb::ConfChangeV2;
    type CP = &'a [ChangePeerRequest];

    /// Borrows the changes stored in the request.
    fn get_change_peers(&self) -> &'a [ChangePeerRequest] {
        self.get_changes()
    }

    /// Converts every change into a `ConfChangeSingle` and stores `ctx` as
    /// the context. The transition is `Auto` for zero or one change and
    /// `Explicit` for two or more.
    fn to_confchange(&self, ctx: Vec<u8>) -> eraftpb::ConfChangeV2 {
        let requested = self.get_changes();
        let mut singles = Vec::with_capacity(requested.len());
        for change in requested.iter() {
            let mut single = eraftpb::ConfChangeSingle::default();
            single.set_change_type(change.get_change_type());
            single.set_node_id(change.get_peer().get_id());
            singles.push(single);
        }

        let transition = if singles.len() > 1 {
            eraftpb::ConfChangeTransition::Explicit
        } else {
            eraftpb::ConfChangeTransition::Auto
        };

        let mut conf_change = eraftpb::ConfChangeV2::default();
        conf_change.set_transition(transition);
        conf_change.set_changes(singles.into());
        conf_change.set_context(ctx.into());
        conf_change
    }
}
/// Display adapter that prints the type of a `RaftMessage`: the extra
/// message type when the message has an extra message, otherwise the raft
/// message type. Both are printed with their `Debug` representation.
pub struct MsgType<'a>(pub &'a RaftMessage);

impl Display for MsgType<'_> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let raft_msg = self.0;
        if raft_msg.has_extra_msg() {
            write!(f, "{:?}", raft_msg.get_extra_msg().get_type())
        } else {
            write!(f, "{:?}", raft_msg.get_message().get_msg_type())
        }
    }
}
/// Tracks the applied index and `safe_ts` of a region.
///
/// The bookkeeping lives in a mutex-protected `RegionReadProgressCore`; the
/// current `safe_ts` is mirrored into an atomic so `safe_ts()` can read it
/// without taking the lock.
#[derive(Default, Debug)]
pub struct RegionReadProgress {
    core: Mutex<RegionReadProgressCore>,
    safe_ts: AtomicU64,
}

impl RegionReadProgress {
    /// Creates a progress tracker starting at `applied_index`, with room
    /// for `cap` pending items and a `safe_ts` of 0.
    pub fn new(applied_index: u64, cap: usize) -> RegionReadProgress {
        let core = RegionReadProgressCore::new(applied_index, cap);
        RegionReadProgress {
            core: Mutex::new(core),
            safe_ts: AtomicU64::new(0),
        }
    }

    /// Records a new applied index and publishes the `safe_ts` if it changed.
    pub fn update_applied(&self, applied: u64) {
        let mut guard = self.core.lock().unwrap();
        let advanced = guard.update_applied(applied);
        if let Some(new_ts) = advanced {
            self.safe_ts.store(new_ts, AtomicOrdering::Release);
        }
    }

    /// Offers a `(apply_index, ts)` pair and publishes the `safe_ts` if it
    /// changed. Pairs with a zero `apply_index` or a zero `ts` are ignored.
    pub fn update_safe_ts(&self, apply_index: u64, ts: u64) {
        if apply_index == 0 || ts == 0 {
            return;
        }
        let mut guard = self.core.lock().unwrap();
        let advanced = guard.update_safe_ts(apply_index, ts);
        if let Some(new_ts) = advanced {
            self.safe_ts.store(new_ts, AtomicOrdering::Release);
        }
    }

    /// Clears the core state and resets the published `safe_ts` to 0.
    pub fn clear(&self) {
        let mut guard = self.core.lock().unwrap();
        guard.clear();
        self.safe_ts.store(0, AtomicOrdering::Release);
    }

    /// Returns a copy of the most recently queued pending item, or of the
    /// current read state when nothing is pending.
    pub fn read_state(&self) -> ReadState {
        let guard = self.core.lock().unwrap();
        match guard.pending_items.back() {
            Some(latest) => latest.clone(),
            None => guard.read_state.clone(),
        }
    }

    /// Returns the published `safe_ts`.
    pub fn safe_ts(&self) -> u64 {
        self.safe_ts.load(AtomicOrdering::Acquire)
    }
}
/// Lock-protected state behind `RegionReadProgress`.
#[derive(Default, Debug)]
struct RegionReadProgressCore {
    /// Latest applied index reported through `update_applied`.
    applied_index: u64,
    /// The `(idx, ts)` pair currently in effect.
    read_state: ReadState,
    /// Pairs whose `idx` was still ahead of `applied_index` when offered.
    pending_items: VecDeque<ReadState>,
}

/// An `(apply index, timestamp)` pair.
#[derive(Clone, Debug, Default)]
pub struct ReadState {
    pub idx: u64,
    pub ts: u64,
}

impl RegionReadProgressCore {
    /// Creates a core at `applied_index` with a default read state and a
    /// pending queue allocated for `cap` items.
    fn new(applied_index: u64, cap: usize) -> RegionReadProgressCore {
        RegionReadProgressCore {
            applied_index,
            read_state: ReadState::default(),
            pending_items: VecDeque::with_capacity(cap),
        }
    }

    /// Advances the applied index and consumes every pending item from the
    /// front whose `idx` is now covered. Returns the new `ts` when the read
    /// state moved to a larger `ts`, `None` otherwise.
    ///
    /// # Panics
    ///
    /// Panics when `applied` is smaller than the current applied index.
    fn update_applied(&mut self, applied: u64) -> Option<u64> {
        assert!(applied >= self.applied_index);
        self.applied_index = applied;

        let mut candidate = self.read_state.clone();
        while let Some(front) = self.pending_items.front() {
            if front.idx > self.applied_index {
                // Not applied yet; leave it (and everything behind it) queued.
                break;
            }
            if let Some(item) = self.pending_items.pop_front() {
                if candidate.ts < item.ts {
                    candidate = item;
                }
            }
        }

        if candidate.ts > self.read_state.ts {
            self.read_state = candidate;
            return Some(self.read_state.ts);
        }
        None
    }

    /// Offers an `(idx, ts)` pair. Returns `Some(ts)` only when the read
    /// state was updated immediately.
    ///
    /// When `idx` is already applied, the pair replaces the read state if
    /// its `ts` is larger. Otherwise it is queued, unless the last queued
    /// item already has a `ts` at least as large (the pair is dropped) or an
    /// `idx` at least as large (only that item's `ts` is raised).
    fn update_safe_ts(&mut self, idx: u64, ts: u64) -> Option<u64> {
        if idx <= self.applied_index {
            if self.read_state.ts >= ts {
                return None;
            }
            self.read_state = ReadState { idx, ts };
            return Some(ts);
        }

        let absorbed = match self.pending_items.back_mut() {
            None => false,
            Some(last) => {
                if last.ts >= ts {
                    true
                } else if last.idx >= idx {
                    last.ts = ts;
                    true
                } else {
                    false
                }
            }
        };
        if !absorbed {
            self.push_back(ReadState { idx, ts });
        }
        None
    }

    /// Drops all pending items and zeroes the read state. The applied index
    /// is left unchanged.
    fn clear(&mut self) {
        self.pending_items.clear();
        self.read_state.idx = 0;
        self.read_state.ts = 0;
    }

    /// Queues `item`. When the queue length has reached its capacity, every
    /// second item (the 2nd, 4th, ...) is dropped first.
    fn push_back(&mut self, item: ReadState) {
        if self.pending_items.len() >= self.pending_items.capacity() {
            let mut position = 0usize;
            self.pending_items.retain(|_| {
                let keep = position % 2 == 0;
                position += 1;
                keep
            });
        }
        self.pending_items.push_back(item);
    }
}
#[cfg(test)]
mod tests {
use std::thread;
use kvproto::metapb::{self, RegionEpoch};
use kvproto::raft_cmdpb::AdminRequest;
use raft::eraftpb::{ConfChangeType, Entry, Message, MessageType};
use time::Duration as TimeDuration;
use crate::store::peer_storage;
use tikv_util::time::monotonic_raw_now;
use super::*;
// Walks a `Lease` and one remote handle through the expired -> valid ->
// expired -> suspect -> expired sequence, checking both the local and the
// remote view at each step.
#[test]
fn test_lease() {
// Sleeps for at least `duration` on the monotonic raw clock, then checks
// the lease state both at the measured end time and at "now".
#[inline]
fn sleep_test(duration: TimeDuration, lease: &Lease, state: LeaseState) {
let monotonic_raw_start = monotonic_raw_now();
thread::sleep(duration.to_std().unwrap());
let mut monotonic_raw_end = monotonic_raw_now();
// Spin until the monotonic raw clock has advanced by the full duration.
while monotonic_raw_end - monotonic_raw_start < duration {
thread::yield_now();
monotonic_raw_end = monotonic_raw_now();
}
assert_eq!(lease.inspect(Some(monotonic_raw_end)), state);
assert_eq!(lease.inspect(None), state);
}
let duration = TimeDuration::milliseconds(1500);
let mut lease = Lease::new(duration);
let remote = lease.maybe_new_remote_lease(1).unwrap();
// Checks the local state; whenever the local lease is expired or suspect
// the remote handle must report expired.
let inspect_test = |lease: &Lease, ts: Option<Timespec>, state: LeaseState| {
assert_eq!(lease.inspect(ts), state);
if state == LeaseState::Expired || state == LeaseState::Suspect {
assert_eq!(remote.inspect(ts), LeaseState::Expired);
}
};
// A fresh lease is expired.
inspect_test(&lease, Some(monotonic_raw_now()), LeaseState::Expired);
let now = monotonic_raw_now();
let next_expired_time = lease.next_expired_time(now);
assert_eq!(now + duration, next_expired_time);
// Renewing makes it valid until `duration` has elapsed.
lease.renew(now);
inspect_test(&lease, Some(monotonic_raw_now()), LeaseState::Valid);
inspect_test(&lease, None, LeaseState::Valid);
sleep_test(duration, &lease, LeaseState::Expired);
inspect_test(&lease, Some(monotonic_raw_now()), LeaseState::Expired);
inspect_test(&lease, None, LeaseState::Expired);
// A suspect lease stays suspect regardless of elapsed time.
lease.suspect(monotonic_raw_now());
inspect_test(&lease, Some(monotonic_raw_now()), LeaseState::Suspect);
inspect_test(&lease, None, LeaseState::Suspect);
sleep_test(duration, &lease, LeaseState::Suspect);
inspect_test(&lease, Some(monotonic_raw_now()), LeaseState::Suspect);
lease.expire();
inspect_test(&lease, Some(monotonic_raw_now()), LeaseState::Expired);
inspect_test(&lease, None, LeaseState::Expired);
// The old remote handle was detached by `suspect`/`expire`, so renewing
// the lease no longer affects it; a newly created handle sees the bound.
lease.renew(monotonic_raw_now() + TimeDuration::minutes(1));
assert_eq!(
remote.inspect(Some(monotonic_raw_now())),
LeaseState::Expired
);
let m1 = lease.maybe_new_remote_lease(1).unwrap();
assert_eq!(m1.inspect(Some(monotonic_raw_now())), LeaseState::Valid);
}
// Checks `timespec_to_u64` / `u64_to_timespec` against a table of
// `(Timespec, expected encoding)` pairs, then against live clock readings.
#[test]
fn test_timespec_u64() {
let cases = vec![
// Sub-second parts below 2^20 ns (1_048_576) encode to 0.
(Timespec::new(0, 0), 0x0000_0000_0000_0000u64),
(Timespec::new(0, 1), 0x0000_0000_0000_0000u64),
(Timespec::new(0, 999_999), 0x0000_0000_0000_0000u64),
(
Timespec::new(0, 1_048_575 ),
0x0000_0000_0000_0000u64,
),
(
Timespec::new(0, 1_048_576 ),
0x0000_0000_0000_0001u64,
),
(Timespec::new(1, 0), 1 << TIMESPEC_SEC_SHIFT),
(Timespec::new(1, 0x100000), 1 << TIMESPEC_SEC_SHIFT | 1),
// Largest seconds value accepted by `timespec_to_u64`.
(
Timespec::new((1i64 << (64 - TIMESPEC_SEC_SHIFT)) - 1, 0),
(-1i64 as u64) << TIMESPEC_SEC_SHIFT,
),
// Largest sub-second value, pre-rounded so the round trip is exact.
(
Timespec::new(
0,
(999_999_999 >> TIMESPEC_NSEC_SHIFT) << TIMESPEC_NSEC_SHIFT,
),
999_999_999 >> TIMESPEC_NSEC_SHIFT,
),
(
Timespec::new(
(1i64 << (64 - TIMESPEC_SEC_SHIFT)) - 1,
(999_999_999 >> TIMESPEC_NSEC_SHIFT) << TIMESPEC_NSEC_SHIFT,
),
(-1i64 as u64) << TIMESPEC_SEC_SHIFT | (999_999_999 >> TIMESPEC_NSEC_SHIFT),
),
];
for (ts, u) in cases {
// Encoding rounds down, so decoding never yields a later time.
assert!(u64_to_timespec(timespec_to_u64(ts)) <= ts);
assert!(u64_to_timespec(u) <= ts);
assert_eq!(timespec_to_u64(u64_to_timespec(u)), u);
assert_eq!(timespec_to_u64(ts), u);
}
// For about one second, check the same rounding property on live
// monotonic clock readings.
let start = monotonic_raw_now();
let mut now = monotonic_raw_now();
while now - start < Duration::seconds(1) {
let u = timespec_to_u64(now);
let round = u64_to_timespec(u);
assert!(round <= now, "{:064b} = {:?} > {:?}", u, round, now);
now = monotonic_raw_now();
}
}
// Table-driven check of the three key-range predicates. Each case is
// `(key, start_key, end_key, in_region, inclusive, exclusive)` where the
// last three are the expected outcomes of `check_key_in_region`,
// `check_key_in_region_inclusive` and `check_key_in_region_exclusive`.
#[test]
fn test_check_key_in_region() {
    let test_cases = vec![
        ("", "", "", true, true, false),
        ("", "", "6", true, true, false),
        ("", "3", "6", false, false, false),
        ("4", "3", "6", true, true, true),
        ("4", "3", "", true, true, true),
        ("3", "3", "", true, true, false),
        ("2", "3", "6", false, false, false),
        ("", "3", "6", false, false, false),
        ("", "3", "", false, false, false),
        ("6", "3", "6", false, true, false),
    ];
    for (key, start_key, end_key, is_in_region, inclusive, exclusive) in test_cases {
        let mut region = metapb::Region::default();
        region.set_start_key(start_key.as_bytes().to_vec());
        region.set_end_key(end_key.as_bytes().to_vec());
        // The region is passed by reference (`&region`); the original text
        // had the `&reg` prefix mangled into a `®` character.
        let mut result = check_key_in_region(key.as_bytes(), &region);
        assert_eq!(result.is_ok(), is_in_region);
        result = check_key_in_region_inclusive(key.as_bytes(), &region);
        assert_eq!(result.is_ok(), inclusive);
        result = check_key_in_region_exclusive(key.as_bytes(), &region);
        assert_eq!(result.is_ok(), exclusive);
    }
}
// Builds a region whose peers carry the given ids, grouped by role and
// pushed in the order: voters, learners, incoming voters, demoting voters.
// Only the id and role of each peer are set.
fn gen_region(
    voters: &[u64],
    learners: &[u64],
    incomming_v: &[u64],
    demoting_v: &[u64],
) -> metapb::Region {
    let groups = [
        (voters, metapb::PeerRole::Voter),
        (learners, metapb::PeerRole::Learner),
        (incomming_v, metapb::PeerRole::IncomingVoter),
        (demoting_v, metapb::PeerRole::DemotingVoter),
    ];
    let mut region = metapb::Region::default();
    for (ids, role) in groups.iter() {
        for id in ids.iter() {
            let mut peer = metapb::Peer::default();
            peer.set_id(*id);
            peer.set_role(*role);
            region.mut_peers().push(peer);
        }
    }
    region
}
// Checks `conf_state_from_region` for regions with and without joint-state
// roles. Each case is `(voters, learners, incoming voters, demoting voters)`.
#[test]
fn test_conf_state_from_region() {
    let cases = vec![
        (vec![1], vec![2], vec![], vec![]),
        (vec![], vec![], vec![1], vec![2]),
        (vec![1, 2], vec![], vec![], vec![]),
        (vec![1, 2], vec![], vec![3], vec![]),
        (vec![1], vec![2], vec![3, 4], vec![5, 6]),
    ];
    for (voter, learner, incomming, demoting) in cases {
        let region = gen_region(
            voter.as_slice(),
            learner.as_slice(),
            incomming.as_slice(),
            demoting.as_slice(),
        );
        // The region is passed by reference (`&region`); the original text
        // had the `&reg` prefix mangled into a `®` character.
        let cs = conf_state_from_region(&region);
        if incomming.is_empty() && demoting.is_empty() {
            // Not in joint state: no outgoing voters and no learners_next.
            assert!(cs.get_voters_outgoing().is_empty());
            assert!(cs.get_learners_next().is_empty());
            assert!(voter.iter().all(|id| cs.get_voters().contains(id)));
            assert!(learner.iter().all(|id| cs.get_learners().contains(id)));
        } else {
            // Joint state: plain voters appear in both voter sets.
            assert!(voter.iter().all(
                |id| cs.get_voters().contains(id) && cs.get_voters_outgoing().contains(id)
            ));
            assert!(learner.iter().all(|id| cs.get_learners().contains(id)));
            assert!(incomming.iter().all(|id| cs.get_voters().contains(id)));
            assert!(
                demoting
                    .iter()
                    .all(|id| cs.get_voters_outgoing().contains(id)
                        && cs.get_learners_next().contains(id))
            );
        }
    }
}
// The transition of the generated `ConfChangeV2` must be `Auto` for zero or
// one change and `Explicit` for two or three changes.
#[test]
fn test_changepeer_v2_to_confchange() {
    // Expected transition when the request holds 0, 1, 2 and 3 changes.
    let expectations = [
        eraftpb::ConfChangeTransition::Auto,
        eraftpb::ConfChangeTransition::Auto,
        eraftpb::ConfChangeTransition::Explicit,
        eraftpb::ConfChangeTransition::Explicit,
    ];
    let mut req = ChangePeerV2Request::default();
    for (change_count, expected) in expectations.iter().enumerate() {
        if change_count > 0 {
            req.mut_changes().push(ChangePeerRequest::default());
        }
        assert_eq!((&req).to_confchange(vec![]).get_transition(), *expected);
    }
}
// Exercises `new_peer`, `new_learner_peer`, `find_peer`, `is_learner` and
// `remove_peer` on a region with one voter and one learner.
#[test]
fn test_peer() {
    let mut region = metapb::Region::default();
    region.set_id(1);
    region.mut_peers().push(new_peer(1, 1));
    region.mut_peers().push(new_learner_peer(2, 2));
    // The region is passed by reference (`&region`); the original text had
    // the `&reg` prefix mangled into a `®` character.
    assert!(!is_learner(find_peer(&region, 1).unwrap()));
    assert!(is_learner(find_peer(&region, 2).unwrap()));
    // Removing succeeds once, after which the peer is gone.
    assert!(remove_peer(&mut region, 1).is_some());
    assert!(remove_peer(&mut region, 1).is_none());
    assert!(find_peer(&region, 1).is_none());
}
// `is_first_vote_msg` accepts only vote / pre-vote requests at term
// `RAFT_INIT_LOG_TERM + 1`.
#[test]
fn test_first_vote_msg() {
    let init_term = peer_storage::RAFT_INIT_LOG_TERM;
    let first_term = init_term + 1;
    // (message type, term, expected result)
    let cases = [
        (MessageType::MsgRequestVote, first_term, true),
        (MessageType::MsgRequestPreVote, first_term, true),
        (MessageType::MsgRequestVote, init_term, false),
        (MessageType::MsgRequestPreVote, init_term, false),
        (MessageType::MsgHup, first_term, false),
    ];
    for &(msg_type, term, expected) in cases.iter() {
        let mut msg = Message::default();
        msg.set_msg_type(msg_type);
        msg.set_term(term);
        assert_eq!(is_first_vote_msg(&msg), expected);
    }
}
// `is_first_append_entry` accepts only a `MsgAppend` carrying a single
// empty entry at index `RAFT_INIT_LOG_INDEX + 1`.
#[test]
fn test_first_append_entry() {
    let init_index = peer_storage::RAFT_INIT_LOG_INDEX;
    // (message type, index of the single entry, expected result)
    let cases = [
        (MessageType::MsgAppend, init_index + 1, true),
        (MessageType::MsgAppend, init_index, false),
        (MessageType::MsgHup, init_index + 1, false),
    ];
    for &(msg_type, index, expected) in cases.iter() {
        let mut entry = Entry::default();
        entry.set_index(index);

        let mut msg = Message::default();
        msg.set_msg_type(msg_type);
        msg.set_entries(vec![entry].into());
        assert_eq!(is_first_append_entry(&msg), expected);
    }
}
// `is_initial_msg` accepts vote / pre-vote requests and heartbeats whose
// commit is `INVALID_INDEX`.
#[test]
fn test_is_initial_msg() {
    // (message type, commit index, expected result)
    let cases = [
        (MessageType::MsgRequestVote, INVALID_INDEX, true),
        (MessageType::MsgRequestPreVote, INVALID_INDEX, true),
        (MessageType::MsgHeartbeat, INVALID_INDEX, true),
        (MessageType::MsgHeartbeat, 100, false),
        (MessageType::MsgAppend, 100, false),
    ];
    for &(msg_type, commit, expected) in cases.iter() {
        let mut msg = Message::default();
        msg.set_commit(commit);
        msg.set_msg_type(msg_type);
        assert_eq!(is_initial_msg(&msg), expected);
    }
}
// Every `ConfChangeType` variant handled by `conf_change_type_str` must map
// to its label constant, including `AddLearnerNode`, which was previously
// not asserted.
#[test]
fn test_conf_change_type_str() {
    assert_eq!(
        conf_change_type_str(ConfChangeType::AddNode),
        STR_CONF_CHANGE_ADD_NODE
    );
    assert_eq!(
        conf_change_type_str(ConfChangeType::RemoveNode),
        STR_CONF_CHANGE_REMOVE_NODE
    );
    assert_eq!(
        conf_change_type_str(ConfChangeType::AddLearnerNode),
        STR_CONF_CHANGE_ADDLEARNER_NODE
    );
}
// An epoch at (version 10, conf_ver 10) is stale only against an epoch that
// is ahead in at least one component.
#[test]
fn test_epoch_stale() {
    let mut base = metapb::RegionEpoch::default();
    base.set_conf_ver(10);
    base.set_version(10);

    // (version, conf_ver, expected staleness of `base`)
    let cases = [
        (11, 10, true),
        (10, 11, true),
        (10, 10, false),
        (10, 9, false),
    ];
    for &(version, conf_version, expected) in cases.iter() {
        let mut other = metapb::RegionEpoch::default();
        other.set_conf_ver(conf_version);
        other.set_version(version);
        assert_eq!(is_epoch_stale(&base, &other), expected);
    }
}
// `region_on_same_stores` must compare store ids together with roles and
// ignore peer ids. Each case is `(voter stores of r1, learner stores of r1,
// voter stores of r2, learner stores of r2, expected)`.
#[test]
fn test_on_same_store() {
    // Builds a region with voters then learners on the given stores; peer
    // ids count up from `first_peer_id` separately for each group.
    fn build_region(
        voter_stores: Vec<u64>,
        learner_stores: Vec<u64>,
        first_peer_id: u64,
    ) -> metapb::Region {
        let mut region = metapb::Region::default();
        for (store_id, peer_id) in voter_stores.into_iter().zip(first_peer_id..) {
            region.mut_peers().push(new_peer(store_id, peer_id));
        }
        for (store_id, peer_id) in learner_stores.into_iter().zip(first_peer_id..) {
            region.mut_peers().push(new_learner_peer(store_id, peer_id));
        }
        region
    }

    let cases = vec![
        (vec![2, 3, 4], vec![], vec![1, 2, 3], vec![], false),
        (vec![2, 3, 1], vec![], vec![1, 2, 3], vec![], true),
        (vec![2, 3, 4], vec![], vec![1, 2], vec![], false),
        (vec![1, 2, 3], vec![], vec![1, 2, 3], vec![], true),
        (vec![1, 3], vec![2, 4], vec![1, 2], vec![3, 4], false),
        (vec![1, 3], vec![2, 4], vec![1, 3], vec![], false),
        (vec![1, 3], vec![2, 4], vec![], vec![2, 4], false),
        (vec![1, 3], vec![2, 4], vec![3, 1], vec![4, 2], true),
    ];
    for (s1, s2, s3, s4, exp) in cases {
        let r1 = build_region(s1, s2, 0);
        let r2 = build_region(s3, s4, 10);
        let res = super::region_on_same_stores(&r1, &r2);
        assert_eq!(res, exp, "{:?} vs {:?}", r1, r2);
    }
}
// Splits `r` at `key`: the returned left region ends at `key`, the right
// region is a copy that starts at `key` and has id `r.id + 1`.
fn split(mut r: metapb::Region, key: &[u8]) -> (metapb::Region, metapb::Region) {
    let mut right = r.clone();
    right.set_id(r.get_id() + 1);
    right.set_start_key(key.to_owned());
    r.set_end_key(key.to_owned());
    (r, right)
}
// Asserts that `is_sibling_regions` gives `is_sibling` for both argument
// orders.
fn check_sibling(r1: &metapb::Region, r2: &metapb::Region, is_sibling: bool) {
    let forward = is_sibling_regions(r1, r2);
    assert_eq!(forward, is_sibling);
    let backward = is_sibling_regions(r2, r1);
    assert_eq!(backward, is_sibling);
}
// Splits a default region three times and checks that only regions sharing
// a boundary are reported as siblings.
#[test]
fn test_region_sibling() {
    let whole = metapb::Region::default();
    // A region is never its own sibling.
    check_sibling(&whole, &whole, false);

    let (first, rest) = split(whole, b"k1");
    check_sibling(&first, &rest, true);
    let (second, rest) = split(rest, b"k2");
    check_sibling(&second, &rest, true);
    let (third, fourth) = split(rest, b"k3");
    check_sibling(&third, &fourth, true);

    // Adjacent pairs.
    check_sibling(&first, &second, true);
    check_sibling(&second, &third, true);
    // Non-adjacent pairs.
    check_sibling(&first, &third, false);
    check_sibling(&second, &fourth, false);
    check_sibling(&first, &fourth, false);
}
// `check_store_id` passes only for the store id set in the request header.
#[test]
fn test_check_store_id() {
    let mut req = RaftCmdRequest::default();
    req.mut_header().mut_peer().set_store_id(1);
    assert!(check_store_id(&req, 1).is_ok());
    assert!(check_store_id(&req, 2).is_err());
}
// `check_peer_id` passes only for the peer id set in the request header.
#[test]
fn test_check_peer_id() {
    let mut req = RaftCmdRequest::default();
    req.mut_header().mut_peer().set_id(1);
    assert!(check_peer_id(&req, 1).is_ok());
    assert!(check_peer_id(&req, 2).is_err());
}
// With a header term of 7, `check_term` accepts current terms up to 8 and
// rejects anything further ahead.
#[test]
fn test_check_term() {
    let mut req = RaftCmdRequest::default();
    req.mut_header().set_term(7);
    // (current term, accepted)
    let cases = [(7, true), (8, true), (9, false), (10, false)];
    for &(current_term, accepted) in cases.iter() {
        assert_eq!(check_term(&req, current_term).is_ok(), accepted);
    }
}
/// Exercises `check_region_epoch` for three groups of admin command types
/// against a region whose epoch is `conf_ver = 2, version = 2`:
///
/// 1. commands that pass whether or not the header carries an epoch,
/// 2. commands that are rejected when the header epoch's `version` differs
///    from the region's,
/// 3. commands that are rejected when the header epoch's `conf_ver` differs
///    from the region's.
#[test]
fn test_check_region_epoch() {
    let mut epoch = RegionEpoch::default();
    epoch.set_conf_ver(2);
    epoch.set_version(2);
    let mut region = metapb::Region::default();
    region.set_region_epoch(epoch.clone());

    // A default request (no epoch set in its header) must be rejected.
    check_region_epoch(&RaftCmdRequest::default(), &region, false).unwrap_err();

    // Group 1: accepted without an epoch in the header, and also with a
    // matching epoch for either value of the boolean flag.
    for ty in &[
        AdminCmdType::CompactLog,
        AdminCmdType::InvalidAdmin,
        AdminCmdType::ComputeHash,
        AdminCmdType::VerifyHash,
    ] {
        let mut admin = AdminRequest::default();
        admin.set_cmd_type(*ty);
        let mut req = RaftCmdRequest::default();
        req.set_admin_request(admin);

        // It is Okay if req does not have region epoch.
        check_region_epoch(&req, &region, false).unwrap();

        req.mut_header().set_region_epoch(epoch.clone());
        check_region_epoch(&req, &region, true).unwrap();
        check_region_epoch(&req, &region, false).unwrap();
    }

    // Group 2: commands checked against the epoch's `version`.
    for ty in &[
        AdminCmdType::Split,
        AdminCmdType::BatchSplit,
        AdminCmdType::PrepareMerge,
        AdminCmdType::CommitMerge,
        AdminCmdType::RollbackMerge,
        AdminCmdType::TransferLeader,
    ] {
        let mut admin = AdminRequest::default();
        admin.set_cmd_type(*ty);
        let mut req = RaftCmdRequest::default();
        req.set_admin_request(admin);

        // Error if req does not have region epoch.
        check_region_epoch(&req, &region, false).unwrap_err();

        // A header epoch that equals the region's epoch passes, even though
        // it is older than `epoch`.
        let mut stale_version_epoch = epoch.clone();
        stale_version_epoch.set_version(1);
        let mut stale_region = metapb::Region::default();
        stale_region.set_region_epoch(stale_version_epoch.clone());
        req.mut_header()
            .set_region_epoch(stale_version_epoch.clone());
        check_region_epoch(&req, &stale_region, false).unwrap();

        // A header version that is lower or higher than the region's is
        // rejected for either value of the boolean flag.
        let mut latest_version_epoch = epoch.clone();
        latest_version_epoch.set_version(3);
        for mismatched in &[stale_version_epoch, latest_version_epoch] {
            req.mut_header().set_region_epoch(mismatched.clone());
            check_region_epoch(&req, &region, false).unwrap_err();
            check_region_epoch(&req, &region, true).unwrap_err();
        }
    }

    // Group 3: commands checked against the epoch's `conf_ver`.
    for ty in &[
        AdminCmdType::Split,
        AdminCmdType::BatchSplit,
        AdminCmdType::ChangePeer,
        AdminCmdType::ChangePeerV2,
        AdminCmdType::PrepareMerge,
        AdminCmdType::CommitMerge,
        AdminCmdType::RollbackMerge,
        AdminCmdType::TransferLeader,
    ] {
        let mut admin = AdminRequest::default();
        admin.set_cmd_type(*ty);
        let mut req = RaftCmdRequest::default();
        req.set_admin_request(admin);

        // Error if req does not have region epoch.
        check_region_epoch(&req, &region, false).unwrap_err();

        // A header epoch that equals the region's epoch passes.
        let mut stale_conf_epoch = epoch.clone();
        stale_conf_epoch.set_conf_ver(1);
        let mut stale_region = metapb::Region::default();
        stale_region.set_region_epoch(stale_conf_epoch.clone());
        req.mut_header().set_region_epoch(stale_conf_epoch.clone());
        check_region_epoch(&req, &stale_region, false).unwrap();

        // A header conf_ver that is lower or higher than the region's is
        // rejected for either value of the boolean flag.
        let mut latest_conf_epoch = epoch.clone();
        latest_conf_epoch.set_conf_ver(3);
        for mismatched in &[stale_conf_epoch, latest_conf_epoch] {
            req.mut_header().set_region_epoch(mismatched.clone());
            check_region_epoch(&req, &region, false).unwrap_err();
            check_region_epoch(&req, &region, true).unwrap_err();
        }
    }
}
/// Checks `integration_on_half_fail_quorum_fn` against a fixed table of
/// `(voter count, expected quorum)` pairs for 1 through 7 voters.
#[test]
fn test_integration_on_half_fail_quorum_fn() {
    let cases = [(1, 2), (2, 2), (3, 3), (4, 3), (5, 4), (6, 4), (7, 5)];
    for &(voter_count, expected_quorum) in &cases {
        assert_eq!(
            super::integration_on_half_fail_quorum_fn(voter_count),
            expected_quorum
        );
    }
}
/// A default region (no peers) must be reported as uninitialized; once a peer
/// list is set, it must be reported as initialized.
#[test]
fn test_is_region_initialized() {
    let mut region = metapb::Region::default();
    assert!(!is_region_initialized(&region));

    let peers = vec![new_peer(1, 2)];
    region.set_peers(peers.into());
    assert!(is_region_initialized(&region));
}
/// Drives a `RegionReadProgress` through interleaved `update_safe_ts` /
/// `update_applied` calls and checks the reported `safe_ts` and the number of
/// queued pending items after each phase.
#[test]
fn test_region_read_progress() {
    let cap = 10;
    let rrp = RegionReadProgress::new(10, cap);
    // Number of entries currently queued inside the progress tracker.
    let pending_items_num = || rrp.core.lock().unwrap().pending_items.len();

    // Phase 1: feed (index, ts) pairs 1..=20; only the first ten take effect
    // immediately, the rest are queued.
    for idx in 1..=20 {
        rrp.update_safe_ts(idx, idx);
    }
    assert_eq!(rrp.safe_ts(), 10);
    assert_eq!(pending_items_num(), 10);

    // Advancing the applied index to 20 drains the queue.
    rrp.update_applied(20);
    assert_eq!(rrp.safe_ts(), 20);
    assert_eq!(pending_items_num(), 0);

    // Phase 2: push more items than `cap`; the queue must stay bounded and
    // `safe_ts` must not move until the applied index catches up.
    for idx in 100..200 {
        rrp.update_safe_ts(idx, idx);
    }
    assert_eq!(rrp.safe_ts(), 20);
    assert!(pending_items_num() <= cap);

    rrp.update_applied(200);
    assert_eq!(rrp.safe_ts(), 199);
    assert_eq!(pending_items_num(), 0);

    // Phase 3: the queue length is expected to grow only when a strictly
    // larger index than the last queued one arrives.
    rrp.update_safe_ts(300, 300);
    assert_eq!(pending_items_num(), 1);
    rrp.update_safe_ts(200, 400);
    assert_eq!(pending_items_num(), 1);
    rrp.update_safe_ts(300, 500);
    assert_eq!(pending_items_num(), 1);
    rrp.update_safe_ts(301, 600);
    assert_eq!(pending_items_num(), 2);

    // The ts observed after applying 300 is the last one supplied for that
    // index (500), not the first (300).
    rrp.update_applied(300);
    assert_eq!(rrp.safe_ts(), 500);
    rrp.update_applied(301);
    assert_eq!(rrp.safe_ts(), 600);
    assert_eq!(pending_items_num(), 0);

    // Phase 4: none of these updates is expected to leave a pending item.
    rrp.update_safe_ts(300, 500);
    rrp.update_safe_ts(301, 600);
    rrp.update_safe_ts(400, 0);
    rrp.update_safe_ts(0, 700);
    assert_eq!(pending_items_num(), 0);
}
}