use std::{collections::VecDeque, mem};
use protobuf::Message as PbMessage;
use raft_proto::ConfChangeI;
use slog::Logger;
use crate::eraftpb::{ConfState, Entry, EntryType, HardState, Message, MessageType, Snapshot};
use crate::errors::{Error, Result};
use crate::read_only::ReadState;
use crate::{config::Config, StateRole};
use crate::{Raft, SoftState, Status, Storage};
use slog::info;
/// Identifies a peer together with optional opaque context bytes.
#[derive(Default, Debug)]
pub struct Peer {
    /// The peer's node ID.
    pub id: u64,
    /// Opaque context attached to the peer, if any.
    pub context: Option<Vec<u8>>,
}
/// Outcome of sending a snapshot to a peer. `Failure` is reported to raft as a rejection.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SnapshotStatus {
    /// The snapshot was delivered successfully.
    Finish,
    /// Delivering the snapshot failed.
    Failure,
}
/// Returns `true` for message types that are generated locally and must never arrive from
/// the network: `MsgHup`, `MsgBeat`, `MsgUnreachable`, `MsgSnapStatus` and `MsgCheckQuorum`.
pub fn is_local_msg(t: MessageType) -> bool {
    const LOCAL_TYPES: [MessageType; 5] = [
        MessageType::MsgHup,
        MessageType::MsgBeat,
        MessageType::MsgUnreachable,
        MessageType::MsgSnapStatus,
        MessageType::MsgCheckQuorum,
    ];
    LOCAL_TYPES.contains(&t)
}
/// Returns `true` for message types treated as responses. `RawNode::step` drops these with
/// `StepPeerNotFound` when the sender is not a known peer.
fn is_response_msg(t: MessageType) -> bool {
    const RESPONSE_TYPES: [MessageType; 5] = [
        MessageType::MsgAppendResponse,
        MessageType::MsgRequestVoteResponse,
        MessageType::MsgHeartbeatResponse,
        MessageType::MsgUnreachable,
        MessageType::MsgRequestPreVoteResponse,
    ];
    RESPONSE_TYPES.contains(&t)
}
#[deprecated(since = "0.6.0", note = "Please use `Snapshot::is_empty` instead")]
pub fn is_empty_snap(s: &Snapshot) -> bool {
s.is_empty()
}
/// A batch of state produced by `RawNode::ready` that the application must handle:
/// entries and snapshot to persist, state changes, read states, committed entries and
/// outgoing messages.
#[derive(Debug, Default, PartialEq)]
pub struct Ready {
    /// Sequence number taken from `RawNode::max_number` when this `Ready` was generated.
    number: u64,
    /// Soft state, set only when it differs from the previously committed one.
    ss: Option<SoftState>,
    /// Hard state, set only when it differs from the previously committed one.
    hs: Option<HardState>,
    /// Read states moved out of the raft instance.
    read_states: Vec<ReadState>,
    /// Unstable entries that must be persisted.
    entries: Vec<Entry>,
    /// Unstable snapshot to persist; the default value when there is none.
    snapshot: Snapshot,
    /// When `true`, `light`'s messages are exposed through `persisted_messages` rather than
    /// `messages`. `RawNode::ready` sets this whenever the node is not the leader.
    is_persisted_msg: bool,
    /// Committed entries and outgoing messages.
    light: LightReady,
    /// Whether persisting this `Ready` requires a synchronous write (term/vote change,
    /// snapshot, or new entries).
    must_sync: bool,
}
impl Ready {
    /// Sequence number identifying this `Ready`.
    #[inline]
    pub fn number(&self) -> u64 {
        self.number
    }

    /// The new soft state, if it changed.
    #[inline]
    pub fn ss(&self) -> Option<&SoftState> {
        self.ss.as_ref()
    }

    /// The new hard state, if it changed.
    #[inline]
    pub fn hs(&self) -> Option<&HardState> {
        self.hs.as_ref()
    }

    /// Read states collected since the previous `Ready`.
    #[inline]
    pub fn read_states(&self) -> &Vec<ReadState> {
        &self.read_states
    }

    /// Moves the read states out, leaving an empty vector behind.
    #[inline]
    pub fn take_read_states(&mut self) -> Vec<ReadState> {
        mem::take(&mut self.read_states)
    }

