use std::cmp;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::eraftpb::*;
use crate::errors::{Error, Result, StorageError};
use crate::util::limit_size;
use getset::{Getters, Setters};
/// The raft state reported by `Storage::initial_state`: the hard state plus the conf state.
///
/// NOTE(review): no `#[get]`/`#[set]` attributes are present, so the `Getters`/`Setters`
/// derives presumably generate no accessors — confirm whether they are still needed.
#[derive(Debug, Clone, Default, Getters, Setters)]
pub struct RaftState {
    /// Hard state; `MemStorageCore` updates its `commit` and `term` fields.
    pub hard_state: HardState,
    /// Conf state; `RaftState::initialized` treats any non-default value as "initialized".
    pub conf_state: ConfState,
}
impl RaftState {
    /// Builds a `RaftState` from an explicit hard state and conf state.
    pub fn new(hard_state: HardState, conf_state: ConfState) -> RaftState {
        Self {
            hard_state,
            conf_state,
        }
    }

    /// Returns `true` once the conf state has been set to something other than the default.
    pub fn initialized(&self) -> bool {
        let blank = ConfState::default();
        blank != self.conf_state
    }
}
/// Read access to a raft log and its persisted state.
///
/// The per-method notes describe what `MemStorage`, the in-memory implementation in this
/// file, does. NOTE(review): other implementations may differ — confirm the intended contract.
pub trait Storage {
    /// Returns the current `RaftState` (hard state and conf state).
    fn initial_state(&self) -> Result<RaftState>;
    /// Returns the entries in the half-open index range `[low, high)`, passing `max_size` to
    /// `limit_size` to cap the result.
    ///
    /// `MemStorage` returns `StorageError::Compacted` when `low` is before `first_index()` and
    /// panics when `high` is past `last_index() + 1`. The tests in this file expect a single
    /// entry to be returned even when `max_size` is 0.
    fn entries(&self, low: u64, high: u64, max_size: impl Into<Option<u64>>) -> Result<Vec<Entry>>;
    /// Returns the term of the entry at `idx`.
    ///
    /// `MemStorage` answers the snapshot index from the snapshot metadata, returns
    /// `StorageError::Compacted` below `first_index()` and `StorageError::Unavailable` above
    /// `last_index()`.
    fn term(&self, idx: u64) -> Result<u64>;
    /// Returns the first index held by the log. For `MemStorage`, this is the first entry's
    /// index, or the snapshot index + 1 when there are no entries.
    fn first_index(&self) -> Result<u64>;
    /// Returns the last index held by the log. For `MemStorage`, this is the last entry's
    /// index, or the snapshot index when there are no entries.
    fn last_index(&self) -> Result<u64>;
    /// Returns a snapshot of the state.
    ///
    /// `MemStorage` raises the snapshot's index to `request_index` if it is lower, and can be
    /// made to return `StorageError::SnapshotTemporarilyUnavailable` once via
    /// `MemStorageCore::trigger_snap_unavailable`.
    fn snapshot(&self, request_index: u64) -> Result<Snapshot>;
}
/// The mutable state behind a `MemStorage`, which keeps it in an `Arc<RwLock<_>>`.
pub struct MemStorageCore {
    /// Hard state and conf state, returned by `Storage::initial_state`.
    raft_state: RaftState,
    /// In-memory log entries. Index arithmetic in this file (`idx - entries[0].index`)
    /// assumes they are contiguous by index.
    entries: Vec<Entry>,
    /// Metadata of the last snapshot applied via `apply_snapshot` (default otherwise).
    /// Determines first/last index when `entries` is empty.
    snapshot_metadata: SnapshotMetadata,
    /// When set, the next `Storage::snapshot` call on the owning `MemStorage` clears it and
    /// returns `StorageError::SnapshotTemporarilyUnavailable`.
    trigger_snap_unavailable: bool,
}
impl Default for MemStorageCore {
fn default() -> MemStorageCore {
MemStorageCore {
raft_state: Default::default(),
entries: vec![],
snapshot_metadata: Default::default(),
trigger_snap_unavailable: false,
}
}
}
impl MemStorageCore {
    /// Replaces the whole hard state with `hs`.
    pub fn set_hardstate(&mut self, hs: HardState) {
        self.raft_state.hard_state = hs;
    }

