use std::cmp;
use std::ops::{Deref, DerefMut};
use crate::eraftpb::{
ConfChange, ConfChangeV2, ConfState, Entry, EntryType, HardState, Message, MessageType,
Snapshot,
};
use protobuf::Message as _;
use raft_proto::ConfChangeI;
use rand::{self, Rng};
use slog::{self, Logger};
#[cfg(feature = "failpoints")]
use fail::fail_point;
use getset::Getters;
use slog::{debug, error, info, o, trace, warn};
use super::errors::{Error, Result, StorageError};
use super::raft_log::RaftLog;
use super::read_only::{ReadOnly, ReadOnlyOption, ReadState};
use super::storage::Storage;
use super::Config;
use crate::confchange::Changer;
use crate::quorum::VoteResult;
use crate::util;
use crate::util::NO_LIMIT;
use crate::{confchange, Progress, ProgressState, ProgressTracker};
/// Campaign type for a pre-election: `Raft::campaign` becomes a
/// pre-candidate and sends `MsgRequestPreVote` without bumping its term.
#[doc(hidden)]
pub const CAMPAIGN_PRE_ELECTION: &[u8] = b"CampaignPreElection";
/// Campaign type for a normal election: `Raft::campaign` becomes a
/// candidate and sends `MsgRequestVote`.
#[doc(hidden)]
pub const CAMPAIGN_ELECTION: &[u8] = b"CampaignElection";
/// Campaign type used when campaigning as the target of a leader transfer.
/// It is attached as the vote request's `context`, which lets receivers
/// ignore their leader lease (see `Raft::step`).
#[doc(hidden)]
pub const CAMPAIGN_TRANSFER: &[u8] = b"CampaignTransfer";
/// The role a Raft node currently plays in its group.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum StateRole {
    /// The initial role; entered again through `become_follower`.
    Follower,
    /// Campaigning with real votes (entered via `become_candidate`, which
    /// increments the term).
    Candidate,
    /// Leading the group (entered via `become_leader`).
    Leader,
    /// Running a pre-vote round (entered via `become_pre_candidate`, which
    /// leaves the term unchanged).
    PreCandidate,
}

impl Default for StateRole {
    /// Nodes start out as followers.
    fn default() -> Self {
        Self::Follower
    }
}
/// Sentinel node id meaning "no node", e.g. no known leader or no vote cast.
pub const INVALID_ID: u64 = 0;
/// Sentinel log index meaning "no index", e.g. no pending snapshot request.
pub const INVALID_INDEX: u64 = 0;
/// Leader id and role of a node, as reported by `Raft::soft_state`.
#[derive(Default, PartialEq, Debug)]
pub struct SoftState {
/// Id of the node this node believes is leader, or `INVALID_ID` if unknown.
pub leader_id: u64,
/// This node's current role.
pub raft_state: StateRole,
}
/// Bookkeeping for the limit on the total data size of uncommitted entries.
struct UncommittedState {
/// Upper bound for `uncommitted_size`; `NO_LIMIT as usize` disables the check.
max_uncommitted_size: usize,
/// Sum of the `data` lengths of entries currently counted as uncommitted.
uncommitted_size: usize,
/// Last log index when this node became leader (set in `become_leader`).
/// Leading entries at or below it are not subtracted when reducing.
last_log_tail_index: u64,
}
impl UncommittedState {
    /// Returns `true` when no uncommitted-size limit is configured.
    #[inline]
    pub fn is_no_limit(&self) -> bool {
        NO_LIMIT as usize == self.max_uncommitted_size
    }

    /// Tries to count `ents` as newly uncommitted data.
    ///
    /// The entries are accepted, and their data size is added, when any of
    /// these holds:
    /// * no limit is configured;
    /// * they carry no data;
    /// * nothing is currently counted;
    /// * the new total stays within `max_uncommitted_size`.
    ///
    /// Otherwise the counter is left untouched and `false` is returned.
    pub fn maybe_increase_uncommitted_size(&mut self, ents: &[Entry]) -> bool {
        if self.is_no_limit() {
            return true;
        }
        let incoming = ents
            .iter()
            .fold(0usize, |acc, ent| acc + ent.get_data().len());
        let accepted = incoming == 0
            || self.uncommitted_size == 0
            || incoming + self.uncommitted_size <= self.max_uncommitted_size;
        if accepted {
            self.uncommitted_size += incoming;
        }
        accepted
    }

    /// Subtracts the data size of `ents` from the uncommitted counter.
    ///
    /// Entries at the start of the slice whose index is at or below
    /// `last_log_tail_index` are skipped. If the remaining size exceeds the
    /// tracked total, the counter is clamped to 0 and `false` is returned.
    pub fn maybe_reduce_uncommitted_size(&mut self, ents: &[Entry]) -> bool {
        if self.is_no_limit() || ents.is_empty() {
            return true;
        }
        let tail = self.last_log_tail_index;
        let released: usize = ents
            .iter()
            .skip_while(|ent| ent.index <= tail)
            .map(|ent| ent.get_data().len())
            .sum();
        match self.uncommitted_size.checked_sub(released) {
            Some(remaining) => {
                self.uncommitted_size = remaining;
                true
            }
            None => {
                self.uncommitted_size = 0;
                false
            }
        }
    }
}
/// Per-node Raft state, excluding the progress tracker and the outbound
/// message queue (both held by `Raft`, which derefs to this type).
#[derive(Getters)]
pub struct RaftCore<T: Storage> {
/// Current term.
pub term: u64,
/// Node voted for in the current term, or `INVALID_ID`.
pub vote: u64,
/// This node's id.
pub id: u64,
/// Completed read-index results (counted by `ready_read_count`).
pub read_states: Vec<ReadState>,
/// The log, backed by storage `T`.
pub raft_log: RaftLog<T>,
/// Copied from `Config::max_inflight_msgs`.
pub max_inflight: usize,
/// Size limit passed to `RaftLog::entries` when building appends.
pub max_msg_size: u64,
/// Snapshot index requested by this node, or `INVALID_INDEX`; preserved
/// across `become_follower`.
pub pending_request_snapshot: u64,
/// Current role.
pub state: StateRole,
/// Whether election timeouts may trigger a campaign (see `tick_election`).
promotable: bool,
/// Known leader, or `INVALID_ID`.
pub leader_id: u64,
/// Target of an in-progress leadership transfer, if any.
pub lead_transferee: Option<u64>,
/// Log index of the latest possibly-unapplied configuration change
/// (set in `become_leader` and `commit_apply`, cleared in `reset`).
pub pending_conf_index: u64,
/// Pending read-only (read index) requests.
pub read_only: ReadOnly,
/// Ticks since the election timer was last reset.
pub election_elapsed: usize,
/// Ticks since the last heartbeat round (leader only).
heartbeat_elapsed: usize,
/// Config flag: leader periodically checks that a quorum is active.
pub check_quorum: bool,
/// Config flag: run a pre-vote round before real elections.
pub pre_vote: bool,
/// Config flag; presumably consulted by `should_bcast_commit` (not shown here).
skip_bcast_commit: bool,
/// Whether new entries may be merged into an already queued `MsgAppend`.
batch_append: bool,
/// Heartbeat interval in ticks.
heartbeat_timeout: usize,
/// Base election timeout in ticks.
election_timeout: usize,
/// Current randomized timeout, within `[min_election_timeout, max_election_timeout)`.
randomized_election_timeout: usize,
/// Lower bound (inclusive) for the randomized election timeout.
min_election_timeout: usize,
/// Upper bound (exclusive) for the randomized election timeout.
max_election_timeout: usize,
/// Logger tagged with this node's `raft_id`.
pub(crate) logger: slog::Logger,
/// Election priority; sent with vote requests and compared when voting.
pub priority: u64,
/// Limit tracking for uncommitted entry data.
uncommitted_state: UncommittedState,
}
/// A Raft node: core state plus the progress tracker and outbound messages.
pub struct Raft<T: Storage> {
/// Progress and configuration of every peer.
prs: ProgressTracker,
/// Outbound messages queued by `RaftCore::send`.
pub msgs: Vec<Message>,
/// The core state.
pub r: RaftCore<T>,
}
impl<T: Storage> Deref for Raft<T> {
    type Target = RaftCore<T>;

    /// Exposes the wrapped `RaftCore`, so its fields read as `raft.field`.
    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.r
    }
}

