1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::sync::Arc;
use std::time::Duration;
use strum::EnumCount;
use crate::iosnoop::{fetch_io_bytes, flush_io_latency_metrics};
use crate::metrics::IO_BYTES_VEC;
use crate::IOBytes;
use crate::IORateLimiterStatistics;
use crate::{IOOp, IOType};
/// Selects where per-`IOType` read/write byte counts are obtained from.
pub enum BytesFetcher {
/// Read the counts from the shared `IORateLimiterStatistics` handle.
FromRateLimiter(Arc<IORateLimiterStatistics>),
/// Read the counts through `crate::iosnoop::fetch_io_bytes`.
FromIOSnooper(),
}
impl BytesFetcher {
    /// Returns the read and write byte counts reported for `io_type` by the
    /// source this fetcher was built with.
    fn fetch(&self, io_type: IOType) -> IOBytes {
        match self {
            Self::FromIOSnooper() => fetch_io_bytes(io_type),
            Self::FromRateLimiter(stats) => {
                // Query the read side first, then the write side.
                let read = stats.fetch(io_type, IOOp::Read) as u64;
                let write = stats.fetch(io_type, IOOp::Write) as u64;
                IOBytes { read, write }
            }
        }
    }
}
/// Publishes the byte-count change of one I/O type since the previous flush.
///
/// * `$fetcher` - value whose `fetch($io_type)` yields the current reading.
/// * `$metrics` - name of the `IO_BYTES_VEC` field that receives the change.
/// * `$io_type` - I/O type passed to the fetcher.
/// * `$last_fetch` - place holding the previous reading; it is overwritten
///   with the current reading.
macro_rules! flush_io_bytes {
    ($fetcher:expr, $metrics:ident, $io_type:expr, $last_fetch:expr) => {{
        let current = $fetcher.fetch($io_type);
        let delta = current - $last_fetch;
        // Remember this reading so the next flush only reports new traffic.
        $last_fetch = current;
        IO_BYTES_VEC.$metrics.read.inc_by(delta.read as i64);
        IO_BYTES_VEC.$metrics.write.inc_by(delta.write as i64);
    }};
}
/// Tracks the last byte-count reading of every `IOType` so that `flush` can
/// add only the change since the previous call to `IO_BYTES_VEC`.
pub struct MetricsManager {
// Source the current byte counts are read from.
fetcher: BytesFetcher,
// Previous reading per I/O type, indexed by `IOType as usize`.
last_fetch: [IOBytes; IOType::COUNT],
}
impl MetricsManager {
pub fn new(fetcher: BytesFetcher) -> Self {
MetricsManager {
fetcher,
last_fetch: Default::default(),
}
}
pub fn flush(&mut self, _duration: Duration) {
flush_io_latency_metrics();
flush_io_bytes!(
self.fetcher,
other,
IOType::Other,
self.last_fetch[IOType::Other as usize]
);
flush_io_bytes!(
self.fetcher,
foreground_read,
IOType::ForegroundRead,
self.last_fetch[IOType::ForegroundRead as usize]
);
flush_io_bytes!(
self.fetcher,
foreground_write,
IOType::ForegroundWrite,
self.last_fetch[IOType::ForegroundWrite as usize]
);
flush_io_bytes!(
self.fetcher,
flush,
IOType::Flush,
self.last_fetch[IOType::Flush as usize]
);
flush_io_bytes!(
self.fetcher,
compaction,
IOType::Compaction,
self.last_fetch[IOType::Compaction as usize]
);
flush_io_bytes!(
self.fetcher,
level_zero_compaction,
IOType::LevelZeroCompaction,
self.last_fetch[IOType::LevelZeroCompaction as usize]
);
flush_io_bytes!(
self.fetcher,
replication,
IOType::Replication,
self.last_fetch[IOType::Replication as usize]
);
flush_io_bytes!(
self.fetcher,
load_balance,
IOType::LoadBalance,
self.last_fetch[IOType::LoadBalance as usize]
);
flush_io_bytes!(
self.fetcher,
gc,
IOType::Gc,
self.last_fetch[IOType::Gc as usize]
);
flush_io_bytes!(
self.fetcher,
import,
IOType::Import,
self.last_fetch[IOType::Import as usize]
);
flush_io_bytes!(
self.fetcher,
export,
IOType::Export,
self.last_fetch[IOType::Export as usize]
);
}
}