use crate::{Inflights, ProgressState, INVALID_INDEX};
use std::cmp;
/// Replication bookkeeping for one tracked log: the highest index confirmed
/// so far (`matched`), the index to continue from (`next_idx`), and the
/// Probe / Replicate / Snapshot state that decides whether progress is paused.
///
/// NOTE(review): this looks like the per-follower record a Raft leader keeps;
/// the owner of the record is not visible in this file — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct Progress {
/// Highest index accepted through `maybe_update`; it is only ever raised
/// there and is zeroed by `reset`.
pub matched: u64,
/// Index to continue from. Re-derived by the `become_*` transitions, pushed
/// forward by `maybe_update` / `optimistic_update`, and lowered by
/// `maybe_decr_to`.
pub next_idx: u64,
/// Current mode; `is_paused` and `update_state` dispatch on it.
pub state: ProgressState,
/// Pause flag. `is_paused` consults it only while `state` is `Probe`.
pub paused: bool,
/// Snapshot index recorded by `become_snapshot`. Cleared to 0 by
/// `snapshot_failure` and by every state transition.
pub pending_snapshot: u64,
/// Snapshot index recorded by `maybe_decr_to` when it is called with a
/// `request_snapshot` other than `INVALID_INDEX`; `reset` puts it back to
/// `INVALID_INDEX`.
pub pending_request_snapshot: u64,
/// Activity flag. This file only ever clears it (`new`, `reset`);
/// presumably the owner sets it when the peer is heard from — TODO confirm.
pub recent_active: bool,
/// Window of in-flight indexes: cleared on state transitions, appended to by
/// `update_state` in the `Replicate` state, and pauses progress once full.
pub ins: Inflights,
/// Group identifier. Initialised to 0 and never modified in this file; its
/// meaning is defined by callers — TODO confirm.
pub commit_group_id: u64,
/// Highest committed index reported through `update_committed`; never
/// decreases there.
pub committed_index: u64,
}
impl Progress {
    /// Creates a progress record that continues from `next_idx` and owns an
    /// in-flight window built with `Inflights::new(ins_size)`.
    ///
    /// `state` starts as `ProgressState::default()`, no snapshot is pending or
    /// requested, and every counter starts at zero.
    pub fn new(next_idx: u64, ins_size: usize) -> Self {
        Progress {
            matched: 0,
            next_idx,
            state: ProgressState::default(),
            paused: false,
            pending_snapshot: 0,
            // Same sentinel that `reset` writes and `maybe_decr_to` compares
            // against, rather than a bare literal.
            pending_request_snapshot: INVALID_INDEX,
            recent_active: false,
            ins: Inflights::new(ins_size),
            commit_group_id: 0,
            committed_index: 0,
        }
    }

    /// Switches to `state`, clearing everything that only made sense in the
    /// previous state: the pause flag, the pending snapshot and the in-flight
    /// window. `matched` and `next_idx` are left for the caller to adjust.
    fn reset_state(&mut self, state: ProgressState) {
        self.paused = false;
        self.pending_snapshot = 0;
        self.state = state;
        self.ins.reset();
    }

    /// Returns the record to its initial shape, continuing from `next_idx`.
    ///
    /// Unlike `new`, the existing `ins` window is kept (and cleared), and
    /// `commit_group_id` / `committed_index` are left untouched.
    ///
    /// In debug builds this asserts that `ins.cap()` is non-zero.
    pub(crate) fn reset(&mut self, next_idx: u64) {
        self.matched = 0;
        self.next_idx = next_idx;
        self.state = ProgressState::default();
        self.paused = false;
        self.pending_snapshot = 0;
        self.pending_request_snapshot = INVALID_INDEX;
        self.recent_active = false;
        // NOTE(review): presumably guards against resetting a window that was
        // never given a capacity — confirm against `Inflights`.
        debug_assert!(self.ins.cap() != 0);
        self.ins.reset();
    }

    /// Moves to the `Probe` state.
    ///
    /// `next_idx` becomes `matched + 1`, except when leaving `Snapshot`: then
    /// it becomes the larger of `matched + 1` and `pending_snapshot + 1`, so
    /// probing continues after the snapshot that was pending.
    pub fn become_probe(&mut self) {
        // Must be computed before `reset_state`, which zeroes `pending_snapshot`.
        let resume_from = if self.state == ProgressState::Snapshot {
            cmp::max(self.matched + 1, self.pending_snapshot + 1)
        } else {
            self.matched + 1
        };
        self.reset_state(ProgressState::Probe);
        self.next_idx = resume_from;
    }

    /// Moves to the `Replicate` state and restarts from `matched + 1`.
    #[inline]
    pub fn become_replicate(&mut self) {
        self.reset_state(ProgressState::Replicate);
        self.next_idx = self.matched + 1;
    }