    /// Returns a reference to the current hard state.
    pub fn hard_state(&self) -> &HardState {
        &self.raft_state.hard_state
    }

    /// Returns a mutable reference to the current hard state.
    pub fn mut_hard_state(&mut self) -> &mut HardState {
        &mut self.raft_state.hard_state
    }

    /// Sets the commit index to `index` and the hard-state term to that entry's term.
    ///
    /// Always returns `Ok(())`.
    ///
    /// # Panics
    /// Panics if there is no in-memory entry at `index` (including when the log is empty).
    pub fn commit_to(&mut self, index: u64) -> Result<()> {
        assert!(
            self.has_entry_at(index),
            "commit_to {} but the entry does not exist",
            index
        );

        // `has_entry_at` guarantees `entries` is non-empty and contains `index`.
        let diff = (index - self.entries[0].index) as usize;
        self.raft_state.hard_state.commit = index;
        self.raft_state.hard_state.term = self.entries[diff].term;
        Ok(())
    }

    /// Replaces the conf state with `cs`.
    pub fn set_conf_state(&mut self, cs: ConfState) {
        self.raft_state.conf_state = cs;
    }

    /// Returns whether `index` is within the in-memory entries.
    #[inline]
    fn has_entry_at(&self, index: u64) -> bool {
        !self.entries.is_empty() && index >= self.first_index() && index <= self.last_index()
    }

    /// Index of the first in-memory entry, or the snapshot index + 1 when there are none.
    fn first_index(&self) -> u64 {
        match self.entries.first() {
            Some(e) => e.index,
            None => self.snapshot_metadata.index + 1,
        }
    }

    /// Index of the last in-memory entry, or the snapshot index when there are none.
    fn last_index(&self) -> u64 {
        match self.entries.last() {
            Some(e) => e.index,
            None => self.snapshot_metadata.index,
        }
    }

    /// Replaces the log with `snapshot`.
    ///
    /// This does the following:
    /// - records the snapshot metadata;
    /// - raises the hard-state term to at least the snapshot term;
    /// - sets the commit index to the snapshot index;
    /// - drops all in-memory entries;
    /// - adopts the snapshot's conf state.
    ///
    /// # Errors
    /// Returns `StorageError::SnapshotOutOfDate` if the snapshot index is below
    /// `first_index()`.
    pub fn apply_snapshot(&mut self, mut snapshot: Snapshot) -> Result<()> {
        let mut meta = snapshot.take_metadata();
        let index = meta.index;

        if self.first_index() > index {
            return Err(Error::Store(StorageError::SnapshotOutOfDate));
        }

        // Clone before `take_conf_state` so the stored metadata keeps its conf state too.
        self.snapshot_metadata = meta.clone();

        self.raft_state.hard_state.term = cmp::max(self.raft_state.hard_state.term, meta.term);
        self.raft_state.hard_state.commit = index;
        self.entries.clear();

        self.raft_state.conf_state = meta.take_conf_state();
        Ok(())
    }

    /// Builds a snapshot at the current commit index, carrying the current conf state.
    ///
    /// The term comes from the snapshot metadata when the commit index equals the snapshot
    /// index; otherwise it comes from the entry at the commit index.
    ///
    /// # Panics
    /// - Panics if the commit index is below the last snapshot index.
    /// - Also panics (through indexing or arithmetic) if the commit index is above the
    ///   snapshot index but not within the in-memory entries.
    fn snapshot(&self) -> Snapshot {
        let mut snapshot = Snapshot::default();

        let meta = snapshot.mut_metadata();
        meta.index = self.raft_state.hard_state.commit;
        meta.term = match meta.index.cmp(&self.snapshot_metadata.index) {
            cmp::Ordering::Equal => self.snapshot_metadata.term,
            cmp::Ordering::Greater => {
                let offset = self.entries[0].index;
                self.entries[(meta.index - offset) as usize].term
            }
            cmp::Ordering::Less => {
                panic!(
                    "commit {} < snapshot_metadata.index {}",
                    meta.index, self.snapshot_metadata.index
                );
            }
        };

        meta.set_conf_state(self.raft_state.conf_state.clone());
        snapshot
    }

