1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use super::{IOOp, IOType};
use std::future::{self, Future};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use crossbeam_utils::CachePadded;
use parking_lot::Mutex;
use strum::EnumCount;
/// Byte counters for I/O activity, kept separately for reads and writes.
///
/// Each array holds `IOType::COUNT` atomic counters and is indexed with
/// `io_type as usize`. Every counter is wrapped in `CachePadded`.
#[derive(Debug)]
pub struct IORateLimiterStatistics {
/// Running totals recorded for `IOOp::Read`, one slot per `IOType` index.
read: [CachePadded<AtomicUsize>; IOType::COUNT],
/// Running totals recorded for `IOOp::Write`, one slot per `IOType` index.
write: [CachePadded<AtomicUsize>; IOType::COUNT],
}
impl IORateLimiterStatistics {
pub fn new() -> Self {
IORateLimiterStatistics {
read: Default::default(),
write: Default::default(),
}
}
pub fn add(&self, io_type: IOType, io_op: IOOp, len: usize) {
match io_op {
IOOp::Read => {
self.read[io_type as usize].fetch_add(len, Ordering::Relaxed);
}
IOOp::Write => {
self.write[io_type as usize].fetch_add(len, Ordering::Relaxed);
}
}
}
pub fn fetch(&self, io_type: IOType, io_op: IOOp) -> usize {
match io_op {
IOOp::Read => self.read[io_type as usize].load(Ordering::Relaxed),
IOOp::Write => self.write[io_type as usize].load(Ordering::Relaxed),
}
}
#[allow(dead_code)]
pub fn reset(&self) {
for i in self.read.iter() {
i.store(0, Ordering::Relaxed);
}
for i in self.write.iter() {
i.store(0, Ordering::Relaxed);
}
}
}
/// Limiter that caps the size of each I/O request and can optionally record
/// the granted bytes in an `IORateLimiterStatistics`.
#[derive(Debug)]
pub struct IORateLimiter {
/// Upper bound on the bytes granted by a single request; `0` disables the cap.
refill_bytes: usize,
/// Shared statistics collector; `Some` only when statistics were enabled at construction.
stats: Option<Arc<IORateLimiterStatistics>>,
}
impl IORateLimiter {
    /// Builds a limiter that grants at most `refill_bytes` per request
    /// (`0` means unlimited). A statistics collector is allocated only when
    /// `enable_statistics` is `true`.
    pub fn new(refill_bytes: usize, enable_statistics: bool) -> IORateLimiter {
        let stats = if enable_statistics {
            Some(Arc::new(IORateLimiterStatistics::new()))
        } else {
            None
        };
        IORateLimiter {
            refill_bytes,
            stats,
        }
    }

    /// Returns another handle to the statistics collector, or `None` when
    /// statistics are disabled.
    pub fn statistics(&self) -> Option<Arc<IORateLimiterStatistics>> {
        self.stats.as_ref().map(Arc::clone)
    }

    /// Requests permission for `bytes` of I/O and returns the amount granted.
    ///
    /// The grant is `bytes` clamped to `refill_bytes` when a cap is set, and
    /// `bytes` unchanged otherwise. The granted amount (not the requested
    /// one) is what gets recorded in the statistics, if enabled.
    pub fn request(&self, io_type: IOType, io_op: IOOp, bytes: usize) -> usize {
        let granted = match self.refill_bytes {
            // A zero refill size means no per-request cap.
            0 => bytes,
            cap => bytes.min(cap),
        };
        if let Some(recorder) = self.stats.as_deref() {
            recorder.add(io_type, io_op, granted);
        }
        granted
    }

    /// Future-returning variant of [`IORateLimiter::request`].
    ///
    /// The request is performed immediately, before the future is returned;
    /// the returned future is already complete and yields the granted amount.
    pub fn async_request(
        &self,
        io_type: IOType,
        io_op: IOOp,
        bytes: usize,
    ) -> impl Future<Output = usize> {
        let granted = self.request(io_type, io_op, bytes);
        future::ready(granted)
    }
}
// Global slot holding the currently installed limiter, guarded by a mutex.
// It starts out as `None`; see `set_io_rate_limiter` / `get_io_rate_limiter`.
lazy_static! {
static ref IO_RATE_LIMITER: Mutex<Option<Arc<IORateLimiter>>> = Mutex::new(None);
}
/// Installs `limiter` as the global I/O rate limiter, replacing whatever was
/// there before. Passing `None` clears the global limiter.
pub fn set_io_rate_limiter(limiter: Option<Arc<IORateLimiter>>) {
    let mut slot = IO_RATE_LIMITER.lock();
    *slot = limiter;
}
/// Returns a new handle to the global I/O rate limiter, or `None` when no
/// limiter is currently installed.
pub fn get_io_rate_limiter() -> Option<Arc<IORateLimiter>> {
    let slot = IO_RATE_LIMITER.lock();
    slot.as_ref().map(Arc::clone)
}