    /// Moves to the `Snapshot` state and records `snapshot_idx` as the pending
    /// snapshot. `next_idx` is not changed.
    #[inline]
    pub fn become_snapshot(&mut self, snapshot_idx: u64) {
        self.reset_state(ProgressState::Snapshot);
        self.pending_snapshot = snapshot_idx;
    }

    /// Forgets the pending snapshot (sets `pending_snapshot` to 0) without
    /// changing `state`.
    #[inline]
    pub fn snapshot_failure(&mut self) {
        self.pending_snapshot = 0;
    }

    /// Returns `true` when the record is in the `Snapshot` state and `matched`
    /// has already reached `pending_snapshot`. Nothing is modified; acting on
    /// the answer is up to the caller.
    #[inline]
    pub fn maybe_snapshot_abort(&self) -> bool {
        self.state == ProgressState::Snapshot && self.matched >= self.pending_snapshot
    }

    /// Records that index `n` has been acknowledged.
    ///
    /// Returns `true` if `n` is newer than `matched` (in which case `matched`
    /// is raised and the record is resumed) and `false` if it is stale.
    /// Either way `next_idx` is pushed forward to `n + 1` if it was lagging.
    pub fn maybe_update(&mut self, n: u64) -> bool {
        let need_update = self.matched < n;
        if need_update {
            self.matched = n;
            self.resume();
        }

        if self.next_idx < n + 1 {
            self.next_idx = n + 1;
        }

        need_update
    }

    /// Raises `committed_index` to `committed_index` if that is larger; a
    /// smaller or equal value is ignored.
    pub fn update_committed(&mut self, committed_index: u64) {
        if committed_index > self.committed_index {
            self.committed_index = committed_index;
        }
    }

    /// Unconditionally sets `next_idx` to `n + 1`, without waiting for an
    /// acknowledgement and without touching `matched`.
    #[inline]
    pub fn optimistic_update(&mut self, n: u64) {
        self.next_idx = n + 1;
    }

    /// Handles a rejection of index `rejected`, possibly lowering `next_idx`.
    ///
    /// * `rejected` — the index that was rejected.
    /// * `match_hint` — caller-supplied hint; outside the `Replicate` state
    ///   `next_idx` becomes `min(rejected, match_hint + 1)`, never below 1.
    /// * `request_snapshot` — `INVALID_INDEX` for a plain rejection; any other
    ///   value is recorded in `pending_request_snapshot` and `next_idx` is
    ///   left alone.
    ///
    /// Returns `false` when the rejection is judged stale (nothing is
    /// changed) and `true` otherwise.
    pub fn maybe_decr_to(&mut self, rejected: u64, match_hint: u64, request_snapshot: u64) -> bool {
        if self.state == ProgressState::Replicate {
            // Stale if it rejects something below what is already matched, or
            // exactly the matched index without asking for a snapshot.
            if rejected < self.matched
                || (rejected == self.matched && request_snapshot == INVALID_INDEX)
            {
                return false;
            }
            if request_snapshot == INVALID_INDEX {
                self.next_idx = self.matched + 1;
            } else {
                self.pending_request_snapshot = request_snapshot;
            }
            return true;
        }

        // Outside `Replicate` a plain rejection is only current if it targets
        // `next_idx - 1`. Snapshot requests are never treated as stale here.
        if (self.next_idx == 0 || self.next_idx - 1 != rejected)
            && request_snapshot == INVALID_INDEX
        {
            return false;
        }

        if request_snapshot == INVALID_INDEX {
            // `match_hint` may be `u64::MAX`; a plain `+ 1` would panic in
            // debug builds and wrap to 0 in release builds (collapsing
            // `next_idx` to 1). Saturating keeps the `min` exact.
            let hinted = match_hint.saturating_add(1);
            self.next_idx = cmp::max(cmp::min(rejected, hinted), 1);
        } else if self.pending_request_snapshot == INVALID_INDEX {
            // Keep the first outstanding request; later ones do not replace it.
            self.pending_request_snapshot = request_snapshot;
        }
        self.resume();
        true
    }

    /// Reports whether progress is currently held back:
    /// in `Probe` when the pause flag is set, in `Replicate` when the
    /// in-flight window is full, and always in `Snapshot`.
    #[inline]
    pub fn is_paused(&self) -> bool {
        match self.state {
            ProgressState::Probe => self.paused,
            ProgressState::Replicate => self.ins.full(),
            ProgressState::Snapshot => true,
        }
    }

    /// Clears the pause flag.
    #[inline]
    pub fn resume(&mut self) {
        self.paused = false;
    }

    /// Sets the pause flag.
    #[inline]
    pub fn pause(&mut self) {
        self.paused = true;
    }