    /// Discards all in-memory entries with an index below `compact_index`.
    ///
    /// A `compact_index` at or before `first_index()` is a no-op rather than an error.
    ///
    /// # Panics
    /// Panics if `compact_index > last_index() + 1`.
    pub fn compact(&mut self, compact_index: u64) -> Result<()> {
        if compact_index <= self.first_index() {
            return Ok(());
        }

        if compact_index > self.last_index() + 1 {
            panic!(
                "compact not received raft logs: {}, last index: {}",
                compact_index,
                self.last_index()
            );
        }

        if let Some(entry) = self.entries.first() {
            let offset = compact_index - entry.index;
            self.entries.drain(..offset as usize);
        }
        Ok(())
    }

    /// Appends `ents` to the log.
    ///
    /// Existing entries at or after `ents[0].index` are truncated first, so `ents` overwrites
    /// any conflicting suffix. An empty `ents` is a no-op.
    ///
    /// # Panics
    /// - Panics if `ents[0].index` is before `first_index()`, which would overwrite compacted
    ///   entries.
    /// - Panics if `ents[0].index` is after `last_index() + 1`, which would leave a gap.
    pub fn append(&mut self, ents: &[Entry]) -> Result<()> {
        if ents.is_empty() {
            return Ok(());
        }
        if self.first_index() > ents[0].index {
            panic!(
                "overwrite compacted raft logs, compacted: {}, append: {}",
                self.first_index() - 1,
                ents[0].index,
            );
        }
        if self.last_index() + 1 < ents[0].index {
            panic!(
                "raft logs should be continuous, last index: {}, new appended: {}",
                self.last_index(),
                ents[0].index,
            );
        }

        // Remove every existing entry that `ents` overwrites.
        let diff = ents[0].index - self.first_index();
        self.entries.drain(diff as usize..);
        self.entries.extend_from_slice(ents);
        Ok(())
    }

    /// Runs `commit_to(idx)`, then replaces the conf state with `cs` when it is `Some`.
    ///
    /// # Panics
    /// Panics under the same conditions as `commit_to`.
    pub fn commit_to_and_set_conf_states(&mut self, idx: u64, cs: Option<ConfState>) -> Result<()> {
        self.commit_to(idx)?;
        if let Some(cs) = cs {
            self.raft_state.conf_state = cs;
        }
        Ok(())
    }

    /// Makes the next `Storage::snapshot` call on the owning `MemStorage` fail once with
    /// `StorageError::SnapshotTemporarilyUnavailable`.
    pub fn trigger_snap_unavailable(&mut self) {
        self.trigger_snap_unavailable = true;
    }
}
/// An in-memory `Storage` implementation.
///
/// The core lives behind an `Arc<RwLock<_>>`, so clones of a `MemStorage` share the same
/// underlying state.
#[derive(Clone, Default)]
pub struct MemStorage {
    core: Arc<RwLock<MemStorageCore>>,
}
impl MemStorage {
    /// Creates an empty, uninitialized storage.
    pub fn new() -> MemStorage {
        MemStorage::default()
    }

    /// Creates a storage and initializes its conf state with `conf_state`.
    ///
    /// # Panics
    /// Never in practice: a freshly created storage is always uninitialized.
    pub fn new_with_conf_state<T>(conf_state: T) -> MemStorage
    where
        ConfState: From<T>,
    {
        let store = MemStorage::new();
        store.initialize_with_conf_state(conf_state);
        store
    }

    /// Sets the conf state of a storage that has not been initialized yet.
    ///
    /// The "already initialized?" check and the assignment happen under a single write-lock
    /// acquisition. Clones sharing this storage therefore cannot both pass the check and race
    /// to initialize it.
    ///
    /// # Panics
    /// Panics if the storage is already initialized (its conf state is non-default).
    pub fn initialize_with_conf_state<T>(&self, conf_state: T)
    where
        ConfState: From<T>,
    {
        {
            let mut core = self.wl();
            if !core.raft_state.initialized() {
                core.raft_state.conf_state = ConfState::from(conf_state);
                return;
            }
            // The guard is dropped at the end of this scope, before panicking, so the lock
            // is not poisoned for other holders of this storage.
        }
        panic!("initialize_with_conf_state called on an already initialized MemStorage");
    }

