1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;
use libc::{getpid, pid_t};
use crate::server::load_statistics::ThreadLoad;
use tikv_util::metrics::{cpu_total, get_thread_ids};
use procinfo::pid;
/// Samples the accumulated CPU time of a fixed set of threads over a sliding
/// window and publishes the derived load percentage into a shared `ThreadLoad`.
///
/// `cpu_usages` and `instants` form a ring buffer of `slots` samples;
/// `cur_pos` points at the slot that will be overwritten by the next
/// `record` call.
pub struct ThreadLoadStatistics {
// Process id the monitored threads belong to.
pid: pid_t,
// Thread ids whose command name matched the prefix at construction time.
// NOTE(review): threads spawned after `new` are never tracked — confirm intended.
tids: Vec<pid_t>,
// Number of samples kept in the ring buffer.
slots: usize,
// Index of the ring-buffer slot to write next.
cur_pos: usize,
// Total CPU time (as returned by `cpu_total`) at each sample point.
cpu_usages: Vec<f64>,
// Wall-clock timestamp of each sample point.
instants: Vec<Instant>,
// Shared sink the computed load percentage is stored into.
thread_load: Arc<ThreadLoad>,
}
impl ThreadLoadStatistics {
pub fn new(slots: usize, prefix: &str, thread_load: Arc<ThreadLoad>) -> Self {
let pid: pid_t = unsafe { getpid() };
let mut tids = vec![];
let mut cpu_total_count = 0f64;
for tid in get_thread_ids(pid).unwrap() {
if let Ok(stat) = pid::stat_task(pid, tid) {
if !stat.command.starts_with(prefix) {
continue;
}
cpu_total_count += cpu_total(&stat);
tids.push(tid);
}
}
ThreadLoadStatistics {
pid,
tids,
slots,
cur_pos: 0,
cpu_usages: vec![cpu_total_count; slots],
instants: vec![Instant::now(); slots],
thread_load,
}
}
pub fn record(&mut self, instant: Instant) {
self.instants[self.cur_pos] = instant;
self.cpu_usages[self.cur_pos] = 0f64;
for tid in &self.tids {
if let Ok(stat) = pid::stat_task(self.pid, *tid) {
self.cpu_usages[self.cur_pos] += cpu_total(&stat);
}
}
let current_instant = self.instants[self.cur_pos];
let current_cpu_usage = self.cpu_usages[self.cur_pos];
let next_pos = (self.cur_pos + 1) % self.slots;
let earlist_instant = self.instants[next_pos];
let earlist_cpu_usage = self.cpu_usages[next_pos];
self.cur_pos = next_pos;
let millis = (current_instant - earlist_instant).as_millis() as usize;
if millis > 0 {
let mut cpu_usage = calc_cpu_load(millis, earlist_cpu_usage, current_cpu_usage);
if cpu_usage > self.tids.len() * 100 {
cpu_usage = self.tids.len() * 100;
}
self.thread_load.load.store(cpu_usage, Ordering::Relaxed);
self.thread_load.term.fetch_add(1, Ordering::Relaxed);
}
}
}
/// Converts a CPU-time delta (`end_usage - start_usage`, in the units
/// reported by `cpu_total` — presumably seconds; the ×1000 scale matches
/// millisecond elapsed time) over `elapsed_millis` of wall-clock time into
/// an integer load percentage.
///
/// Returns 0 when `elapsed_millis` is 0 instead of panicking on a division
/// by zero, so the helper is safe even if a caller forgets to pre-check the
/// elapsed time.
#[inline]
fn calc_cpu_load(elapsed_millis: usize, start_usage: f64, end_usage: f64) -> usize {
    if elapsed_millis == 0 {
        return 0;
    }
    // ×1000 converts the usage delta to the millisecond time base,
    // ×100 expresses the ratio as a percentage.
    let cpu_usage = (end_usage - start_usage) * 1000f64 * 100f64;
    cpu_usage as usize / elapsed_millis
}
#[cfg(test)]
mod tests {
    use std::thread;
    use super::*;

    /// End-to-end check: burn CPU on the current thread for ~200ms, record
    /// one sample, and expect the published load to land in [80, 100).
    /// Ignored by default because it is timing-sensitive and depends on an
    /// otherwise idle machine.
    #[test]
    #[ignore]
    fn test_thread_load_statistic() {
        let t = thread::current();
        let thread_name = t.name().unwrap();
        // Thread command names visible via /proc are truncated to 15 bytes,
        // so truncate the prefix the same way before matching.
        let end = ::std::cmp::min(thread_name.len(), 15);
        let thread_name = thread_name[..end].to_owned();
        let load = Arc::new(ThreadLoad::with_threshold(80));
        let mut stats = ThreadLoadStatistics::new(2, &thread_name, Arc::clone(&load));
        let start = Instant::now();
        // Busy-wait (deliberately NOT a sleep) so this thread actually
        // consumes CPU for the whole window and the measured load is high.
        loop {
            if (Instant::now() - start).as_millis() > 200 {
                break;
            }
        }
        stats.record(Instant::now());
        let cpu_usage = load.load();
        assert!(cpu_usage < 100);
        assert!(
            cpu_usage >= 80,
            "the load must be heavier than 80, but got {}",
            cpu_usage
        );
    }
}