    /// Updates the record after something up to index `last` has been handed
    /// off: in `Replicate`, `next_idx` jumps to `last + 1` and `last` is added
    /// to the in-flight window; in `Probe`, the record is paused.
    ///
    /// # Panics
    ///
    /// Panics if the record is in the `Snapshot` state.
    pub fn update_state(&mut self, last: u64) {
        match self.state {
            ProgressState::Replicate => {
                self.optimistic_update(last);
                self.ins.add(last);
            }
            ProgressState::Probe => self.pause(),
            ProgressState::Snapshot => panic!(
                "updating progress state in unhandled state {:?}",
                self.state
            ),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Progress` directly in the requested state, bypassing the
    /// `become_*` transitions so each test controls every field it cares about.
    fn new_progress(
        state: ProgressState,
        matched: u64,
        next_idx: u64,
        pending_snapshot: u64,
        ins_size: usize,
    ) -> Progress {
        let mut p = Progress::new(next_idx, ins_size);
        p.state = state;
        p.matched = matched;
        p.pending_snapshot = pending_snapshot;
        p
    }

    #[test]
    fn test_progress_is_paused() {
        // (state, pause flag, expected `is_paused()`)
        let cases = [
            (ProgressState::Probe, false, false),
            (ProgressState::Probe, true, true),
            (ProgressState::Replicate, false, false),
            (ProgressState::Replicate, true, false),
            (ProgressState::Snapshot, false, true),
            (ProgressState::Snapshot, true, true),
        ];
        for (i, &(state, paused, want)) in cases.iter().enumerate() {
            let mut p = new_progress(state, 0, 0, 0, 256);
            p.paused = paused;
            assert_eq!(
                p.is_paused(),
                want,
                "#{}: state = {:?}, paused = {}",
                i,
                state,
                paused
            );
        }
    }

    #[test]
    fn test_progress_resume() {
        let mut p = Progress::new(2, 256);

        // An accepted rejection resumes the record.
        p.paused = true;
        p.maybe_decr_to(1, 1, INVALID_INDEX);
        assert!(!p.paused, "paused= true, want false");

        // So does a fresh acknowledgement.
        p.paused = true;
        p.maybe_update(2);
        assert!(!p.paused, "paused= true, want false");
    }

    #[test]
    fn test_progress_become_probe() {
        let matched = 1u64;
        // (starting state, pending snapshot, expected next_idx)
        let cases = [
            (ProgressState::Replicate, 0u64, 2u64),
            // Leaving Snapshot continues after the pending snapshot.
            (ProgressState::Snapshot, 10, 11),
            (ProgressState::Snapshot, 0, 2),
        ];
        for (i, &(state, pending_snapshot, want_next)) in cases.iter().enumerate() {
            let mut p = new_progress(state, matched, 5, pending_snapshot, 256);
            p.become_probe();
            assert_eq!(p.state, ProgressState::Probe, "#{}: state", i);
            assert_eq!(p.matched, matched, "#{}: match", i);
            assert_eq!(p.next_idx, want_next, "#{}: next", i);
        }
    }

    #[test]
    fn test_progress_become_replicate() {
        let mut p = new_progress(ProgressState::Probe, 1, 5, 0, 256);
        p.become_replicate();
        assert_eq!(p.state, ProgressState::Replicate);
        assert_eq!(p.matched, 1);
        assert_eq!(p.matched + 1, p.next_idx);
    }

    #[test]
    fn test_progress_become_snapshot() {
        let mut p = new_progress(ProgressState::Probe, 1, 5, 0, 256);
        p.become_snapshot(10);
        assert_eq!(p.state, ProgressState::Snapshot);
        assert_eq!(p.matched, 1);
        assert_eq!(p.pending_snapshot, 10);
    }

    #[test]
    fn test_progress_update() {
        let (prev_m, prev_n) = (3u64, 5u64);
        // (acknowledged index, expected matched, expected next_idx, expected result)
        let cases = [
            // Stale acknowledgements change nothing.
            (prev_m - 1, prev_m, prev_n, false),
            (prev_m, prev_m, prev_n, false),
            // Newer ones raise `matched`; `next_idx` only moves once it lags.
            (prev_m + 1, prev_m + 1, prev_n, true),
            (prev_m + 2, prev_m + 2, prev_n + 1, true),
        ];
        for (i, &(update, want_matched, want_next, want_ok)) in cases.iter().enumerate() {
            let mut p = Progress::new(prev_n, 256);
            p.matched = prev_m;
            let ok = p.maybe_update(update);
            assert_eq!(ok, want_ok, "#{}: ok", i);
            assert_eq!(p.matched, want_matched, "#{}: match", i);
            assert_eq!(p.next_idx, want_next, "#{}: next", i);
        }
    }

    #[test]
    fn test_progress_maybe_decr() {
        // (state, matched, next_idx, rejected, match_hint, expected result, expected next_idx)
        let cases = [
            // Replicate: rejected is not above matched, so it is stale.
            (ProgressState::Replicate, 5u64, 10u64, 5u64, 5u64, false, 10u64),
            (ProgressState::Replicate, 5, 10, 4, 4, false, 10),
            // Replicate: rejected above matched falls straight back to matched + 1.
            (ProgressState::Replicate, 5, 10, 9, 9, true, 6),
            // Probe: rejected must equal next_idx - 1, otherwise stale.
            (ProgressState::Probe, 0, 0, 0, 0, false, 0),
            (ProgressState::Probe, 0, 10, 5, 5, false, 10),
            // Probe: next_idx drops to min(rejected, match_hint + 1) ...
            (ProgressState::Probe, 0, 10, 9, 9, true, 9),
            (ProgressState::Probe, 0, 2, 1, 1, true, 1),
            // ... but never below 1.
            (ProgressState::Probe, 0, 1, 0, 0, true, 1),
            (ProgressState::Probe, 0, 10, 9, 2, true, 3),
            (ProgressState::Probe, 0, 10, 9, 0, true, 1),
        ];
        for (i, &(state, matched, next_idx, rejected, hint, want, want_next)) in
            cases.iter().enumerate()
        {
            let mut p = new_progress(state, matched, next_idx, 0, 0);
            let got = p.maybe_decr_to(rejected, hint, INVALID_INDEX);
            assert_eq!(got, want, "#{}: maybeDecrTo", i);
            assert_eq!(p.matched, matched, "#{}: match", i);
            assert_eq!(p.next_idx, want_next, "#{}: next", i);
        }
    }

    #[test]
    fn test_progress_maybe_decr_request_snapshot() {
        // Replicate: a snapshot request at the matched index is not stale and
        // is recorded without moving next_idx.
        let mut p = new_progress(ProgressState::Replicate, 5, 10, 0, 256);
        p.pending_request_snapshot = INVALID_INDEX;
        assert!(p.maybe_decr_to(5, 5, 7));
        assert_eq!(p.pending_request_snapshot, 7);
        assert_eq!(p.next_idx, 10);

        // Replicate: anything below matched is stale even with a request.
        let mut p = new_progress(ProgressState::Replicate, 5, 10, 0, 256);
        p.pending_request_snapshot = INVALID_INDEX;
        assert!(!p.maybe_decr_to(4, 4, 7));
        assert_eq!(p.pending_request_snapshot, INVALID_INDEX);
        assert_eq!(p.next_idx, 10);

        // Probe: a request is accepted even when `rejected` does not line up
        // with next_idx - 1, and it resumes the record.
        let mut p = new_progress(ProgressState::Probe, 0, 10, 0, 256);
        p.pending_request_snapshot = INVALID_INDEX;
        p.paused = true;
        assert!(p.maybe_decr_to(3, 3, 7));
        assert_eq!(p.pending_request_snapshot, 7);
        assert_eq!(p.next_idx, 10);
        assert!(!p.paused);

        // Probe: the first outstanding request is kept.
        assert!(p.maybe_decr_to(3, 3, 9));
        assert_eq!(p.pending_request_snapshot, 7);
    }

    #[test]
    fn test_progress_update_committed() {
        let mut p = Progress::new(1, 256);
        // (reported index, expected committed_index afterwards)
        let steps = [(5u64, 5u64), (3, 5), (5, 5), (6, 6)];
        for (i, &(reported, want)) in steps.iter().enumerate() {
            p.update_committed(reported);
            assert_eq!(p.committed_index, want, "#{}: committed_index", i);
        }
    }

    #[test]
    fn test_progress_snapshot_abort_and_failure() {
        // (state, matched, pending snapshot, expected maybe_snapshot_abort())
        let cases = [
            (ProgressState::Snapshot, 10u64, 10u64, true),
            (ProgressState::Snapshot, 11, 10, true),
            (ProgressState::Snapshot, 9, 10, false),
            (ProgressState::Probe, 10, 10, false),
            (ProgressState::Replicate, 10, 0, false),
        ];
        for (i, &(state, matched, pending, want)) in cases.iter().enumerate() {
            let p = new_progress(state, matched, 0, pending, 256);
            assert_eq!(p.maybe_snapshot_abort(), want, "#{}: abort", i);
        }

        let mut p = new_progress(ProgressState::Snapshot, 1, 5, 10, 256);
        p.snapshot_failure();
        assert_eq!(p.pending_snapshot, 0);
        assert_eq!(p.state, ProgressState::Snapshot);
    }
}