    /// Acquires a read lock on the core.
    ///
    /// # Panics
    /// Panics if the lock is poisoned.
    pub fn rl(&self) -> RwLockReadGuard<'_, MemStorageCore> {
        self.core.read().unwrap()
    }

    /// Acquires a write lock on the core.
    ///
    /// # Panics
    /// Panics if the lock is poisoned.
    pub fn wl(&self) -> RwLockWriteGuard<'_, MemStorageCore> {
        self.core.write().unwrap()
    }
}
impl Storage for MemStorage {
    /// Returns a clone of the current hard state and conf state.
    fn initial_state(&self) -> Result<RaftState> {
        Ok(self.rl().raft_state.clone())
    }

    /// Returns the entries in `[low, high)`, trimmed by `limit_size` according to `max_size`.
    ///
    /// An empty range at `first_index()` returns an empty vector, even when the log holds no
    /// entries (for example right after a snapshot was applied).
    ///
    /// # Errors
    /// Returns `StorageError::Compacted` if `low` is before `first_index()`.
    ///
    /// # Panics
    /// Panics if `high > last_index() + 1` or if `low > high`.
    fn entries(&self, low: u64, high: u64, max_size: impl Into<Option<u64>>) -> Result<Vec<Entry>> {
        let max_size = max_size.into();
        let core = self.rl();
        if low < core.first_index() {
            return Err(Error::Store(StorageError::Compacted));
        }

        if high > core.last_index() + 1 {
            panic!(
                "index out of bound (last: {}, high: {})",
                core.last_index() + 1,
                high
            );
        }
        assert!(low <= high, "invalid range: low {} > high {}", low, high);

        // `first_index()` equals `entries[0].index` whenever the log is non-empty. Unlike
        // `entries[0]`, it is also defined for an empty log. In that case the only range that
        // passes the checks above is `[first_index, first_index)`, which maps to `entries[0..0]`.
        let offset = core.first_index();
        let lo = (low - offset) as usize;
        let hi = (high - offset) as usize;
        let mut ents = core.entries[lo..hi].to_vec();
        limit_size(&mut ents, max_size);
        Ok(ents)
    }

    /// Returns the term of the entry at `idx`.
    ///
    /// The snapshot index is answered from the snapshot metadata.
    ///
    /// # Errors
    /// - `StorageError::Compacted` if `idx` is below `first_index()`.
    /// - `StorageError::Unavailable` if `idx` is above `last_index()`.
    fn term(&self, idx: u64) -> Result<u64> {
        let core = self.rl();
        if idx == core.snapshot_metadata.index {
            return Ok(core.snapshot_metadata.term);
        }

        let offset = core.first_index();
        if idx < offset {
            return Err(Error::Store(StorageError::Compacted));
        }

        if idx > core.last_index() {
            return Err(Error::Store(StorageError::Unavailable));
        }
        Ok(core.entries[(idx - offset) as usize].term)
    }

    /// Returns the first index held by the log (snapshot index + 1 when there are no entries).
    fn first_index(&self) -> Result<u64> {
        Ok(self.rl().first_index())
    }

    /// Returns the last index held by the log (the snapshot index when there are no entries).
    fn last_index(&self) -> Result<u64> {
        Ok(self.rl().last_index())
    }

    /// Returns a snapshot at the current commit index.
    ///
    /// If the snapshot's index is below `request_index`, the index is raised to `request_index`;
    /// the term is left unchanged. The write lock is taken because this call may clear the
    /// unavailable trigger.
    ///
    /// # Errors
    /// Returns `StorageError::SnapshotTemporarilyUnavailable` once after
    /// `MemStorageCore::trigger_snap_unavailable`; the trigger is cleared by that call.
    fn snapshot(&self, request_index: u64) -> Result<Snapshot> {
        let mut core = self.wl();
        if core.trigger_snap_unavailable {
            core.trigger_snap_unavailable = false;
            Err(Error::Store(StorageError::SnapshotTemporarilyUnavailable))
        } else {
            let mut snap = core.snapshot();
            if snap.get_metadata().index < request_index {
                snap.mut_metadata().index = request_index;
            }
            Ok(snap)
        }
    }
}
#[cfg(test)]
mod test {
    use std::panic::{self, AssertUnwindSafe};

