1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#![feature(shrink_to)]
#![feature(cell_update)]
use std::sync::atomic::{AtomicUsize, Ordering};
/// Builds a boxed error whose message is prefixed with the source location of the
/// macro invocation, then converts it with `.into()` into whatever type the call
/// site expects.
///
/// Two forms are accepted:
/// * `box_err!(expr)` — `expr` is rendered with `{}` after a `[file:line]: ` prefix.
/// * `box_err!("fmt", args...)` — the arguments are first passed through `format!`
///   and the resulting string is handled like the single-expression form.
macro_rules! box_err {
    ($e:expr) => ({
        use std::error::Error;
        // Render the location-tagged message first, then box it as a thread-safe
        // trait object before handing it to the caller's `Into` conversion.
        let message = format!("[{}:{}]: {}", file!(), line!(), $e);
        let boxed: Box<dyn Error + Sync + Send> = From::from(message);
        boxed.into()
    });
    ($f:tt, $($arg:expr),+) => ({
        // Delegate to the single-expression arm with the pre-formatted message.
        box_err!(format!($f, $($arg),+))
    });
}
pub mod codec;
mod cache_evict;
mod config;
mod engine;
mod errors;
mod log_batch;
mod memtable;
mod pipe_log;
mod purge;
mod util;
use crate::pipe_log::PipeLog;
pub use self::config::{Config, RecoveryMode};
pub use self::errors::{Error, Result};
pub use self::log_batch::{EntryExt, LogBatch};
pub use self::util::ReadableSize;
/// Alias for `engine::Engine` with its third type parameter fixed to `PipeLog`;
/// the first two parameters, `X` and `Y`, are forwarded unchanged.
// NOTE(review): what `X` and `Y` stand for is defined by `engine::Engine`, which is
// not visible from this file — confirm there before documenting them further.
pub type RaftLogEngine<X, Y> = self::engine::Engine<X, Y, PipeLog>;
/// Plain-value snapshot of the cache counters, as returned by
/// `GlobalStats::flush_cache_stats`.
#[derive(Clone, Copy, Default)]
pub struct CacheStats {
    /// Value taken from the cache-hit counter.
    pub hit: usize,
    /// Value taken from the cache-miss counter.
    pub miss: usize,
    /// Value read from the cache-size counter.
    pub cache_size: usize,
}

/// A set of atomic counters that can be updated through a shared reference.
///
/// All fields start at zero via `Default`.
#[derive(Default)]
pub struct GlobalStats {
    // Cache counters: hits and misses are drained by `flush_cache_stats`,
    // while the size is only read there.
    cache_hit: AtomicUsize,
    cache_miss: AtomicUsize,
    cache_size: AtomicUsize,

    // Rewrite counters: only ever incremented and read.
    rewrite_operations: AtomicUsize,
    compacted_rewrite_operations: AtomicUsize,
}

impl GlobalStats {
    /// Decreases the tracked cache size by `bytes`.
    ///
    /// The subtraction is the atomic `fetch_sub`, which wraps around on underflow
    /// instead of panicking.
    pub fn sub_mem_change(&self, bytes: usize) {
        let _ = self.cache_size.fetch_sub(bytes, Ordering::Release);
    }

    /// Increases the tracked cache size by `bytes`.
    pub fn add_mem_change(&self, bytes: usize) {
        let _ = self.cache_size.fetch_add(bytes, Ordering::Release);
    }

    /// Adds `count` to the cache-hit counter.
    pub fn add_cache_hit(&self, count: usize) {
        let _ = self.cache_hit.fetch_add(count, Ordering::Relaxed);
    }

    /// Adds `count` to the cache-miss counter.
    pub fn add_cache_miss(&self, count: usize) {
        let _ = self.cache_miss.fetch_add(count, Ordering::Relaxed);
    }

    /// Returns the current cache-hit counter without modifying it.
    pub fn cache_hit(&self) -> usize {
        let hits = &self.cache_hit;
        hits.load(Ordering::Relaxed)
    }

    /// Returns the current cache-miss counter without modifying it.
    pub fn cache_miss(&self) -> usize {
        let misses = &self.cache_miss;
        misses.load(Ordering::Relaxed)
    }

    /// Returns the current tracked cache size.
    pub fn cache_size(&self) -> usize {
        let size = &self.cache_size;
        size.load(Ordering::Acquire)
    }

    /// Takes a snapshot of the cache counters, resetting the hit and miss counters
    /// to zero in the process. The cache size is read but left unchanged.
    ///
    /// The three counters are accessed one after another (hit, miss, size), so the
    /// snapshot is not a single atomic view across all of them.
    pub fn flush_cache_stats(&self) -> CacheStats {
        let hit = self.cache_hit.swap(0, Ordering::SeqCst);
        let miss = self.cache_miss.swap(0, Ordering::SeqCst);
        let cache_size = self.cache_size.load(Ordering::SeqCst);
        CacheStats {
            hit,
            miss,
            cache_size,
        }
    }

    /// Adds `count` to the rewrite-operations counter.
    pub fn add_rewrite(&self, count: usize) {
        let _ = self.rewrite_operations.fetch_add(count, Ordering::Release);
    }

    /// Adds `count` to the compacted-rewrite-operations counter.
    pub fn add_compacted_rewrite(&self, count: usize) {
        let compacted = &self.compacted_rewrite_operations;
        let _ = compacted.fetch_add(count, Ordering::Release);
    }

    /// Returns the current rewrite-operations counter.
    pub fn rewrite_operations(&self) -> usize {
        let rewrites = &self.rewrite_operations;
        rewrites.load(Ordering::Acquire)
    }

    /// Returns the current compacted-rewrite-operations counter.
    pub fn compacted_rewrite_operations(&self) -> usize {
        let compacted = &self.compacted_rewrite_operations;
        compacted.load(Ordering::Acquire)
    }

    /// Test-only helper: zeroes the hit, miss and size counters (in that order).
    /// The rewrite counters are left untouched.
    #[cfg(test)]
    pub fn reset_cache(&self) {
        let cache_counters = [&self.cache_hit, &self.cache_miss, &self.cache_size];
        for counter in cache_counters.iter() {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
#[cfg(test)]
mod tests {
    use raft::eraftpb::Entry;

    use crate::log_batch::EntryExt;

    // Logger setup for the test binary. Registered through `#[ctor::ctor]`, which is
    // presumably what makes it run once at startup rather than per test — confirm
    // against the `ctor` crate documentation.
    #[ctor::ctor]
    fn init() {
        env_logger::init();
    }

    // Test-only `EntryExt` implementation for the raft `Entry` message: the index
    // is whatever `Entry::get_index` reports.
    impl EntryExt<Entry> for Entry {
        fn index(entry: &Entry) -> u64 {
            Entry::get_index(entry)
        }
    }
}