    /// Entries that must be persisted to stable storage.
    #[inline]
    pub fn entries(&self) -> &Vec<Entry> {
        &self.entries
    }

    /// Moves the entries to persist out, leaving an empty vector behind.
    #[inline]
    pub fn take_entries(&mut self) -> Vec<Entry> {
        mem::take(&mut self.entries)
    }

    /// The snapshot to persist (empty when there is none).
    #[inline]
    pub fn snapshot(&self) -> &Snapshot {
        &self.snapshot
    }

    /// Entries that are committed and may be applied.
    #[inline]
    pub fn committed_entries(&self) -> &Vec<Entry> {
        &self.light.committed_entries
    }

    /// Moves the committed entries out, leaving an empty vector behind.
    #[inline]
    pub fn take_committed_entries(&mut self) -> Vec<Entry> {
        mem::take(&mut self.light.committed_entries)
    }

    /// Messages that may be sent right away. Empty when they must wait for persistence.
    #[inline]
    pub fn messages(&self) -> &[Message] {
        if self.is_persisted_msg {
            &[]
        } else {
            self.light.messages.as_slice()
        }
    }

    /// Moves out the messages that may be sent right away. Returns an empty vector when
    /// they must wait for persistence.
    #[inline]
    pub fn take_messages(&mut self) -> Vec<Message> {
        if self.is_persisted_msg {
            Vec::new()
        } else {
            mem::take(&mut self.light.messages)
        }
    }

    /// Messages that must only be sent after this `Ready` has been persisted.
    #[inline]
    pub fn persisted_messages(&self) -> &[Message] {
        if !self.is_persisted_msg {
            return &[];
        }
        self.light.messages.as_slice()
    }

    /// Moves out the messages that must wait for persistence (empty if none).
    #[inline]
    pub fn take_persisted_messages(&mut self) -> Vec<Message> {
        if !self.is_persisted_msg {
            return Vec::new();
        }
        mem::take(&mut self.light.messages)
    }