    use protobuf::Message as PbMessage;

    use crate::eraftpb::{ConfState, Entry, Snapshot};
    use crate::errors::{Error as RaftError, StorageError};

    use super::{MemStorage, Storage};

    /// Builds an entry with the given index and term; all other fields are default.
    fn new_entry(index: u64, term: u64) -> Entry {
        let mut e = Entry::default();
        e.term = term;
        e.index = index;
        e
    }

    /// Encoded protobuf size of `m`, used to build `max_size` expectations.
    fn size_of<T: PbMessage>(m: &T) -> u32 {
        m.compute_size() as u32
    }

    /// Builds a snapshot whose metadata has the given index, term and voters.
    fn new_snapshot(index: u64, term: u64, voters: Vec<u64>) -> Snapshot {
        let mut s = Snapshot::default();
        s.mut_metadata().index = index;
        s.mut_metadata().term = term;
        s.mut_metadata().mut_conf_state().voters = voters;
        s
    }

    #[test]
    fn test_storage_term() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        let tests = vec![
            (2, Err(RaftError::Store(StorageError::Compacted))),
            (3, Ok(3)),
            (4, Ok(4)),
            (5, Ok(5)),
            (6, Err(RaftError::Store(StorageError::Unavailable))),
        ];
        for (i, (idx, wterm)) in tests.into_iter().enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();