impl<T: Storage> DerefMut for Raft<T> {
    /// Mutable access to the wrapped `RaftCore`.
    #[inline]
    fn deref_mut(&mut self) -> &mut RaftCore<T> {
        &mut self.r
    }
}
// Compile-time check: `Raft<T>` must be `Send` whenever its storage `T` is.
trait AssertSend: Send {}
impl<T: Storage + Send> AssertSend for Raft<T> {}
/// Builds a message of type `field_type` addressed to `to`.
///
/// `from` is set only when given. Otherwise it keeps its default value and
/// `RaftCore::send` fills in the local id if it is `INVALID_ID`.
fn new_message(to: u64, field_type: MessageType, from: Option<u64>) -> Message {
    let mut msg = Message::default();
    msg.set_msg_type(field_type);
    msg.to = to;
    if let Some(sender) = from {
        msg.from = sender;
    }
    msg
}
/// Maps a vote request type to its matching response type.
///
/// # Panics
///
/// Panics if `t` is neither `MsgRequestVote` nor `MsgRequestPreVote`.
pub fn vote_resp_msg_type(t: MessageType) -> MessageType {
    if t == MessageType::MsgRequestVote {
        MessageType::MsgRequestVoteResponse
    } else if t == MessageType::MsgRequestPreVote {
        MessageType::MsgRequestPreVoteResponse
    } else {
        panic!("Not a vote message: {:?}", t)
    }
}
impl<T: Storage> Raft<T> {
    /// Creates a new Raft node.
    ///
    /// Setup steps:
    /// * Configuration and hard state are restored from `store`.
    /// * The applied index is taken from `c.applied` when it is non-zero.
    /// * The node starts as a follower of its restored term with no known
    ///   leader.
    ///
    /// # Errors
    ///
    /// Returns an error if `c.validate()`, `store.initial_state()` or
    /// `confchange::restore` fails.
    ///
    /// # Panics
    ///
    /// Invokes `fatal!` if the configuration rebuilt by `post_conf_change`
    /// differs from the stored `ConfState`.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(c: &Config, store: T, logger: &Logger) -> Result<Self> {
        c.validate()?;
        let logger = logger.new(o!("raft_id" => c.id));
        let raft_state = store.initial_state()?;
        let conf_state = &raft_state.conf_state;
        let voters = &conf_state.voters;
        let learners = &conf_state.learners;
        let mut r = Raft {
            prs: ProgressTracker::with_capacity(
                voters.len(),
                learners.len(),
                c.max_inflight_msgs,
                logger.clone(),
            ),
            msgs: Default::default(),
            r: RaftCore {
                id: c.id,
                read_states: Default::default(),
                raft_log: RaftLog::new(store, logger.clone()),
                max_inflight: c.max_inflight_msgs,
                max_msg_size: c.max_size_per_msg,
                pending_request_snapshot: INVALID_INDEX,
                state: StateRole::Follower,
                promotable: false,
                check_quorum: c.check_quorum,
                pre_vote: c.pre_vote,
                read_only: ReadOnly::new(c.read_only_option),
                heartbeat_timeout: c.heartbeat_tick,
                election_timeout: c.election_tick,
                leader_id: Default::default(),
                lead_transferee: None,
                term: Default::default(),
                election_elapsed: Default::default(),
                pending_conf_index: Default::default(),
                vote: Default::default(),
                heartbeat_elapsed: Default::default(),
                randomized_election_timeout: Default::default(),
                min_election_timeout: c.min_election_tick(),
                max_election_timeout: c.max_election_tick(),
                skip_bcast_commit: c.skip_bcast_commit,
                batch_append: c.batch_append,
                logger,
                priority: c.priority,
                uncommitted_state: UncommittedState {
                    max_uncommitted_size: c.max_uncommitted_size as usize,
                    uncommitted_size: 0,
                    last_log_tail_index: 0,
                },
            },
        };
        // Rebuild the tracker from the stored configuration and verify that it
        // round-trips to the same `ConfState`.
        confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?;
        let new_cs = r.post_conf_change();
        if !raft_proto::conf_state_eq(&new_cs, conf_state) {
            fatal!(
                r.logger,
                "invalid restore: {:?} != {:?}",
                conf_state,
                new_cs
            );
        }
        // Only load a hard state that differs from the default one.
        if raft_state.hard_state != HardState::default() {
            r.load_state(&raft_state.hard_state);
        }
        if c.applied > 0 {
            r.commit_apply(c.applied);
        }
        r.become_follower(r.term, INVALID_ID);
        info!(
            r.logger,
            "newRaft";
            "term" => r.term,
            "commit" => r.raft_log.committed,
            "applied" => r.raft_log.applied,
            "last index" => r.raft_log.last_index(),
            "last term" => r.raft_log.last_term(),
            "peers" => ?r.prs.conf().voters,
        );
        Ok(r)
    }

    /// Sets the election priority attached to this node's vote requests.
    pub fn set_priority(&mut self, priority: u64) {
        self.priority = priority;
    }

    /// Like [`Raft::new`], but logs through the crate's default logger.
    ///
    /// # Errors
    ///
    /// Same as [`Raft::new`].
    #[cfg(feature = "default-logger")]
    pub fn with_default_logger(c: &Config, store: T) -> Result<Self> {
        Self::new(c, store, &crate::default_logger())
    }

    /// Returns the underlying storage.
    #[inline]
    pub fn store(&self) -> &T {
        &self.raft_log.store
    }

    /// Returns the underlying storage mutably.
    #[inline]
    pub fn mut_store(&mut self) -> &mut T {
        &mut self.raft_log.store
    }

    /// Returns the unstable (not yet persisted) snapshot, if any.
    #[inline]
    pub fn snap(&self) -> Option<&Snapshot> {
        self.raft_log.unstable.snapshot.as_ref()
    }

    /// Number of read-only requests still waiting for confirmation.
    #[inline]
    pub fn pending_read_count(&self) -> usize {
        self.read_only.pending_read_count()
    }

    /// Number of completed read states not yet taken by the application.
    #[inline]
    pub fn ready_read_count(&self) -> usize {
        self.read_states.len()
    }

    /// Returns the current leader id and role.
    pub fn soft_state(&self) -> SoftState {
        SoftState {
            leader_id: self.leader_id,
            raft_state: self.state,
        }
    }

    /// Returns the current term, vote and commit index as a `HardState`.
    pub fn hard_state(&self) -> HardState {
        let mut hs = HardState::default();
        hs.term = self.term;
        hs.vote = self.vote;
        hs.commit = self.raft_log.committed;
        hs
    }

    /// Returns `true` when this node is leader with `check_quorum` enabled.
    pub fn in_lease(&self) -> bool {
        self.state == StateRole::Leader && self.check_quorum
    }

    /// Overrides the randomized election timeout (testing hook).
    ///
    /// # Panics
    ///
    /// Panics unless `min_election_timeout <= t < max_election_timeout`.
    #[doc(hidden)]
    pub fn set_randomized_election_timeout(&mut self, t: usize) {
        assert!(self.min_election_timeout <= t && t < self.max_election_timeout);
        self.randomized_election_timeout = t;
    }

    /// Base election timeout in ticks.
    pub fn election_timeout(&self) -> usize {
        self.election_timeout
    }

    /// Heartbeat interval in ticks.
    pub fn heartbeat_timeout(&self) -> usize {
        self.heartbeat_timeout
    }

    /// Ticks since the last heartbeat round.
    pub fn heartbeat_elapsed(&self) -> usize {
        self.heartbeat_elapsed
    }

    /// Current randomized election timeout in ticks.
    pub fn randomized_election_timeout(&self) -> usize {
        self.randomized_election_timeout
    }

    /// Sets the `skip_bcast_commit` flag.
    #[inline]
    pub fn skip_bcast_commit(&mut self, skip: bool) {
        self.skip_bcast_commit = skip;
    }

    /// Enables or disables batching of entries into queued `MsgAppend`s.
    #[inline]
    pub fn set_batch_append(&mut self, batch_append: bool) {
        self.batch_append = batch_append;
    }

    /// Enables or disables group commit.
    ///
    /// When a leader disables it, the commit index is re-evaluated, and
    /// appends are broadcast if it advanced.
    pub fn enable_group_commit(&mut self, enable: bool) {
        self.mut_prs().enable_group_commit(enable);
        if StateRole::Leader == self.state && !enable && self.maybe_commit() {
            self.bcast_append();
        }
    }

    /// Returns whether group commit is enabled.
    pub fn group_commit(&self) -> bool {
        self.prs().group_commit()
    }

    /// Assigns `(peer_id, group_id)` commit groups.
    ///
    /// Peers unknown to the tracker are ignored. If this node is leader and
    /// group commit is on, the commit index is re-evaluated, and appends are
    /// broadcast if it advanced.
    ///
    /// # Panics
    ///
    /// Panics if any `group_id` is 0.
    pub fn assign_commit_groups(&mut self, ids: &[(u64, u64)]) {
        let prs = self.mut_prs();
        for &(peer_id, group_id) in ids {
            assert!(group_id > 0);
            if let Some(pr) = prs.get_mut(peer_id) {
                pr.commit_group_id = group_id;
            }
        }
        if StateRole::Leader == self.state && self.group_commit() && self.maybe_commit() {
            self.bcast_append();
        }
    }

    /// Resets every peer's commit group to 0.
    pub fn clear_commit_group(&mut self) {
        for (_, pr) in self.mut_prs().iter_mut() {
            pr.commit_group_id = 0;
        }
    }

    /// Checks whether the commit index is consistent with group commit.
    ///
    /// Returns `None` unless this node is leader and its applied entry
    /// belongs to the current term. Otherwise returns
    /// `Some(use_group_commit && maximal_committed_index == committed)`.
    pub fn check_group_commit_consistent(&mut self) -> Option<bool> {
        if self.state != StateRole::Leader {
            return None;
        }
        if !self.apply_to_current_term() {
            return None;
        }
        let (index, use_group_commit) = self.mut_prs().maximal_committed_index();
        debug!(
            self.logger,
            "check group commit consistent";
            "index" => index,
            "use_group_commit" => use_group_commit,
            "committed" => self.raft_log.committed
        );
        Some(use_group_commit && index == self.raft_log.committed)
    }

    /// Returns `true` if the entry at the commit index has the current term.
    pub fn commit_to_current_term(&self) -> bool {
        self.raft_log
            .term(self.raft_log.committed)
            .map_or(false, |t| t == self.term)
    }

    /// Returns `true` if the entry at the applied index has the current term.
    pub fn apply_to_current_term(&self) -> bool {
        self.raft_log
            .term(self.raft_log.applied)
            .map_or(false, |t| t == self.term)
    }
}
impl<T: Storage> RaftCore<T> {
    /// Stamps `m` with this node's identity and term, then queues it on `msgs`.
    ///
    /// * `from` is set to `self.id` when it is `INVALID_ID`.
    /// * Vote requests and responses must already carry a term.
    /// * Every other message must not carry a term. It gets `self.term`,
    ///   unless it is a `MsgPropose` or `MsgReadIndex`.
    /// * Vote requests additionally carry this node's `priority`.
    ///
    /// # Panics
    ///
    /// Invokes `fatal!` when the term precondition above is violated.
    fn send(&mut self, mut m: Message, msgs: &mut Vec<Message>) {
        debug!(
            self.logger,
            "Sending from {from} to {to}",
            from = self.id,
            to = m.to;
            "msg" => ?m,
        );
        if m.from == INVALID_ID {
            m.from = self.id;
        }
        // The message type is not changed below, so read it once.
        let msg_type = m.get_msg_type();
        let is_vote_request = matches!(
            msg_type,
            MessageType::MsgRequestVote | MessageType::MsgRequestPreVote
        );
        let is_vote_response = matches!(
            msg_type,
            MessageType::MsgRequestVoteResponse | MessageType::MsgRequestPreVoteResponse
        );
        if is_vote_request || is_vote_response {
            if m.term == 0 {
                fatal!(
                    self.logger,
                    "term should be set when sending {:?}",
                    msg_type
                );
            }
        } else {
            if m.term != 0 {
                fatal!(
                    self.logger,
                    "term should not be set when sending {:?} (was {})",
                    msg_type,
                    m.term
                );
            }
            // NOTE(review): proposals and read-index requests are left without
            // a term, presumably because they are forwarded to the leader and
            // handled like local messages (term 0 in `Raft::step`).
            if msg_type != MessageType::MsgPropose && msg_type != MessageType::MsgReadIndex {
                m.term = self.term;
            }
        }
        if is_vote_request {
            m.priority = self.priority;
        }
        msgs.push(m);
    }