    /// Whether persisting this `Ready` needs a synchronous write.
    #[inline]
    pub fn must_sync(&self) -> bool {
        self.must_sync
    }
}
/// Bookkeeping kept by `RawNode` for each generated `Ready` until it is persisted.
#[derive(Debug, Default, PartialEq)]
struct ReadyRecord {
    /// The `Ready::number` this record belongs to.
    number: u64,
    /// `(index, term)` of the last unstable entry handed out, if any.
    last_entry: Option<(u64, u64)>,
    /// `(index, term)` of the unstable snapshot handed out, if any.
    snapshot: Option<(u64, u64)>,
}
/// The lightweight part of a `Ready`: commit index, committed entries and messages.
#[derive(Debug, Default, PartialEq)]
pub struct LightReady {
    /// New commit index, set by `RawNode::advance_append` only when the commit index advanced.
    commit_index: Option<u64>,
    /// Entries committed since the last time they were handed out.
    committed_entries: Vec<Entry>,
    /// Outgoing messages drained from the raft instance.
    messages: Vec<Message>,
}
impl LightReady {
#[inline]
pub fn commit_index(&self) -> Option<u64> {
self.commit_index
}
#[inline]
pub fn committed_entries(&self) -> &Vec<Entry> {
&self.committed_entries
}
#[inline]
pub fn take_committed_entries(&mut self) -> Vec<Entry> {
mem::take(&mut self.committed_entries)
}
#[inline]
pub fn messages(&self) -> &[Message] {
&self.messages
}
#[inline]
pub fn take_messages(&mut self) -> Vec<Message> {
mem::take(&mut self.messages)
}
}
/// A thread-unsafe driver around `Raft` that hands out `Ready` batches and tracks their
/// persistence and application progress.
pub struct RawNode<T: Storage> {
    /// The wrapped raft state machine.
    pub raft: Raft<T>,
    /// Hard state as of the last committed `Ready`, used to detect changes.
    prev_hs: HardState,
    /// Soft state as of the last committed `Ready`, used to detect changes.
    prev_ss: SoftState,
    /// Number assigned to the most recently generated `Ready`.
    max_number: u64,
    /// Records of generated `Ready`s that have not been reported as persisted yet,
    /// oldest first.
    records: VecDeque<ReadyRecord>,
    /// Index up to which committed entries (or a snapshot) have already been handed out.
    commit_since_index: u64,
}
impl<T: Storage> RawNode<T> {
    /// Creates a new `RawNode` from `config`, backed by `store`.
    ///
    /// The initial hard and soft states are captured as the "previous" states, so the first
    /// `ready()` only reports changes made after construction. Committed entries are handed
    /// out starting after `config.applied`.
    ///
    /// # Panics
    ///
    /// Panics if `config.id` is zero.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Raft::new`.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(config: &Config, store: T, logger: &Logger) -> Result<Self> {
        assert_ne!(config.id, 0, "config.id must not be zero");
        let r = Raft::new(config, store, logger)?;
        let mut rn = RawNode {
            raft: r,
            prev_hs: Default::default(),
            prev_ss: Default::default(),
            max_number: 0,
            records: VecDeque::new(),
            commit_since_index: config.applied,
        };
        rn.prev_hs = rn.raft.hard_state();
        rn.prev_ss = rn.raft.soft_state();
        info!(
            rn.raft.logger,
            "RawNode created with id {id}.",
            id = rn.raft.id
        );
        Ok(rn)
    }

    /// Same as `new`, but uses the crate's default logger.
    #[cfg(feature = "default-logger")]
    #[allow(clippy::new_ret_no_self)]
    pub fn with_default_logger(c: &Config, store: T) -> Result<Self> {
        Self::new(c, store, &crate::default_logger())
    }

    /// Sets this node's priority by delegating to `Raft::set_priority`.
    #[inline]
    pub fn set_priority(&mut self, priority: u64) {
        self.raft.set_priority(priority);
    }

    /// Advances the node's logical clock by one tick and returns the result of `Raft::tick`.
    pub fn tick(&mut self) -> bool {
        self.raft.tick()
    }

    /// Asks this node to campaign by stepping a local `MsgHup` message.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Raft::step`.
    pub fn campaign(&mut self) -> Result<()> {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgHup);
        self.raft.step(m)
    }

    /// Proposes a single normal entry carrying `data` and `context`.
    ///
    /// The proposal is stepped as a `MsgPropose` whose `from` is this node's id.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Raft::step`.
    pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgPropose);
        m.from = self.raft.id;
        let mut e = Entry::default();
        e.data = data.into();
        e.context = context.into();
        m.set_entries(vec![e].into());
        self.raft.step(m)
    }

    /// Delegates to `Raft::ping`.
    pub fn ping(&mut self) {
        self.raft.ping()
    }

    /// Proposes a configuration change.
    ///
    /// A v1 change (`as_v1()` returns `Some`) is encoded as an `EntryConfChange` entry;
    /// anything else is encoded as an `EntryConfChangeV2` entry via `as_v2()`.
    ///
    /// # Errors
    ///
    /// Returns an error if the change cannot be serialized, or propagates any error from
    /// `Raft::step`.
    // `cc` is taken by value for API convenience even though only borrowed here. A plain
    // tool-lint `allow` replaces the deprecated `cfg_attr(feature = "cargo-clippy", ..)` gate.
    #[allow(clippy::needless_pass_by_value)]
    pub fn propose_conf_change(&mut self, context: Vec<u8>, cc: impl ConfChangeI) -> Result<()> {
        let (data, ty) = if let Some(cc) = cc.as_v1() {
            (cc.write_to_bytes()?, EntryType::EntryConfChange)
        } else {
            (cc.as_v2().write_to_bytes()?, EntryType::EntryConfChangeV2)
        };
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgPropose);
        let mut e = Entry::default();
        e.set_entry_type(ty);
        e.data = data.into();
        e.context = context.into();
        m.set_entries(vec![e].into());
        self.raft.step(m)
    }

    /// Applies a configuration change (converted to v2) and returns the resulting
    /// `ConfState`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Raft::apply_conf_change`.
    pub fn apply_conf_change(&mut self, cc: &impl ConfChangeI) -> Result<ConfState> {
        self.raft.apply_conf_change(&cc.as_v2())
    }

    /// Steps a message received from the network into the state machine.
    ///
    /// # Errors
    ///
    /// * `Error::StepLocalMsg` if `m` has a local-only message type.
    /// * `Error::StepPeerNotFound` if `m` is a response from an unknown peer.
    /// * Otherwise, whatever `Raft::step` returns.
    pub fn step(&mut self, m: Message) -> Result<()> {
        // Local message types must never arrive over the network.
        if is_local_msg(m.get_msg_type()) {
            return Err(Error::StepLocalMsg);
        }
        if self.raft.prs().get(m.from).is_some() || !is_response_msg(m.get_msg_type()) {
            return self.raft.step(m);
        }
        Err(Error::StepPeerNotFound)
    }

    /// Collects committed entries not yet handed out, plus all pending outgoing messages.
    ///
    /// Advances `commit_since_index` to the last returned entry and reduces raft's
    /// uncommitted-size accounting by the returned entries.
    fn gen_light_ready(&mut self) -> LightReady {
        let mut rd = LightReady::default();
        let raft = &mut self.raft;
        rd.committed_entries = raft
            .raft_log
            .next_entries_since(self.commit_since_index)
            .unwrap_or_default();
        raft.reduce_uncommitted_size(&rd.committed_entries);
        if let Some(e) = rd.committed_entries.last() {
            assert!(self.commit_since_index < e.get_index());
            self.commit_since_index = e.get_index();
        }
        if !raft.msgs.is_empty() {
            rd.messages = mem::take(&mut raft.msgs);
        }
        rd
    }

    /// Returns the next `Ready`: changed soft/hard state, read states, the unstable snapshot
    /// and entries to persist, committed entries and outgoing messages.
    ///
    /// Each call assigns a new, increasing `number`. It also records the last unstable
    /// entry and snapshot positions so that `advance_append*` / `on_persist_ready` can later
    /// mark them stable or persisted.
    pub fn ready(&mut self) -> Ready {
        let raft = &mut self.raft;
        self.max_number += 1;
        let mut rd = Ready {
            number: self.max_number,
            ..Default::default()
        };
        let mut rd_record = ReadyRecord {
            number: self.max_number,
            ..Default::default()
        };
        // On becoming leader, outstanding records are dropped. They are asserted to carry
        // no entries or snapshot (presumably only vote/hard-state data from candidacy).
        if self.prev_ss.raft_state != StateRole::Leader && raft.state == StateRole::Leader {
            for record in self.records.drain(..) {
                assert_eq!(record.last_entry, None);
                assert_eq!(record.snapshot, None);
            }
        }
        let ss = raft.soft_state();
        if ss != self.prev_ss {
            rd.ss = Some(ss);
        }
        let hs = raft.hard_state();
        if hs != self.prev_hs {
            // A term or vote change must hit disk before it is acted upon.
            if hs.vote != self.prev_hs.vote || hs.term != self.prev_hs.term {
                rd.must_sync = true;
            }
            rd.hs = Some(hs);
        }
        if !raft.read_states.is_empty() {
            mem::swap(&mut rd.read_states, &mut raft.read_states);
        }
        if let Some(snapshot) = &raft.raft_log.unstable_snapshot() {
            rd.snapshot = snapshot.clone();
            assert!(self.commit_since_index <= rd.snapshot.get_metadata().index);
            self.commit_since_index = rd.snapshot.get_metadata().index;
            // With a pending snapshot there must be nothing committed beyond it yet.
            assert!(
                !raft
                    .raft_log
                    .has_next_entries_since(self.commit_since_index),
                "has snapshot but also has committed entries since {}",
                self.commit_since_index
            );
            rd_record.snapshot = Some((
                rd.snapshot.get_metadata().index,
                rd.snapshot.get_metadata().term,
            ));
            rd.must_sync = true;
        }
        rd.entries = raft.raft_log.unstable_entries().to_vec();
        if let Some(e) = rd.entries.last() {
            rd.must_sync = true;
            rd_record.last_entry = Some((e.get_index(), e.get_term()));
        }
        // Non-leaders expose their messages via `persisted_messages` instead of `messages`.
        rd.is_persisted_msg = raft.state != StateRole::Leader;
        rd.light = self.gen_light_ready();
        self.records.push_back(rd_record);
        rd
    }

    /// Returns whether `ready()` would currently produce anything new.
    pub fn has_ready(&self) -> bool {
        let raft = &self.raft;
        if !raft.msgs.is_empty() {
            return true;
        }
        if raft.soft_state() != self.prev_ss {
            return true;
        }
        if raft.hard_state() != self.prev_hs {
            return true;
        }
        if !raft.read_states.is_empty() {
            return true;
        }
        if !raft.raft_log.unstable_entries().is_empty() {
            return true;
        }
        if self.snap().map_or(false, |s| !s.is_empty()) {
            return true;
        }
        if raft
            .raft_log
            .has_next_entries_since(self.commit_since_index)
        {
            return true;
        }
        false
    }

    /// Records `rd`'s states as the new "previous" states, and marks its snapshot and
    /// entries as stable in the raft log.
    ///
    /// # Panics
    ///
    /// Panics if no `Ready` is outstanding, or if `rd` is not the most recently generated one.
    fn commit_ready(&mut self, rd: Ready) {
        if let Some(ss) = rd.ss {
            self.prev_ss = ss;
        }
        if let Some(hs) = rd.hs {
            self.prev_hs = hs;
        }
        // `ready()` pushes one record per `Ready`; only the newest one may be committed here.
        let rd_record = self
            .records
            .back()
            .expect("commit_ready called without an outstanding Ready");
        assert_eq!(rd_record.number, rd.number);
        let raft = &mut self.raft;
        if let Some((index, _)) = rd_record.snapshot {
            raft.raft_log.stable_snap(index);
        }
        if let Some((index, term)) = rd_record.last_entry {
            raft.raft_log.stable_entries(index, term);
        }
    }

    /// Forwards the applied index to `Raft::commit_apply`.
    fn commit_apply(&mut self, applied: u64) {
        self.raft.commit_apply(applied);
    }

    /// Notifies that every `Ready` with a number `<= number` has been persisted.
    ///
    /// The matching records are popped. A snapshot record resets any earlier entry
    /// position, and later entry positions override earlier ones. Raft is then told about
    /// the newest persisted snapshot and/or entry.
    pub fn on_persist_ready(&mut self, number: u64) {
        let (mut index, mut term) = (0, 0);
        let mut snap_index = 0;
        while let Some(record) = self.records.front() {
            if record.number > number {
                break;
            }
            let record = self.records.pop_front().unwrap();
            if let Some((i, _)) = record.snapshot {
                snap_index = i;
                index = 0;
                term = 0;
            }
            if let Some((i, t)) = record.last_entry {
                index = i;
                term = t;
            }
        }
        if snap_index != 0 {
            self.raft.on_persist_snap(snap_index);
        }
        if index != 0 {
            self.raft.on_persist_entries(index, term);
        }
    }

    /// Synchronously persists and applies `rd`.
    ///
    /// Equivalent to `advance_append(rd)`, then `advance_apply_to` with the commit position
    /// as it stood before appending. Entries newly committed by the append are therefore
    /// returned in the `LightReady` but not yet marked applied.
    pub fn advance(&mut self, rd: Ready) -> LightReady {
        let applied = self.commit_since_index;
        let light_rd = self.advance_append(rd);
        self.advance_apply_to(applied);
        light_rd
    }

    /// Marks `rd` (the newest `Ready`) and all earlier ones as persisted. Returns the
    /// committed entries and messages produced as a result.
    ///
    /// `LightReady::commit_index` is set only if the commit index advanced.
    ///
    /// # Panics
    ///
    /// Panics if the hard state changed other than by commit advancement. Reports a fatal
    /// error if a non-leader produced new messages.
    #[inline]
    pub fn advance_append(&mut self, rd: Ready) -> LightReady {
        self.commit_ready(rd);
        self.on_persist_ready(self.max_number);
        let mut light_rd = self.gen_light_ready();
        if self.raft.state != StateRole::Leader && !light_rd.messages().is_empty() {
            fatal!(self.raft.logger, "not leader but has new msg after advance");
        }
        let hard_state = self.raft.hard_state();
        if hard_state.commit > self.prev_hs.commit {
            light_rd.commit_index = Some(hard_state.commit);
            self.prev_hs.commit = hard_state.commit;
        } else {
            assert_eq!(hard_state.commit, self.prev_hs.commit);
            light_rd.commit_index = None;
        }
        assert_eq!(hard_state, self.prev_hs, "hard state != prev_hs");
        light_rd
    }

    /// Commits `rd` (the newest `Ready`) without reporting persistence. The caller is
    /// expected to call `on_persist_ready` once the write completes.
    #[inline]
    pub fn advance_append_async(&mut self, rd: Ready) {
        self.commit_ready(rd);
    }

    /// Marks everything handed out so far (up to `commit_since_index`) as applied.
    #[inline]
    pub fn advance_apply(&mut self) {
        self.commit_apply(self.commit_since_index);
    }

    /// Marks entries up to `applied` as applied.
    #[inline]
    pub fn advance_apply_to(&mut self, applied: u64) {
        self.commit_apply(applied);
    }

    /// Returns the snapshot reported by `Raft::snap`, if any.
    #[inline]
    pub fn snap(&self) -> Option<&Snapshot> {
        self.raft.snap()
    }

    /// Returns a `Status` built from the current raft state.
    #[inline]
    pub fn status(&self) -> Status {
        Status::new(&self.raft)
    }

    /// Reports that peer `id` could not be reached. Step errors are deliberately ignored.
    pub fn report_unreachable(&mut self, id: u64) {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgUnreachable);
        m.from = id;
        let _ = self.raft.step(m);
    }

    /// Reports the outcome of sending a snapshot to peer `id`. `Failure` is stepped as a
    /// rejected `MsgSnapStatus`. Step errors are deliberately ignored.
    pub fn report_snapshot(&mut self, id: u64, status: SnapshotStatus) {
        let rej = status == SnapshotStatus::Failure;
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgSnapStatus);
        m.from = id;
        m.reject = rej;
        let _ = self.raft.step(m);
    }

    /// Requests a snapshot at `request_index` via `Raft::request_snapshot`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Raft::request_snapshot`.
    pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
        self.raft.request_snapshot(request_index)
    }

    /// Asks to transfer leadership to `transferee` by stepping a `MsgTransferLeader` from it.
    /// Step errors are deliberately ignored.
    pub fn transfer_leader(&mut self, transferee: u64) {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgTransferLeader);
        m.from = transferee;
        let _ = self.raft.step(m);
    }

    /// Issues a read-index request whose single entry carries `rctx` as its data.
    /// Step errors are deliberately ignored.
    pub fn read_index(&mut self, rctx: Vec<u8>) {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgReadIndex);
        let mut e = Entry::default();
        e.data = rctx.into();
        m.set_entries(vec![e].into());
        let _ = self.raft.step(m);
    }

    /// Borrows the underlying storage.
    #[inline]
    pub fn store(&self) -> &T {
        self.raft.store()
    }

    /// Mutably borrows the underlying storage.
    #[inline]
    pub fn mut_store(&mut self) -> &mut T {
        self.raft.mut_store()
    }

    /// Forwards `skip` to `Raft::skip_bcast_commit`.
    #[inline]
    pub fn skip_bcast_commit(&mut self, skip: bool) {
        self.raft.skip_bcast_commit(skip)
    }

    /// Forwards `batch_append` to `Raft::set_batch_append`.
    #[inline]
    pub fn set_batch_append(&mut self, batch_append: bool) {
        self.raft.set_batch_append(batch_append)
    }
}
#[cfg(test)]
mod test {
    use super::is_local_msg;
    use crate::eraftpb::MessageType;

    /// Checks `is_local_msg` against every message type and its expected classification.
    #[test]
    fn test_is_local_msg() {
        let cases = vec![
            (MessageType::MsgHup, true),
            (MessageType::MsgBeat, true),
            (MessageType::MsgUnreachable, true),
            (MessageType::MsgSnapStatus, true),
            (MessageType::MsgCheckQuorum, true),
            (MessageType::MsgPropose, false),
            (MessageType::MsgAppend, false),
            (MessageType::MsgAppendResponse, false),
            (MessageType::MsgRequestVote, false),
            (MessageType::MsgRequestVoteResponse, false),
            (MessageType::MsgSnapshot, false),
            (MessageType::MsgHeartbeat, false),
            (MessageType::MsgHeartbeatResponse, false),
            (MessageType::MsgTransferLeader, false),
            (MessageType::MsgTimeoutNow, false),
            (MessageType::MsgReadIndex, false),
            (MessageType::MsgReadIndexResp, false),
            (MessageType::MsgRequestPreVote, false),
            (MessageType::MsgRequestPreVoteResponse, false),
        ];
        cases
            .into_iter()
            .for_each(|(ty, expected)| assert_eq!(is_local_msg(ty), expected));
    }
}