            let t = storage.term(idx);
            assert_eq!(t, wterm, "#{}: term({})", i, idx);
        }
    }

    #[test]
    fn test_storage_entries() {
        let ents = vec![
            new_entry(3, 3),
            new_entry(4, 4),
            new_entry(5, 5),
            new_entry(6, 6),
        ];
        let max_u64 = u64::max_value();
        // (low, high, max_size, expected result)
        let tests = vec![
            (
                2,
                6,
                max_u64,
                Err(RaftError::Store(StorageError::Compacted)),
            ),
            (3, 4, max_u64, Ok(vec![new_entry(3, 3)])),
            (4, 5, max_u64, Ok(vec![new_entry(4, 4)])),
            (4, 6, max_u64, Ok(vec![new_entry(4, 4), new_entry(5, 5)])),
            (
                4,
                7,
                max_u64,
                Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]),
            ),
            // Even a zero limit still yields one entry.
            (4, 7, 0, Ok(vec![new_entry(4, 4)])),
            (
                4,
                7,
                u64::from(size_of(&ents[1]) + size_of(&ents[2])),
                Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
            ),
            (
                4,
                7,
                u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) / 2),
                Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
            ),
            (
                4,
                7,
                u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) - 1),
                Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
            ),
            (
                4,
                7,
                u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3])),
                Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]),
            ),
        ];
        for (i, (lo, hi, maxsize, wentries)) in tests.into_iter().enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();

            let e = storage.entries(lo, hi, maxsize);
            assert_eq!(e, wentries, "#{}: entries({}, {}, {})", i, lo, hi, maxsize);
        }
    }

    #[test]
    fn test_storage_last_index() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        let storage = MemStorage::new();
        storage.wl().entries = ents;

        assert_eq!(storage.last_index(), Ok(5));

        storage.wl().append(&[new_entry(6, 5)]).unwrap();
        assert_eq!(storage.last_index(), Ok(6));
    }

    #[test]
    fn test_storage_first_index() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        let storage = MemStorage::new();
        storage.wl().entries = ents;

        assert_eq!(storage.first_index(), Ok(3));
        storage.wl().compact(4).unwrap();
        assert_eq!(storage.first_index(), Ok(4));
    }

    #[test]
    fn test_storage_compact() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        // (compact index, wanted first index, wanted term at first index, wanted entry count)
        let tests = vec![(2, 3, 3, 3), (3, 3, 3, 3), (4, 4, 4, 2), (5, 5, 5, 1)];
        for (i, (idx, windex, wterm, wlen)) in tests.into_iter().enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();

            storage.wl().compact(idx).unwrap();
            let index = storage.first_index().unwrap();
            assert_eq!(index, windex, "#{}: first index", i);

            let term = if let Ok(v) = storage.entries(index, index + 1, 1) {
                v.first().map_or(0, |e| e.term)
            } else {
                0
            };
            assert_eq!(term, wterm, "#{}: term at first index", i);

            let last = storage.last_index().unwrap();
            let len = storage.entries(index, last + 1, 100).unwrap().len();
            assert_eq!(len, wlen, "#{}: remaining entry count", i);
        }
    }

    #[test]
    fn test_storage_create_snapshot() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        let nodes = vec![1, 2, 3];
        let mut conf_state = ConfState::default();
        conf_state.voters = nodes.clone();

        let unavailable = Err(RaftError::Store(
            StorageError::SnapshotTemporarilyUnavailable,
        ));
        // (commit index and term, expected result, requested index)
        let tests = vec![
            (4, Ok(new_snapshot(4, 4, nodes.clone())), 0),
            (5, Ok(new_snapshot(5, 5, nodes.clone())), 5),
            (5, Ok(new_snapshot(6, 5, nodes)), 6),
            (5, unavailable, 6),
        ];
        for (i, (idx, wresult, windex)) in tests.into_iter().enumerate() {
            let storage = MemStorage::new();
            {
                // Set up under one write guard; it must be dropped before `storage.snapshot`
                // takes the write lock again.
                let mut core = storage.wl();
                core.entries = ents.clone();
                core.raft_state.hard_state.commit = idx;
                core.raft_state.hard_state.term = idx;
                core.raft_state.conf_state = conf_state.clone();
                if wresult.is_err() {
                    core.trigger_snap_unavailable();
                }
            }

            let result = storage.snapshot(windex);
            assert_eq!(result, wresult, "#{}", i);
        }
    }

    #[test]
    fn test_storage_append() {
        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        // (entries to append, expected log afterwards; `None` means `append` must panic)
        let tests = vec![
            (
                vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)],
                Some(vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)]),
            ),
            (
                vec![new_entry(3, 3), new_entry(4, 6), new_entry(5, 6)],
                Some(vec![new_entry(3, 3), new_entry(4, 6), new_entry(5, 6)]),
            ),
            (
                vec![
                    new_entry(3, 3),
                    new_entry(4, 4),
                    new_entry(5, 5),
                    new_entry(6, 5),
                ],
                Some(vec![
                    new_entry(3, 3),
                    new_entry(4, 4),
                    new_entry(5, 5),
                    new_entry(6, 5),
                ]),
            ),
            // Overwriting compacted entries must panic.
            (
                vec![new_entry(2, 3), new_entry(3, 3), new_entry(4, 5)],
                None,
            ),
            (
                vec![new_entry(4, 5)],
                Some(vec![new_entry(3, 3), new_entry(4, 5)]),
            ),
            (
                vec![new_entry(6, 6)],
                Some(vec![
                    new_entry(3, 3),
                    new_entry(4, 4),
                    new_entry(5, 5),
                    new_entry(6, 6),
                ]),
            ),
        ];
        for (i, (entries, wentries)) in tests.into_iter().enumerate() {
            let storage = MemStorage::new();
            storage.wl().entries = ents.clone();

            let res = panic::catch_unwind(AssertUnwindSafe(|| storage.wl().append(&entries)));
            match wentries {
                Some(wentries) => {
                    assert!(res.is_ok(), "#{}: append panicked unexpectedly", i);
                    assert_eq!(storage.rl().entries, wentries, "#{}", i);
                }
                None => assert!(res.is_err(), "#{}: append should have panicked", i),
            }
        }
    }

    #[test]
    fn test_storage_apply_snapshot() {
        let nodes = vec![1, 2, 3];
        let storage = MemStorage::new();

        let snap = new_snapshot(4, 4, nodes.clone());
        assert!(storage.wl().apply_snapshot(snap).is_ok());

        // A snapshot older than the one already applied is rejected.
        let snap = new_snapshot(3, 3, nodes);
        assert!(storage.wl().apply_snapshot(snap).is_err());
    }
}