    /// Turns `m` into a `MsgSnapshot` and moves `pr` into the snapshot state.
    ///
    /// Returns `false`, sending nothing, when the peer has not been recently
    /// active or the storage reports the snapshot as temporarily unavailable.
    ///
    /// # Panics
    ///
    /// Invokes `fatal!` on any other storage error, or if the snapshot index
    /// is 0.
    fn prepare_send_snapshot(&mut self, m: &mut Message, pr: &mut Progress, to: u64) -> bool {
        if !pr.recent_active {
            debug!(
                self.logger,
                "ignore sending snapshot to {} since it is not recently active",
                to;
            );
            return false;
        }
        m.set_msg_type(MessageType::MsgSnapshot);
        let snapshot = match self.raft_log.snapshot(pr.pending_request_snapshot) {
            Ok(snapshot) => snapshot,
            Err(e) => {
                if e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
                    debug!(
                        self.logger,
                        "failed to send snapshot to {} because snapshot is temporarily \
                         unavailable",
                        to;
                    );
                    return false;
                }
                fatal!(self.logger, "unexpected error: {:?}", e);
            }
        };
        if snapshot.get_metadata().index == 0 {
            fatal!(self.logger, "need non-empty snapshot");
        }
        let (sindex, sterm) = (snapshot.get_metadata().index, snapshot.get_metadata().term);
        m.set_snapshot(snapshot);
        debug!(
            self.logger,
            "[firstindex: {first_index}, commit: {committed}] sent snapshot[index: {snapshot_index}, term: {snapshot_term}] to {to}",
            first_index = self.raft_log.first_index(),
            committed = self.raft_log.committed,
            snapshot_index = sindex,
            snapshot_term = sterm,
            to = to;
            "progress" => ?pr,
        );
        pr.become_snapshot(sindex);
        debug!(
            self.logger,
            "paused sending replication messages to {}",
            to;
            "progress" => ?pr,
        );
        true
    }

    /// Fills `m` as a `MsgAppend` of `ents`.
    ///
    /// The message is anchored at `pr.next_idx - 1` with log term `term`. If
    /// any entries are sent, `pr` is advanced to the last one.
    fn prepare_send_entries(
        &mut self,
        m: &mut Message,
        pr: &mut Progress,
        term: u64,
        ents: Vec<Entry>,
    ) {
        m.set_msg_type(MessageType::MsgAppend);
        m.index = pr.next_idx - 1;
        m.log_term = term;
        m.set_entries(ents.into());
        m.commit = self.raft_log.committed;
        if let Some(last) = m.entries.last().map(|e| e.index) {
            pr.update_state(last);
        }
    }

    /// Tries to merge `ents` into an already queued `MsgAppend` for `to`.
    ///
    /// Only the first queued `MsgAppend` for `to` is considered. Returns
    /// `true` if that message was updated with the current commit index (and
    /// with `ents`, if any). Returns `false` if no such message exists, or if
    /// `ents` cannot be appended to it.
    fn try_batching(
        &mut self,
        to: u64,
        msgs: &mut [Message],
        pr: &mut Progress,
        ents: &mut Vec<Entry>,
    ) -> bool {
        let msg = match msgs
            .iter_mut()
            .find(|msg| msg.get_msg_type() == MessageType::MsgAppend && msg.to == to)
        {
            Some(msg) => msg,
            None => return false,
        };
        if !ents.is_empty() {
            // Presumably checks that `ents` directly follow the entries in `msg`.
            if !util::is_continuous_ents(msg, ents) {
                return false;
            }
            let mut batched_entries: Vec<_> = msg.take_entries().into();
            batched_entries.append(ents);
            msg.set_entries(batched_entries.into());
            let last_idx = msg.entries.last().unwrap().index;
            pr.update_state(last_idx);
        }
        msg.commit = self.raft_log.committed;
        true
    }

    /// Sends an append (or snapshot) to `to`, even if it carries no entries.
    fn send_append(&mut self, to: u64, pr: &mut Progress, msgs: &mut Vec<Message>) {
        self.maybe_send_append(to, pr, true, msgs);
    }

    /// Sends the next append (or a snapshot) to `to`, if possible.
    ///
    /// Behaviour by case:
    /// * Paused progress: nothing is sent.
    /// * Pending snapshot request: a snapshot is sent.
    /// * Otherwise: entries from `pr.next_idx` are sent. If the term or
    ///   entries cannot be read, a snapshot is sent instead.
    /// * `allow_empty` is false and no entries are available: nothing is sent.
    ///
    /// Returns `true` if a message was queued or merged into a queued one.
    fn maybe_send_append(
        &mut self,
        to: u64,
        pr: &mut Progress,
        allow_empty: bool,
        msgs: &mut Vec<Message>,
    ) -> bool {
        if pr.is_paused() {
            trace!(
                self.logger,
                "Skipping sending to {to}, it's paused",
                to = to;
                "progress" => ?pr,
            );
            return false;
        }
        let mut m = Message::default();
        m.to = to;
        if pr.pending_request_snapshot != INVALID_INDEX {
            // Checked first so no entries are loaded needlessly.
            if !self.prepare_send_snapshot(&mut m, pr, to) {
                return false;
            }
        } else {
            let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
            if !allow_empty && ents.as_ref().ok().map_or(true, |e| e.is_empty()) {
                return false;
            }
            let term = self.raft_log.term(pr.next_idx - 1);
            match (term, ents) {
                (Ok(term), Ok(mut ents)) => {
                    if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
                        return true;
                    }
                    self.prepare_send_entries(&mut m, pr, term, ents)
                }
                _ => {
                    // The needed log is unavailable: fall back to a snapshot.
                    if !self.prepare_send_snapshot(&mut m, pr, to) {
                        return false;
                    }
                }
            }
        }
        self.send(m, msgs);
        true
    }

    /// Sends a heartbeat to `to`, optionally carrying a read-index context.
    fn send_heartbeat(
        &mut self,
        to: u64,
        pr: &Progress,
        ctx: Option<Vec<u8>>,
        msgs: &mut Vec<Message>,
    ) {
        let mut m = Message::default();
        m.to = to;
        m.set_msg_type(MessageType::MsgHeartbeat);
        // Never advertise a commit index beyond what the peer is known to match.
        let commit = cmp::min(pr.matched, self.raft_log.committed);
        m.commit = commit;
        if let Some(context) = ctx {
            m.context = context.into();
        }
        self.send(m, msgs);
    }
}
impl<T: Storage> Raft<T> {
/// Sends an append (or snapshot) to peer `to`, even if it carries no entries.
///
/// # Panics
///
/// Panics if `to` has no progress in the tracker.
pub fn send_append(&mut self, to: u64) {
    let progress = self.prs.get_mut(to).unwrap();
    self.r.send_append(to, progress, &mut self.msgs);
}
/// Sends an append (or snapshot) to every peer except this node.
pub fn bcast_append(&mut self) {
    let my_id = self.id;
    let (core, outbox) = (&mut self.r, &mut self.msgs);
    for (&peer, progress) in self.prs.iter_mut() {
        if peer != my_id {
            core.send_append(peer, progress, outbox);
        }
    }
}
/// Broadcasts a heartbeat when this node is leader; otherwise does nothing.
pub fn ping(&mut self) {
    if self.state != StateRole::Leader {
        return;
    }
    self.bcast_heartbeat();
}
/// Broadcasts a heartbeat carrying the context of the most recent pending
/// read-only request, if there is one.
pub fn bcast_heartbeat(&mut self) {
    let pending_ctx = self.read_only.last_pending_request_ctx();
    self.bcast_heartbeat_with_ctx(pending_ctx);
}
/// Sends a heartbeat to every peer except this node.
///
/// `ctx` is attached to each heartbeat when present, cloned once per peer.
// `ctx` is taken by value although only clones are used, hence the allow.
// A plain tool-lint attribute replaces the deprecated
// `cfg_attr(feature = "cargo-clippy", ...)` form.
#[allow(clippy::needless_pass_by_value)]
fn bcast_heartbeat_with_ctx(&mut self, ctx: Option<Vec<u8>>) {
    let self_id = self.id;
    let core = &mut self.r;
    let msgs = &mut self.msgs;
    self.prs
        .iter_mut()
        .filter(|&(id, _)| *id != self_id)
        .for_each(|(id, pr)| core.send_heartbeat(*id, pr, ctx.clone(), msgs));
}
/// Tries to advance the commit index to the tracker's maximal committed index.
///
/// On success, the new commit index is also recorded on this node's own
/// progress. Returns what `RaftLog::maybe_commit` reports (presumably
/// whether the commit index advanced).
///
/// # Panics
///
/// Panics if this node has no progress entry when a commit happens.
pub fn maybe_commit(&mut self) -> bool {
    let candidate_index = self.mut_prs().maximal_committed_index().0;
    let current_term = self.r.term;
    if !self.r.raft_log.maybe_commit(candidate_index, current_term) {
        return false;
    }
    let self_id = self.id;
    let committed = self.raft_log.committed;
    let own_progress = self.mut_prs().get_mut(self_id).unwrap();
    own_progress.update_committed(committed);
    true
}
/// Records that entries up to `applied` have been applied.
///
/// If this application crosses `pending_conf_index` while auto-leave is
/// configured and this node is leader, an empty `EntryConfChangeV2` is
/// appended to leave the joint configuration.
///
/// # Panics
///
/// Panics if that empty entry is refused by `append_entry`.
pub fn commit_apply(&mut self, applied: u64) {
    let previously_applied = self.raft_log.applied;
    #[allow(deprecated)]
    self.raft_log.applied_to(applied);
    let should_leave_joint = self.prs.conf().auto_leave
        && previously_applied <= self.pending_conf_index
        && applied >= self.pending_conf_index
        && self.state == StateRole::Leader;
    if !should_leave_joint {
        return;
    }
    let mut leave_joint = Entry::default();
    leave_joint.set_entry_type(EntryType::EntryConfChangeV2);
    let appended = self.append_entry(&mut [leave_joint]);
    if !appended {
        panic!("appending an empty EntryConfChangeV2 should never be dropped")
    }
    self.pending_conf_index = self.raft_log.last_index();
    info!(self.logger, "initiating automatic transition out of joint configuration"; "config" => ?self.prs.conf());
}
/// Resets volatile state for `term`.
///
/// What is reset:
/// * The vote is cleared only when the term actually changes.
/// * Leader, timers, leader transfer, votes, pending conf index, read-only
///   state and the pending snapshot request are all cleared.
/// * Every peer's progress restarts at `last_index + 1`.
/// * This node's own progress keeps its persisted and committed indexes.
pub fn reset(&mut self, term: u64) {
    if self.term != term {
        self.term = term;
        self.vote = INVALID_ID;
    }
    self.leader_id = INVALID_ID;
    self.reset_randomized_election_timeout();
    self.election_elapsed = 0;
    self.heartbeat_elapsed = 0;
    self.abort_leader_transfer();
    self.prs.reset_votes();
    self.pending_conf_index = 0;
    self.read_only = ReadOnly::new(self.read_only.option);
    self.pending_request_snapshot = INVALID_INDEX;
    let last_index = self.raft_log.last_index();
    let committed = self.raft_log.committed;
    let persisted = self.raft_log.persisted;
    let self_id = self.id;
    // `pr` is already `&mut Progress`; the binding itself needs no `mut`.
    for (&id, pr) in self.mut_prs().iter_mut() {
        pr.reset(last_index + 1);
        if id == self_id {
            pr.matched = persisted;
            pr.committed_index = committed;
        }
    }
}
/// Appends `es` to the log.
///
/// Each entry is stamped with the current term and consecutive indexes
/// following the last log index. Returns `false`, leaving the log untouched,
/// if the uncommitted-size limit rejects the entries. This node's own
/// progress is not updated here.
#[must_use]
pub fn append_entry(&mut self, es: &mut [Entry]) -> bool {
    if !self.maybe_increase_uncommitted_size(es) {
        return false;
    }
    let last = self.raft_log.last_index();
    let term = self.term;
    for (entry, offset) in es.iter_mut().zip(1u64..) {
        entry.term = term;
        entry.index = last + offset;
    }
    self.raft_log.append(es);
    true
}
/// Notifies the log that entries up to `index` (at `term`) were persisted.
///
/// On a leader, a successful update also advances this node's own progress.
/// If that lets the commit index move, appends are broadcast (subject to
/// `should_bcast_commit`).
pub fn on_persist_entries(&mut self, index: u64, term: u64) {
    let updated = self.raft_log.maybe_persist(index, term);
    if !updated || self.state != StateRole::Leader {
        return;
    }
    if term != self.term {
        error!(
            self.logger,
            "leader's persisted index changed but the term {} is not the same as {}",
            term,
            self.term
        );
    }
    let self_id = self.id;
    let progressed = self.mut_prs().get_mut(self_id).unwrap().maybe_update(index);
    if progressed && self.maybe_commit() && self.should_bcast_commit() {
        self.bcast_append();
    }
}
/// Notifies the log that the snapshot at `index` was persisted.
pub fn on_persist_snap(&mut self, index: u64) {
    let log = &mut self.raft_log;
    log.maybe_persist_snap(index);
}
/// Advances the logical clock by one tick.
///
/// Leaders run the heartbeat timer; every other role runs the election
/// timer. Returns whether the tick produced work.
pub fn tick(&mut self) -> bool {
    match self.state {
        StateRole::Leader => self.tick_heartbeat(),
        StateRole::Follower | StateRole::Candidate | StateRole::PreCandidate => {
            self.tick_election()
        }
    }
}
/// Election-timer tick for non-leaders.
///
/// Once the election timeout has passed and the node is promotable, the timer
/// is reset and a local `MsgHup` is stepped. Returns `true` in that case.
pub fn tick_election(&mut self) -> bool {
    self.election_elapsed += 1;
    let timed_out = self.pass_election_timeout() && self.promotable;
    if !timed_out {
        return false;
    }
    self.election_elapsed = 0;
    let hup = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id));
    let _ = self.step(hup);
    true
}
/// Leader tick: advances both the heartbeat and the election counters.
///
/// Every `election_timeout` ticks, the leader steps a `MsgCheckQuorum` (when
/// `check_quorum` is on) and aborts any unfinished leader transfer. Every
/// `heartbeat_timeout` ticks, it steps a `MsgBeat`. Returns `true` if either
/// message was stepped.
fn tick_heartbeat(&mut self) -> bool {
self.heartbeat_elapsed += 1;
self.election_elapsed += 1;
let mut has_ready = false;
if self.election_elapsed >= self.election_timeout {
self.election_elapsed = 0;
if self.check_quorum {
let m = new_message(INVALID_ID, MessageType::MsgCheckQuorum, Some(self.id));
has_ready = true;
let _ = self.step(m);
}
// A transfer not finished within one election timeout is abandoned.
if self.state == StateRole::Leader && self.lead_transferee.is_some() {
self.abort_leader_transfer()
}
}
// The quorum check above may have stepped this node down to follower.
if self.state != StateRole::Leader {
return has_ready;
}
if self.heartbeat_elapsed >= self.heartbeat_timeout {
self.heartbeat_elapsed = 0;
has_ready = true;
let m = new_message(INVALID_ID, MessageType::MsgBeat, Some(self.id));
let _ = self.step(m);
}
has_ready
}
/// Transitions to follower at `term` with the given leader (or `INVALID_ID`).
///
/// The pending snapshot request survives the transition, even though
/// `reset` clears it.
pub fn become_follower(&mut self, term: u64, leader_id: u64) {
    let saved_snapshot_request = self.pending_request_snapshot;
    self.reset(term);
    self.state = StateRole::Follower;
    self.leader_id = leader_id;
    self.pending_request_snapshot = saved_snapshot_request;
    info!(
        self.logger,
        "became follower at term {term}",
        term = self.term;
    );
}
/// Transitions to candidate: increments the term and votes for itself.
///
/// # Panics
///
/// Panics if called on a leader.
pub fn become_candidate(&mut self) {
    assert_ne!(
        self.state,
        StateRole::Leader,
        "invalid transition [leader -> candidate]"
    );
    let next_term = self.term + 1;
    self.reset(next_term);
    let me = self.id;
    self.vote = me;
    self.state = StateRole::Candidate;
    info!(
        self.logger,
        "became candidate at term {term}",
        term = self.term;
    );
}
/// Transitions to pre-candidate.
///
/// Only the role, the vote tally and the known leader change; the term and
/// `vote` are left untouched.
///
/// # Panics
///
/// Panics if called on a leader.
pub fn become_pre_candidate(&mut self) {
    assert_ne!(
        self.state,
        StateRole::Leader,
        "invalid transition [leader -> pre-candidate]"
    );
    self.state = StateRole::PreCandidate;
    self.leader_id = INVALID_ID;
    self.prs.reset_votes();
    info!(
        self.logger,
        "became pre-candidate at term {term}",
        term = self.term;
    );
}
/// Transitions to leader in the current term.
///
/// Resets state, restarts uncommitted-size accounting from the current log
/// tail, and puts this node's own progress into replicate mode. It also sets
/// `pending_conf_index` to the last index and appends an empty entry for the
/// new term.
///
/// # Panics
///
/// Panics if called on a follower, if the last index differs from the
/// persisted index, or if the empty entry is refused.
pub fn become_leader(&mut self) {
trace!(self.logger, "ENTER become_leader");
assert_ne!(
self.state,
StateRole::Follower,
"invalid transition [follower -> leader]"
);
let term = self.term;
self.reset(term);
self.leader_id = self.id;
self.state = StateRole::Leader;
let last_index = self.raft_log.last_index();
// The whole log is expected to be persisted by the time a node becomes leader.
assert_eq!(last_index, self.raft_log.persisted);
// Entries up to the current tail are not counted against this leader's
// uncommitted-size limit.
self.uncommitted_state.uncommitted_size = 0;
self.uncommitted_state.last_log_tail_index = last_index;
let id = self.id;
self.mut_prs().get_mut(id).unwrap().become_replicate();
// Conservatively treat everything up to the last index as possibly holding
// an unapplied conf change.
self.pending_conf_index = last_index;
// An entry without data always passes the uncommitted-size check.
if !self.append_entry(&mut [Entry::default()]) {
panic!("appending an empty entry should never be dropped")
}
info!(
self.logger,
"became leader at term {term}",
term = self.term;
);
trace!(self.logger, "EXIT become_leader");
}
/// Counts the configuration-change entries (v1 or v2) in `ents`.
fn num_pending_conf(&self, ents: &[Entry]) -> usize {
    ents.iter()
        .filter(|entry| {
            matches!(
                entry.get_entry_type(),
                EntryType::EntryConfChange | EntryType::EntryConfChangeV2
            )
        })
        .count()
}
/// Starts a campaign of the given type: `CAMPAIGN_PRE_ELECTION`,
/// `CAMPAIGN_ELECTION` or `CAMPAIGN_TRANSFER`.
///
/// First records its own vote via `poll`; if that alone wins, it returns.
/// Otherwise it sends a vote (or pre-vote) request to every other voter and
/// logs the recipients in batches of at most 7 ids.
#[doc(hidden)]
pub fn campaign(&mut self, campaign_type: &'static [u8]) {
// Pre-vote requests advertise the next term without changing `self.term`.
let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
self.become_pre_candidate();
(MessageType::MsgRequestPreVote, self.term + 1)
} else {
self.become_candidate();
(MessageType::MsgRequestVote, self.term)
};
let self_id = self.id;
if VoteResult::Won == self.poll(self_id, vote_msg, true) {
return;
}
let (commit, commit_term) = self.raft_log.commit_info();
// Fixed-size buffer of recipients, flushed to the log when full.
let mut voters = [0; 7];
let mut voter_cnt = 0;
for id in self.prs.conf().voters().ids().iter() {
if id == self_id {
continue;
}
if voter_cnt == voters.len() {
self.log_broadcast_vote(vote_msg, &voters);
voter_cnt = 0;
}
voters[voter_cnt] = id;
voter_cnt += 1;
let mut m = new_message(id, vote_msg, None);
m.term = term;
m.index = self.raft_log.last_index();
m.log_term = self.raft_log.last_term();
m.commit = commit;
m.commit_term = commit_term;
// Lets receivers bypass their leader lease (see `step`).
if campaign_type == CAMPAIGN_TRANSFER {
m.context = campaign_type.into();
}
self.r.send(m, &mut self.msgs);
}
if voter_cnt > 0 {
self.log_broadcast_vote(vote_msg, &voters[..voter_cnt]);
}
}
/// Logs that a vote request of type `vote_type` was sent to `targets`.
#[inline]
fn log_broadcast_vote(&self, vote_type: MessageType, targets: &[u64]) {
    info!(
        self.logger,
        "broadcasting vote request";
        "type" => ?vote_type,
        "term" => self.term,
        "log_term" => self.raft_log.last_term(),
        "log_index" => self.raft_log.last_index(),
        "to" => ?targets,
    );
}
/// Feeds message `m` into the state machine.
///
/// First reconciles `m.term` with the local term:
/// * `m.term == 0` marks a local message; no term handling is done.
/// * For a higher term, this node usually becomes a follower of that term.
///   There are two exceptions:
///   - Vote requests are ignored while a leader lease is held, unless they
///     carry the `CAMPAIGN_TRANSFER` context.
///   - Pre-vote requests and granted pre-vote responses never change the
///     term.
/// * For a lower term, the reply depends on the message:
///   - Heartbeats and appends get an empty `MsgAppendResponse`, when
///     `check_quorum` or `pre_vote` is on.
///   - Pre-vote requests are rejected.
///   - Anything else is logged and dropped.
///
/// Then `MsgHup` and vote requests are handled here. Everything else goes
/// to the role-specific `step_*` handler.
///
/// # Errors
///
/// Propagates errors from `step_candidate`, `step_follower` or `step_leader`.
pub fn step(&mut self, m: Message) -> Result<()> {
if m.term == 0 {
// Local message: nothing to reconcile.
} else if m.term > self.term {
if m.get_msg_type() == MessageType::MsgRequestVote
|| m.get_msg_type() == MessageType::MsgRequestPreVote
{
let force = m.context == CAMPAIGN_TRANSFER;
// While we heard from a leader within the election timeout, do not let
// a vote request bump our term (unless it is a forced transfer).
let in_lease = self.check_quorum
&& self.leader_id != INVALID_ID
&& self.election_elapsed < self.election_timeout;
if !force && in_lease {
info!(
self.logger,
"[logterm: {log_term}, index: {log_index}, vote: {vote}] ignored vote from \
{from} [logterm: {msg_term}, index: {msg_index}]: lease is not expired",
log_term = self.raft_log.last_term(),
log_index = self.raft_log.last_index(),
vote = self.vote,
from = m.from,
msg_term = m.log_term,
msg_index = m.index;
"term" => self.term,
"remaining ticks" => self.election_timeout - self.election_elapsed,
"msg type" => ?m.get_msg_type(),
);
return Ok(());
}
}
if m.get_msg_type() == MessageType::MsgRequestPreVote
|| (m.get_msg_type() == MessageType::MsgRequestPreVoteResponse && !m.reject)
{
// Pre-vote requests and granted pre-vote responses carry a
// speculative term and must not change ours.
} else {
info!(
self.logger,
"received a message with higher term from {from}",
from = m.from;
"term" => self.term,
"message_term" => m.term,
"msg type" => ?m.get_msg_type(),
);
// Only these message types identify the sender as the new leader.
if m.get_msg_type() == MessageType::MsgAppend
|| m.get_msg_type() == MessageType::MsgHeartbeat
|| m.get_msg_type() == MessageType::MsgSnapshot
{
self.become_follower(m.term, m.from);
} else {
self.become_follower(m.term, INVALID_ID);
}
}
} else if m.term < self.term {
if (self.check_quorum || self.pre_vote)
&& (m.get_msg_type() == MessageType::MsgHeartbeat
|| m.get_msg_type() == MessageType::MsgAppend)
{
// Reply (with our higher term) so a stale leader learns about it.
let to_send = new_message(m.from, MessageType::MsgAppendResponse, None);
self.r.send(to_send, &mut self.msgs);
} else if m.get_msg_type() == MessageType::MsgRequestPreVote {
// Explicitly reject rather than silently dropping the pre-vote.
info!(
self.logger,
"{} [log_term: {}, index: {}, vote: {}] rejected {:?} from {} [log_term: {}, index: {}] at term {}",
self.id,
self.raft_log.last_term(),
self.raft_log.last_index(),
self.vote,
m.get_msg_type(),
m.from,
m.log_term,
m.index,
self.term,
);
let mut to_send = new_message(m.from, MessageType::MsgRequestPreVoteResponse, None);
to_send.term = self.term;
to_send.reject = true;
self.r.send(to_send, &mut self.msgs);
} else {
info!(
self.logger,
"ignored a message with lower term from {from}",
from = m.from;
"term" => self.term,
"msg type" => ?m.get_msg_type(),
"msg term" => m.term
);
}
return Ok(());
}
#[cfg(feature = "failpoints")]
fail_point!("before_step");
match m.get_msg_type() {
MessageType::MsgHup => self.hup(false),
MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => {
// We may vote if: this repeats our existing vote, or we have neither
// voted nor know a leader, or it is a pre-vote for a future term.
let can_vote = (self.vote == m.from) ||
(self.vote == INVALID_ID && self.leader_id == INVALID_ID) ||
(m.get_msg_type() == MessageType::MsgRequestPreVote && m.term > self.term);
// ...and the candidate's log is up to date, and it either has a longer
// log or at least our priority.
if can_vote
&& self.raft_log.is_up_to_date(m.index, m.log_term)
&& (m.index > self.raft_log.last_index() || self.priority <= m.priority)
{
self.log_vote_approve(&m);
let mut to_send =
new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
to_send.reject = false;
// Reply with the request's term, not ours: for pre-votes our term was
// not bumped, and a lower-term reply would be dropped as stale.
to_send.term = m.term;
self.r.send(to_send, &mut self.msgs);
// Only real votes are recorded; pre-votes leave the state untouched.
if m.get_msg_type() == MessageType::MsgRequestVote {
self.election_elapsed = 0;
self.vote = m.from;
}
} else {
self.log_vote_reject(&m);
let mut to_send =
new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
to_send.reject = true;
to_send.term = self.term;
let (commit, commit_term) = self.raft_log.commit_info();
to_send.commit = commit;
to_send.commit_term = commit_term;
self.r.send(to_send, &mut self.msgs);
// Presumably uses the candidate's commit info to advance our commit index.
self.maybe_commit_by_vote(&m);
}
}
_ => match self.state {
StateRole::PreCandidate | StateRole::Candidate => self.step_candidate(m)?,
StateRole::Follower => self.step_follower(m)?,
StateRole::Leader => self.step_leader(m)?,
},
}
Ok(())
}
/// Starts an election in response to `MsgHup`.
///
/// A `CAMPAIGN_TRANSFER` campaign is used when `transfer_leader` is set.
/// Nothing happens if this node is already leader, or if a configuration
/// change between the first unstable/unapplied index and the commit index
/// still has to be applied.
fn hup(&mut self, transfer_leader: bool) {
    if self.state == StateRole::Leader {
        debug!(
            self.logger,
            "ignoring MsgHup because already leader";
        );
        return;
    }
    // Scan from the first unstable index (if any) or just past the applied
    // index, up to and including the commit index.
    let low = self
        .raft_log
        .unstable
        .maybe_first_index()
        .unwrap_or_else(|| self.raft_log.applied + 1);
    let high = self.raft_log.committed + 1;
    let unapplied = self.raft_log.slice(low, high, None).unwrap_or_else(|e| {
        fatal!(
            self.logger,
            "unexpected error getting unapplied entries [{}, {}): {:?}",
            low,
            high,
            e
        );
    });
    let pending = self.num_pending_conf(&unapplied);
    if pending > 0 {
        warn!(
            self.logger,
            "cannot campaign at term {term} since there are still {pending_changes} pending \
             configuration changes to apply",
            term = self.term,
            pending_changes = pending;
        );
        return;
    }
    info!(
        self.logger,
        "starting a new election";
        "term" => self.term,
    );
    let campaign_type = if transfer_leader {
        CAMPAIGN_TRANSFER
    } else if self.pre_vote {
        CAMPAIGN_PRE_ELECTION
    } else {
        CAMPAIGN_ELECTION
    };
    self.campaign(campaign_type);
}
/// Logs that this node granted its vote to `msg.from`.
fn log_vote_approve(&self, msg: &Message) {
    info!(
        self.logger,
        "[logterm: {log_term}, index: {log_index}, vote: {vote}] cast vote for {from} [logterm: {msg_term}, index: {msg_index}] \
         at term {term}",
        log_term = self.raft_log.last_term(),
        log_index = self.raft_log.last_index(),
        vote = self.vote,
        from = msg.from,
        msg_term = msg.log_term,
        msg_index = msg.index,
        term = self.term;
        "msg type" => ?msg.get_msg_type(),
    );
}
/// Logs that this node rejected the vote request from `msg.from`.
fn log_vote_reject(&self, msg: &Message) {
    info!(
        self.logger,
        "[logterm: {log_term}, index: {log_index}, vote: {vote}] rejected vote from {from} [logterm: {msg_term}, index: \
         {msg_index}] at term {term}",
        log_term = self.raft_log.last_term(),
        log_index = self.raft_log.last_index(),
        vote = self.vote,
        from = msg.from,
        msg_term = msg.log_term,
        msg_index = msg.index,
        term = self.term;
        "msg type" => ?msg.get_msg_type(),
    );
}
/// Leader handling of a `MsgAppendResponse` from `m.from`.
///
/// On rejection, the peer's progress is decreased and a new append is sent.
///
/// On acceptance, the progress is advanced and the state machine is updated
/// (probe -> replicate, aborting snapshots, freeing in-flight slots). The
/// commit index is re-evaluated, as many non-empty appends as allowed are
/// sent, and `MsgTimeoutNow` is sent if this peer is a fully caught-up
/// leader-transfer target.
fn handle_append_response(&mut self, m: &Message) {
let mut next_probe_index: u64 = m.reject_hint;
// With the follower's term at `reject_hint`, look for a better probe index
// in our own log.
if m.reject && m.log_term > 0 {
next_probe_index = self
.raft_log
.find_conflict_by_term(m.reject_hint, m.log_term)
.0;
}
let pr = match self.prs.get_mut(m.from) {
Some(pr) => pr,
None => {
debug!(
self.logger,
"no progress available for {}",
m.from;
);
return;
}
};
// Any response shows the peer is alive and reports its commit index.
pr.recent_active = true;
pr.update_committed(m.commit);
if m.reject {
debug!(
self.r.logger,
"received msgAppend rejection";
"reject_hint_index" => m.reject_hint,
"reject_hint_term" => m.log_term,
"from" => m.from,
"index" => m.index,
);
if pr.maybe_decr_to(m.index, next_probe_index, m.request_snapshot) {
debug!(
self.r.logger,
"decreased progress of {}",
m.from;
"progress" => ?pr,
);
if pr.state == ProgressState::Replicate {
pr.become_probe();
}
self.send_append(m.from);
}
return;
}
let old_paused = pr.is_paused();
if !pr.maybe_update(m.index) {
return;
}
match pr.state {
ProgressState::Probe => pr.become_replicate(),
ProgressState::Snapshot => {
if pr.maybe_snapshot_abort() {
debug!(
self.r.logger,
"snapshot aborted, resumed sending replication messages to {from}",
from = m.from;
"progress" => ?pr,
);
pr.become_probe();
}
}
ProgressState::Replicate => pr.ins.free_to(m.get_index()),
}
if self.maybe_commit() {
if self.should_bcast_commit() {
self.bcast_append()
}
} else if old_paused {
// The peer was paused before this update; resume sending now.
self.send_append(m.from)
}
// Re-borrow the progress (the calls above needed `&mut self`) and send as
// many non-empty appends as flow control allows.
let pr = self.prs.get_mut(m.from).unwrap();
while self.r.maybe_send_append(m.from, pr, false, &mut self.msgs) {}
// A leader transfer to this peer can complete once its log is up to date.
if Some(m.from) == self.r.lead_transferee {
let last_index = self.r.raft_log.last_index();
if pr.matched == last_index {
info!(
self.logger,
"sent MsgTimeoutNow to {from} after received MsgAppResp",
from = m.from;
);
self.send_timeout_now(m.from);
}
}
}
/// Leader handling of a `MsgHeartbeatResponse` from `m.from`.
///
/// Steps:
/// * Refreshes the peer's liveness and commit index, and resumes it.
/// * Frees one in-flight slot if a replicating peer's window is full.
/// * Sends an append if the peer lags or has requested a snapshot.
/// * With `ReadOnlyOption::Safe`, counts the heartbeat as an ack for the
///   read request identified by `m.context`. Once a quorum acks, the
///   corresponding read-index responses are sent.
fn handle_heartbeat_response(&mut self, m: &Message) {
let pr = match self.prs.get_mut(m.from) {
Some(pr) => pr,
None => {
debug!(
self.logger,
"no progress available for {}",
m.from;
);
return;
}
};
pr.update_committed(m.commit);
pr.recent_active = true;
pr.resume();
// Free one slot so a full in-flight window can make progress.
if pr.state == ProgressState::Replicate && pr.ins.full() {
pr.ins.free_first_one();
}
if pr.matched < self.r.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX
{
self.r.send_append(m.from, pr, &mut self.msgs);
}
// Only safe read-only mode uses heartbeat contexts as read acks.
if self.read_only.option != ReadOnlyOption::Safe || m.context.is_empty() {
return;
}
match self.r.read_only.recv_ack(m.from, &m.context) {
Some(acks) if self.prs.has_quorum(acks) => {}
_ => return,
}
for rs in self.r.read_only.advance(&m.context, &self.r.logger) {
if let Some(m) = self.handle_ready_read_index(rs.req, rs.index) {
self.r.send(m, &mut self.msgs);
}
}
}
/// Leader handling of a `MsgTransferLeader` whose target is `m.from`.
///
/// The request is ignored when:
/// * the target has no progress;
/// * the target is a learner;
/// * a transfer to the same target is already running;
/// * the target is this node.
///
/// Otherwise any other running transfer is aborted and the new one starts.
/// If the target's log is already up to date, `MsgTimeoutNow` is sent
/// immediately; if not, an append is sent to catch it up.
fn handle_transfer_leader(&mut self, m: &Message) {
if self.prs().get(m.from).is_none() {
debug!(
self.logger,
"no progress available for {}",
m.from;
);
return;
}
let from = m.from;
if self.prs.conf().learners.contains(&from) {
debug!(
self.logger,
"ignored transferring leadership";
"to" => from,
);
return;
}
let lead_transferee = from;
if let Some(last_lead_transferee) = self.lead_transferee {
if last_lead_transferee == lead_transferee {
info!(
self.logger,
"[term {term}] transfer leadership to {lead_transferee} is in progress, ignores request \
to same node {lead_transferee}",
term = self.term,
lead_transferee = lead_transferee;
);
return;
}
self.abort_leader_transfer();
info!(
self.logger,
"[term {term}] abort previous transferring leadership to {last_lead_transferee}",
term = self.term,
last_lead_transferee = last_lead_transferee;
);
}
if lead_transferee == self.id {
debug!(
self.logger,
"already leader; ignored transferring leadership to self";
);
return;
}
info!(
self.logger,
"[term {term}] starts to transfer leadership to {lead_transferee}",
term = self.term,
lead_transferee = lead_transferee;
);
// `tick_heartbeat` aborts a transfer still running after one election
// timeout, so restart that window now.
self.election_elapsed = 0;
self.lead_transferee = Some(lead_transferee);
let pr = self.prs.get_mut(from).unwrap();
if pr.matched == self.r.raft_log.last_index() {
self.send_timeout_now(lead_transferee);
info!(
self.logger,
"sends MsgTimeoutNow to {lead_transferee} immediately as {lead_transferee} already has up-to-date log",
lead_transferee = lead_transferee;
);
} else {
self.r.send_append(lead_transferee, pr, &mut self.msgs);
}
}
/// Leader handling of a snapshot status report for peer `m.from`.
///
/// Only acts if the peer is in the snapshot state. On rejection, the
/// snapshot failure is recorded first. In both cases the peer goes back to
/// probe, is paused, and its pending snapshot request is cleared.
fn handle_snapshot_status(&mut self, m: &Message) {
    let from = m.from;
    let pr = if let Some(pr) = self.prs.get_mut(from) {
        pr
    } else {
        debug!(
            self.logger,
            "no progress available for {}",
            from;
        );
        return;
    };
    if pr.state != ProgressState::Snapshot {
        return;
    }
    if m.reject {
        pr.snapshot_failure();
    }
    pr.become_probe();
    if m.reject {
        debug!(
            self.r.logger,
            "snapshot failed, resumed sending replication messages to {from}",
            from = from;
            "progress" => ?pr,
        );
    } else {
        debug!(
            self.r.logger,
            "snapshot succeeded, resumed sending replication messages to {from}",
            from = from;
            "progress" => ?pr,
        );
    }
    // Hold further replication until the peer is resumed; e.g.
    // `handle_heartbeat_response` calls `resume`.
    pr.pause();
    pr.pending_request_snapshot = INVALID_INDEX;
}
/// Handles a `MsgUnreachable` report: a message to `m.from` could not be delivered.
///
/// If that peer's progress is in `ProgressState::Replicate` it is moved back to probing;
/// any other state is left unchanged. The resulting progress is logged at debug level.
fn handle_unreachable(&mut self, m: &Message) {
    let pr = if let Some(pr) = self.prs.get_mut(m.from) {
        pr
    } else {
        debug!(
            self.logger,
            "no progress available for {}",
            m.from;
        );
        return;
    };

    let was_replicating = pr.state == ProgressState::Replicate;
    if was_replicating {
        pr.become_probe();
    }

    debug!(
        self.r.logger,
        "failed to send message to {from} because it is unreachable",
        from = m.from;
        "progress" => ?pr,
    );
}
/// Steps a message while this node is the leader.
///
/// The first `match` handles message types that do not depend on a particular peer's
/// progress (`MsgBeat`, `MsgCheckQuorum`, `MsgPropose`, `MsgReadIndex`) and returns early.
/// The second `match` dispatches peer-specific messages to their handlers. Any other
/// message type is ignored, with a debug log when `m.from` has no progress entry.
///
/// # Errors
///
/// Returns `Error::ProposalDropped` for a `MsgPropose` when this node is not in the
/// progress tracker, a leader transfer is in progress, a conf-change entry fails to decode,
/// or `append_entry` refuses the entries (logged as exceeding the max uncommitted size).
///
/// # Panics
///
/// Via `fatal!` when a `MsgPropose` carries no entries.
fn step_leader(&mut self, mut m: Message) -> Result<()> {
// Message types that do not need a progress entry for `m.from`.
match m.get_msg_type() {
MessageType::MsgBeat => {
self.bcast_heartbeat();
return Ok(());
}
MessageType::MsgCheckQuorum => {
// Step down (same term, no known leader) when the quorum check fails.
if !self.check_quorum_active() {
warn!(
self.logger,
"stepped down to follower since quorum is not active";
);
let term = self.term;
self.become_follower(term, INVALID_ID);
}
return Ok(());
}
MessageType::MsgPropose => {
if m.entries.is_empty() {
fatal!(self.logger, "stepped empty MsgProp");
}
// A leader that is no longer in the progress tracker drops new proposals.
if !self.prs.progress().contains_key(&self.id) {
return Err(Error::ProposalDropped);
}
// Proposals are refused while leadership is being handed over.
if self.lead_transferee.is_some() {
debug!(
self.logger,
"[term {term}] transfer leadership to {lead_transferee} is in progress; dropping \
proposal",
term = self.term,
lead_transferee = self.lead_transferee.unwrap();
);
return Err(Error::ProposalDropped);
}
// Validate every conf-change entry in the batch. V1 changes are converted to V2 so
// both kinds share the checks below; non-conf-change entries are skipped.
for (i, e) in m.mut_entries().iter_mut().enumerate() {
let mut cc;
if e.get_entry_type() == EntryType::EntryConfChange {
let mut cc_v1 = ConfChange::default();
if let Err(e) = cc_v1.merge_from_bytes(e.get_data()) {
error!(self.logger, "invalid confchange"; "error" => ?e);
return Err(Error::ProposalDropped);
}
cc = cc_v1.into_v2();
} else if e.get_entry_type() == EntryType::EntryConfChangeV2 {
cc = ConfChangeV2::default();
if let Err(e) = cc.merge_from_bytes(e.get_data()) {
error!(self.logger, "invalid confchangev2"; "error" => ?e);
return Err(Error::ProposalDropped);
}
} else {
continue;
}
// An empty reason means the change is accepted. It is refused when an earlier
// change may still be unapplied, or when it does not match the joint state
// (an empty change list is treated as a request to leave a joint config).
let reason = if self.has_pending_conf() {
"possible unapplied conf change"
} else {
let already_joint = confchange::joint(self.prs.conf());
let want_leave = cc.changes.is_empty();
if already_joint && !want_leave {
"must transition out of joint config first"
} else if !already_joint && want_leave {
"not in joint state; refusing empty conf change"
} else {
""
}
};
if reason.is_empty() {
// Record the log index this entry is expected to occupy once appended
// after the current last index.
self.pending_conf_index = self.raft_log.last_index() + i as u64 + 1;
} else {
info!(
self.logger,
"ignoring conf change";
"conf change" => ?cc,
"reason" => reason,
"config" => ?self.prs.conf(),
"index" => self.pending_conf_index,
"applied" => self.raft_log.applied,
);
// Neutralize the refused change in place (an empty normal entry) instead of
// removing it, so the batch length and entry positions are unchanged.
*e = Entry::default();
e.set_entry_type(EntryType::EntryNormal);
}
}
if !self.append_entry(&mut m.mut_entries()) {
debug!(
self.logger,
"entries are dropped due to overlimit of max uncommitted size, uncommitted_size: {}",
self.uncommitted_size()
);
return Err(Error::ProposalDropped);
}
self.bcast_append();
return Ok(());
}
MessageType::MsgReadIndex => {
// Read requests are silently ignored until `commit_to_current_term` holds.
if !self.commit_to_current_term() {
return Ok(());
}
// A single-node cluster answers immediately with the current commit index.
if self.prs().is_singleton() {
let read_index = self.raft_log.committed;
if let Some(m) = self.handle_ready_read_index(m, read_index) {
self.r.send(m, &mut self.msgs);
}
return Ok(());
}
match self.read_only.option {
// Safe: register the request at the current commit index and broadcast a
// heartbeat carrying the request context (the first entry's data); presumably
// the read is released once a quorum acks that heartbeat.
ReadOnlyOption::Safe => {
let ctx = m.entries[0].data.to_vec();
self.r
.read_only
.add_request(self.r.raft_log.committed, m, self.r.id);
self.bcast_heartbeat_with_ctx(Some(ctx));
}
// LeaseBased: answer immediately from the commit index, no heartbeat round.
ReadOnlyOption::LeaseBased => {
let read_index = self.raft_log.committed;
if let Some(m) = self.handle_ready_read_index(m, read_index) {
self.r.send(m, &mut self.msgs);
}
}
}
return Ok(());
}
_ => {}
}
// Peer-specific messages.
match m.get_msg_type() {
MessageType::MsgAppendResponse => {
self.handle_append_response(&m);
}
MessageType::MsgHeartbeatResponse => {
self.handle_heartbeat_response(&m);
}
MessageType::MsgSnapStatus => {
self.handle_snapshot_status(&m);
}
MessageType::MsgUnreachable => {
self.handle_unreachable(&m);
}
MessageType::MsgTransferLeader => {
self.handle_transfer_leader(&m);
}
_ => {
if self.prs().get(m.from).is_none() {
debug!(
self.logger,
"no progress available for {}",
m.from;
);
}
}
}
Ok(())
}
/// Fast-forwards the commit index from the `commit` / `commit_term` carried by a vote
/// request or vote response `m`.
///
/// Does nothing when either field is zero, when `m.commit` is not ahead of the local
/// commit index, when this node is the leader, or when `raft_log.maybe_commit` refuses the
/// new commit. If the commit index did advance and this node is a `Candidate` or
/// `PreCandidate`, the newly committed entries are read back. If `num_pending_conf` reports
/// any of them, the node steps down to follower (same term, no known leader).
///
/// # Panics
///
/// Via `fatal!` if the newly committed range cannot be read from the log.
fn maybe_commit_by_vote(&mut self, m: &Message) {
    if m.commit == 0 || m.commit_term == 0 {
        return;
    }
    let last_commit = self.raft_log.committed;
    if m.commit <= last_commit || self.state == StateRole::Leader {
        return;
    }
    if !self.raft_log.maybe_commit(m.commit, m.commit_term) {
        return;
    }

    // The log is only read here (for the log line), so a shared borrow is sufficient;
    // borrowing `self.r.raft_log` directly keeps it disjoint from `self.r.logger`.
    let log = &self.r.raft_log;
    info!(self.r.logger, "[commit: {}, lastindex: {}, lastterm: {}] fast-forwarded commit to vote request [index: {}, term: {}]",
        log.committed, log.last_index(), log.last_term(), m.commit, m.commit_term);

    if self.state != StateRole::Candidate && self.state != StateRole::PreCandidate {
        return;
    }

    // Inspect exactly the entries committed by this call: (last_commit, committed].
    let ents = self
        .raft_log
        .slice(last_commit + 1, self.raft_log.committed + 1, None)
        .unwrap_or_else(|e| {
            fatal!(
                self.logger,
                "unexpected error getting unapplied entries [{}, {}): {:?}",
                last_commit + 1,
                self.raft_log.committed + 1,
                e
            );
        });
    if self.num_pending_conf(&ents) != 0 {
        let term = self.term;
        self.become_follower(term, INVALID_ID);
    }
}
/// Records `from`'s vote (`vote == true` is a grant), re-tallies, and acts on the outcome.
///
/// - `Won` as a `PreCandidate`: start the real election via `campaign(CAMPAIGN_ELECTION)`.
/// - `Won` otherwise: become leader and broadcast appends.
/// - `Lost`: become follower at the current term with no known leader.
/// - `Pending`: nothing.
///
/// Votes from other nodes are logged together with the running tally. `t` is only used for
/// logging. Returns the tally outcome.
fn poll(&mut self, from: u64, t: MessageType, vote: bool) -> VoteResult {
    self.prs.record_vote(from, vote);
    let (approvals, rejections, outcome) = self.prs.tally_votes();

    // A node's vote for itself is not logged.
    if from != self.id {
        info!(
            self.logger,
            "received votes response";
            "vote" => vote,
            "from" => from,
            "rejections" => rejections,
            "approvals" => approvals,
            "type" => ?t,
            "term" => self.term,
        );
    }

    match outcome {
        VoteResult::Won if self.state == StateRole::PreCandidate => {
            self.campaign(CAMPAIGN_ELECTION);
        }
        VoteResult::Won => {
            self.become_leader();
            self.bcast_append();
        }
        VoteResult::Lost => {
            let term = self.term;
            self.become_follower(term, INVALID_ID);
        }
        VoteResult::Pending => {}
    }
    outcome
}
fn step_candidate(&mut self, m: Message) -> Result<()> {
match m.get_msg_type() {
MessageType::MsgPropose => {
info!(
self.logger,
"no leader at term {term}; dropping proposal",
term = self.term;
);
return Err(Error::ProposalDropped);
}
MessageType::MsgAppend => {
debug_assert_eq!(self.term, m.term);
self.become_follower(m.term, m.from);
self.handle_append_entries(&m);
}
MessageType::MsgHeartbeat => {
debug_assert_eq!(self.term, m.term);
self.become_follower(m.term, m.from);
self.handle_heartbeat(m);
}
MessageType::MsgSnapshot => {
debug_assert_eq!(self.term, m.term);
self.become_follower(m.term, m.from);
self.handle_snapshot(m);
}
MessageType::MsgRequestPreVoteResponse | MessageType::MsgRequestVoteResponse => {
if (self.state == StateRole::PreCandidate
&& m.get_msg_type() != MessageType::MsgRequestPreVoteResponse)
|| (self.state == StateRole::Candidate
&& m.get_msg_type() != MessageType::MsgRequestVoteResponse)
{
return Ok(());
}
self.poll(m.from, m.get_msg_type(), !m.reject);
self.maybe_commit_by_vote(&m);
}
MessageType::MsgTimeoutNow => debug!(
self.logger,
"{term} ignored MsgTimeoutNow from {from}",
term = self.term,
from = m.from;
"state" => ?self.state,
),
_ => {}
}
Ok(())
}
/// Steps a message while this node is a follower.
///
/// - `MsgPropose`, `MsgTransferLeader` and `MsgReadIndex` are forwarded to the known
///   leader. Without a leader they are dropped; only the proposal reports an error
///   (`Error::ProposalDropped`).
/// - `MsgAppend`, `MsgHeartbeat` and `MsgSnapshot` reset `election_elapsed`, record `m.from`
///   as leader, then go to their handler.
/// - `MsgTimeoutNow` starts an election via `hup(true)` when this node is promotable.
/// - `MsgReadIndexResp` with exactly one entry is turned into a `ReadState` and may advance
///   the commit index. Malformed responses are logged and ignored.
fn step_follower(&mut self, mut m: Message) -> Result<()> {
    let msg_type = m.get_msg_type();
    match msg_type {
        MessageType::MsgPropose => {
            if self.leader_id == INVALID_ID {
                info!(
                    self.logger,
                    "no leader at term {term}; dropping proposal",
                    term = self.term;
                );
                return Err(Error::ProposalDropped);
            }
            m.to = self.leader_id;
            self.r.send(m, &mut self.msgs);
        }
        MessageType::MsgAppend | MessageType::MsgHeartbeat | MessageType::MsgSnapshot => {
            // The sender of any of these is taken to be the leader.
            self.election_elapsed = 0;
            self.leader_id = m.from;
            match msg_type {
                MessageType::MsgAppend => self.handle_append_entries(&m),
                MessageType::MsgHeartbeat => self.handle_heartbeat(m),
                _ => self.handle_snapshot(m),
            }
        }
        MessageType::MsgTransferLeader => {
            if self.leader_id == INVALID_ID {
                info!(
                    self.logger,
                    "no leader at term {term}; dropping leader transfer msg",
                    term = self.term;
                );
                return Ok(());
            }
            m.to = self.leader_id;
            self.r.send(m, &mut self.msgs);
        }
        MessageType::MsgTimeoutNow if self.promotable => {
            info!(
                self.logger,
                "[term {term}] received MsgTimeoutNow from {from} and starts an election to \
                get leadership.",
                term = self.term,
                from = m.from;
            );
            self.hup(true);
        }
        MessageType::MsgTimeoutNow => {
            info!(
                self.logger,
                "received MsgTimeoutNow from {} but is not promotable",
                m.from;
            );
        }
        MessageType::MsgReadIndex => {
            if self.leader_id == INVALID_ID {
                info!(
                    self.logger,
                    "no leader at term {term}; dropping index reading msg",
                    term = self.term;
                );
                return Ok(());
            }
            m.to = self.leader_id;
            self.r.send(m, &mut self.msgs);
        }
        MessageType::MsgReadIndexResp => {
            let entry_count = m.entries.len();
            if entry_count != 1 {
                error!(
                    self.logger,
                    "invalid format of MsgReadIndexResp from {}",
                    m.from;
                    "entries count" => entry_count,
                );
                return Ok(());
            }
            let request_ctx = m.take_entries()[0].take_data().to_vec();
            self.read_states.push(ReadState {
                index: m.index,
                request_ctx,
            });
            // The response's index/term are used to try advancing our own commit index.
            self.raft_log.maybe_commit(m.index, m.term);
        }
        _ => {}
    }
    Ok(())
}
pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
if self.state == StateRole::Leader {
info!(
self.logger,
"can not request snapshot on leader; dropping request snapshot";
);
} else if self.leader_id == INVALID_ID {
info!(
self.logger,
"drop request snapshot because of no leader";
"term" => self.term,
);
} else if self.snap().is_some() {
info!(
self.logger,
"there is a pending snapshot; dropping request snapshot";
);
} else if self.pending_request_snapshot != INVALID_INDEX {
info!(
self.logger,
"there is a pending snapshot; dropping request snapshot";
);
} else {
self.pending_request_snapshot = request_index;
self.send_request_snapshot();
return Ok(());
}
Err(Error::RequestSnapshotDropped)
}
/// Handles a `MsgAppend` from the leader and replies with a `MsgAppendResponse`.
///
/// - If a snapshot request is pending, that request is re-sent instead of a reply.
/// - If `m.index` is below our commit index, we reply with our commit index as both
///   `index` and `commit`.
/// - Otherwise `raft_log.maybe_append` is tried. On success the reply carries the last
///   index it returned. On failure the reply is a rejection whose `reject_hint` and
///   `log_term` come from `find_conflict_by_term`.
///
/// # Panics
///
/// Via `fatal!` if `find_conflict_by_term` yields no term for its hint index.
pub fn handle_append_entries(&mut self, m: &Message) {
    if self.pending_request_snapshot != INVALID_INDEX {
        self.send_request_snapshot();
        return;
    }

    let committed = self.raft_log.committed;
    if m.index < committed {
        debug!(
            self.logger,
            "got message with lower index than committed.";
        );
        let mut stale_resp = Message::default();
        stale_resp.set_msg_type(MessageType::MsgAppendResponse);
        stale_resp.to = m.from;
        stale_resp.index = committed;
        stale_resp.commit = committed;
        self.r.send(stale_resp, &mut self.msgs);
        return;
    }

    let mut resp = Message::default();
    resp.to = m.from;
    resp.set_msg_type(MessageType::MsgAppendResponse);

    match self
        .raft_log
        .maybe_append(m.index, m.log_term, m.commit, &m.entries)
    {
        Some((_, last_idx)) => resp.set_index(last_idx),
        None => {
            debug!(
                self.logger,
                "rejected msgApp [logterm: {msg_log_term}, index: {msg_index}] \
                from {from}",
                msg_log_term = m.log_term,
                msg_index = m.index,
                from = m.from;
                "index" => m.index,
                "logterm" => ?self.raft_log.term(m.index),
            );

            // Search for a conflict no further than our own last index.
            let probe_index = cmp::min(m.index, self.raft_log.last_index());
            let (hint_index, hint_term) =
                self.raft_log.find_conflict_by_term(probe_index, m.log_term);
            let hint_term = hint_term.unwrap_or_else(|| {
                fatal!(
                    self.logger,
                    "term({index}) must be valid",
                    index = hint_index
                );
            });

            resp.index = m.index;
            resp.reject = true;
            resp.reject_hint = hint_index;
            resp.log_term = hint_term;
        }
    }

    resp.set_commit(self.raft_log.committed);
    self.r.send(resp, &mut self.msgs);
}
/// Handles a `MsgHeartbeat` from the leader.
///
/// Always advances the commit index to `m.commit` first. If a snapshot request is pending,
/// that request is re-sent instead of replying. Otherwise replies with a
/// `MsgHeartbeatResponse` that echoes `m.context` and reports our commit index.
pub fn handle_heartbeat(&mut self, mut m: Message) {
    self.raft_log.commit_to(m.commit);

    if self.pending_request_snapshot != INVALID_INDEX {
        self.send_request_snapshot();
        return;
    }

    let mut reply = Message::default();
    reply.set_msg_type(MessageType::MsgHeartbeatResponse);
    reply.to = m.from;
    reply.commit = self.raft_log.committed;
    reply.context = m.take_context();
    self.r.send(reply, &mut self.msgs);
}
/// Handles a `MsgSnapshot`: tries to `restore` from the carried snapshot and always replies
/// to the sender with a `MsgAppendResponse`.
///
/// The reply index is our new last index when the snapshot was restored, or our commit
/// index when it was ignored.
fn handle_snapshot(&mut self, mut m: Message) {
    let (sindex, sterm) = {
        let meta = m.get_snapshot().get_metadata();
        (meta.index, meta.term)
    };

    let ack_index = if self.restore(m.take_snapshot()) {
        info!(
            self.logger,
            "[commit: {commit}, term: {term}] restored snapshot [index: {snapshot_index}, term: {snapshot_term}]",
            term = self.term,
            commit = self.raft_log.committed,
            snapshot_index = sindex,
            snapshot_term = sterm;
        );
        self.raft_log.last_index()
    } else {
        info!(
            self.logger,
            "[commit: {commit}] ignored snapshot [index: {snapshot_index}, term: {snapshot_term}]",
            commit = self.raft_log.committed,
            snapshot_index = sindex,
            snapshot_term = sterm;
        );
        self.raft_log.committed
    };

    let mut ack = Message::default();
    ack.set_msg_type(MessageType::MsgAppendResponse);
    ack.to = m.from;
    ack.index = ack_index;
    self.r.send(ack, &mut self.msgs);
}
pub fn restore(&mut self, snap: Snapshot) -> bool {
if snap.get_metadata().index < self.raft_log.committed {
return false;
}
if self.state != StateRole::Follower {
warn!(self.logger, "non-follower attempted to restore snapshot"; "state" => ?self.state);
self.become_follower(self.term + 1, INVALID_INDEX);
return false;
}
let meta = snap.get_metadata();
let (snap_index, snap_term) = (meta.index, meta.term);
let cs = meta.get_conf_state();
if cs
.get_voters()
.iter()
.chain(cs.get_learners())
.chain(cs.get_voters_outgoing())
.all(|id| *id != self.id)
{
warn!(self.logger, "attempted to restore snapshot but it is not in the ConfState"; "conf_state" => ?cs);
return false;
}
if self.pending_request_snapshot == INVALID_INDEX
&& self.raft_log.match_term(meta.index, meta.term)
{
info!(
self.logger,
"fast-forwarded commit to snapshot";
"commit" => self.raft_log.committed,
"last_index" => self.raft_log.last_index(),
"last_term" => self.raft_log.last_term(),
"snapshot_index" => snap_index,
"snapshot_term" => snap_term
);
self.raft_log.commit_to(meta.index);
return false;
}
self.raft_log.restore(snap);
let cs = self
.r
.raft_log
.pending_snapshot()
.unwrap()
.get_metadata()
.get_conf_state();
self.prs.clear();
let last_index = self.raft_log.last_index();
if let Err(e) = confchange::restore(&mut self.prs, last_index, cs) {
fatal!(self.logger, "unable to restore config {:?}: {}", cs, e);
}
let new_cs = self.post_conf_change();
let cs = self
.r
.raft_log
.pending_snapshot()
.unwrap()
.get_metadata()
.get_conf_state();
if !raft_proto::conf_state_eq(cs, &new_cs) {
fatal!(self.logger, "invalid restore: {:?} != {:?}", cs, new_cs);
}
let pr = self.prs.get_mut(self.id).unwrap();
pr.maybe_update(pr.next_idx - 1);
self.pending_request_snapshot = INVALID_INDEX;
info!(
self.logger,
"restored snapshot";
"commit" => self.raft_log.committed,
"last_index" => self.raft_log.last_index(),
"last_term" => self.raft_log.last_term(),
"snapshot_index" => snap_index,
"snapshot_term" => snap_term,
);
true
}
/// Refreshes state that depends on the current configuration in the progress tracker, and
/// returns that configuration as a `ConfState`.
///
/// Always sets `promotable` to whether this node is a voter in the new configuration.
/// The remaining steps run only when this node is a leader that is still a voter, and the
/// new configuration has voters:
/// - try to advance the commit index. If it moved, broadcast appends; otherwise call
///   `maybe_send_append` for every other peer;
/// - re-check the last pending read-only request against the possibly changed quorum, and
///   release any reads that are now ready;
/// - abort a leader transfer whose target is no longer a voter.
pub fn post_conf_change(&mut self) -> ConfState {
info!(self.logger, "switched to configuration"; "config" => ?self.prs.conf());
let cs = self.prs.conf().to_conf_state();
let is_voter = self.prs.conf().voters.contains(self.id);
self.promotable = is_voter;
// A leader that is no longer a voter returns early.
// NOTE(review): it does not step down here — confirm this is intended.
if !is_voter && self.state == StateRole::Leader {
return cs;
}
// The remaining steps only apply to a leader with a non-empty voter set.
if self.state != StateRole::Leader || cs.voters.is_empty() {
return cs;
}
if self.maybe_commit() {
self.bcast_append();
} else {
// Split borrows of `self.r`, `self.msgs` and `self.prs` so that each peer's progress
// can be passed mutably to `maybe_send_append` while iterating the tracker.
let self_id = self.id;
let core = &mut self.r;
let msgs = &mut self.msgs;
self.prs
.iter_mut()
.filter(|&(id, _)| *id != self_id)
.for_each(|(id, pr)| {
core.maybe_send_append(*id, pr, false, msgs);
});
}
// The quorum may have changed: if this node's own ack now completes a quorum for the
// last pending read-only request, release every read that became ready.
if let Some(ctx) = self.read_only.last_pending_request_ctx() {
let prs = &self.prs;
if self
.r
.read_only
.recv_ack(self.id, &ctx)
.map_or(false, |acks| prs.has_quorum(acks))
{
for rs in self.r.read_only.advance(&ctx, &self.r.logger) {
if let Some(m) = self.handle_ready_read_index(rs.req, rs.index) {
self.r.send(m, &mut self.msgs);
}
}
}
}
// A transfer target that was removed or demoted can no longer take over leadership.
if self
.lead_transferee
.map_or(false, |e| !self.prs.conf().voters.contains(e))
{
self.abort_leader_transfer();
}
cs
}
/// Whether a proposed configuration change may still be unapplied, i.e. the log's applied
/// index has not yet reached `pending_conf_index`.
#[inline]
pub fn has_pending_conf(&self) -> bool {
    self.raft_log.applied < self.pending_conf_index
}
/// Whether commit-index advances should be broadcast.
///
/// `true` unless `skip_bcast_commit` is set. Even then it is `true` while a configuration
/// change is pending.
pub fn should_bcast_commit(&self) -> bool {
    self.has_pending_conf() || !self.skip_bcast_commit
}
/// Returns the `promotable` flag, which `post_conf_change` sets to whether this node is a
/// voter in the current configuration.
pub fn promotable(&self) -> bool {
    let is_promotable = self.promotable;
    is_promotable
}
/// Applies the configuration change `cc` to the progress tracker and returns the resulting
/// `ConfState` (as computed by `post_conf_change`).
///
/// The kind of change is chosen in this order:
/// 1. `cc.leave_joint()`: leave the joint configuration;
/// 2. `cc.enter_joint()` returns `Some(auto_leave)`: enter a joint configuration;
/// 3. otherwise: a simple change.
///
/// # Errors
///
/// Propagates any error from the `Changer`. In that case `apply_conf` is never called.
#[doc(hidden)]
pub fn apply_conf_change(&mut self, cc: &ConfChangeV2) -> Result<ConfState> {
    // Scope the changer so its borrow of `self.prs` ends before the tracker is mutated.
    let (cfg, changes) = {
        let mut changer = Changer::new(&self.prs);
        if cc.leave_joint() {
            changer.leave_joint()?
        } else if let Some(auto_leave) = cc.enter_joint() {
            changer.enter_joint(auto_leave, &cc.changes)?
        } else {
            changer.simple(&cc.changes)?
        }
    };
    let last_index = self.raft_log.last_index();
    self.prs.apply_conf(cfg, changes, last_index);
    Ok(self.post_conf_change())
}
/// Shared access to the progress tracker.
pub fn prs(&self) -> &ProgressTracker {
    let tracker = &self.prs;
    tracker
}
/// Mutable access to the progress tracker.
pub fn mut_prs(&mut self) -> &mut ProgressTracker {
    let tracker = &mut self.prs;
    tracker
}
/// Loads `term`, `vote` and the commit index from a persisted `HardState`.
///
/// # Panics
///
/// Via `fatal!` if `hs.commit` lies outside `[raft_log.committed, raft_log.last_index()]`.
pub fn load_state(&mut self, hs: &HardState) {
    let (lowest, highest) = (self.raft_log.committed, self.raft_log.last_index());
    if !(lowest..=highest).contains(&hs.commit) {
        fatal!(
            self.logger,
            "hs.commit {} is out of range [{}, {}]",
            hs.commit,
            lowest,
            highest
        )
    }
    self.term = hs.term;
    self.vote = hs.vote;
    self.raft_log.committed = hs.commit;
}
/// Whether the ticks since the last election reset have reached the current randomized
/// election timeout.
pub fn pass_election_timeout(&self) -> bool {
    self.randomized_election_timeout <= self.election_elapsed
}
/// Picks a new randomized election timeout from the half-open range
/// `[min_election_timeout, max_election_timeout)` and logs the old and new values.
pub fn reset_randomized_election_timeout(&mut self) {
    let (lower, upper) = (self.min_election_timeout, self.max_election_timeout);
    let timeout = rand::thread_rng().gen_range(lower..upper);
    let prev_timeout = self.randomized_election_timeout;
    self.randomized_election_timeout = timeout;
    debug!(
        self.logger,
        "reset election timeout {prev_timeout} -> {timeout} at {election_elapsed}",
        prev_timeout = prev_timeout,
        timeout = timeout,
        election_elapsed = self.election_elapsed;
    );
}
/// Asks the progress tracker whether a quorum has recently been active, from the
/// perspective of this node.
fn check_quorum_active(&mut self) -> bool {
    let perspective = self.id;
    self.prs.quorum_recently_active(perspective)
}
/// Queues a `MsgTimeoutNow` addressed to `to`.
pub fn send_timeout_now(&mut self, to: u64) {
    self.r
        .send(new_message(to, MessageType::MsgTimeoutNow, None), &mut self.msgs);
}
/// Forgets any in-progress leadership transfer target.
pub fn abort_leader_transfer(&mut self) {
    self.lead_transferee = None
}
/// Sends the pending snapshot request to the current leader.
///
/// The request is a rejecting `MsgAppendResponse`: `index` is our commit index,
/// `reject_hint` is our last index, and `request_snapshot` is `pending_request_snapshot`.
fn send_request_snapshot(&mut self) {
    let mut request = Message::default();
    request.set_msg_type(MessageType::MsgAppendResponse);
    request.to = self.leader_id;
    request.reject = true;
    request.index = self.raft_log.committed;
    request.reject_hint = self.raft_log.last_index();
    request.request_snapshot = self.pending_request_snapshot;
    self.r.send(request, &mut self.msgs);
}
/// Completes a read-index request `req` that is ready at `index`.
///
/// For a remote request, returns a `MsgReadIndexResp` addressed to `req.from` that carries
/// `index` and the request's entries; the caller sends it. For a local request (`from` is
/// `INVALID_ID` or this node), pushes a `ReadState` using the first entry's data as the
/// request context and returns `None`.
///
/// # Panics
///
/// For a local request with no entries (the first entry is indexed directly).
fn handle_ready_read_index(&mut self, mut req: Message, index: u64) -> Option<Message> {
    let is_local = req.from == INVALID_ID || req.from == self.id;
    if !is_local {
        let mut resp = Message::default();
        resp.set_msg_type(MessageType::MsgReadIndexResp);
        resp.to = req.from;
        resp.index = index;
        resp.set_entries(req.take_entries());
        return Some(resp);
    }

    let request_ctx = req.take_entries()[0].take_data().to_vec();
    self.read_states.push(ReadState { index, request_ctx });
    None
}
/// On the leader only, subtracts `ents` from the uncommitted-size accounting.
///
/// When the accounting reports that the reduction would drop below zero, a warning is
/// logged, naming the first entry's index.
/// NOTE(review): that warning indexes `ents[0]`, so it assumes `ents` is non-empty whenever
/// `maybe_reduce_uncommitted_size` returns `false`.
pub fn reduce_uncommitted_size(&mut self, ents: &[Entry]) {
    let is_leader = self.state == StateRole::Leader;
    if is_leader && !self.uncommitted_state.maybe_reduce_uncommitted_size(ents) {
        warn!(
            self.r.logger,
            "try to reduce uncommitted size less than 0, first index of pending ents is {}",
            ents[0].get_index()
        );
    }
}
/// Tries to account `ents` (by total data length) as uncommitted.
///
/// Delegates to `UncommittedState::maybe_increase_uncommitted_size`, which accepts and
/// returns `true` when there is no limit, the batch is empty in size, nothing is currently
/// uncommitted, or the new total fits the configured maximum. Otherwise it returns `false`
/// without changing the count.
#[inline]
pub fn maybe_increase_uncommitted_size(&mut self, ents: &[Entry]) -> bool {
    let state = &mut self.uncommitted_state;
    state.maybe_increase_uncommitted_size(ents)
}
/// Current total data size of entries accounted as uncommitted.
#[inline]
pub fn uncommitted_size(&self) -> usize {
    let state = &self.uncommitted_state;
    state.uncommitted_size
}
}