1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use crate::raw::Env;
use engine_traits::{EngineFileSystemInspector, FileSystemInspector};
use rocksdb::FileSystemInspector as DBFileSystemInspector;
use std::sync::Arc;
pub fn get_env(
base_env: Option<Arc<Env>>,
limiter: Option<Arc<file_system::IORateLimiter>>,
) -> Result<Arc<Env>, String> {
let base_env = base_env.unwrap_or_else(|| Arc::new(Env::default()));
Ok(Arc::new(Env::new_file_system_inspected_env(
base_env,
WrappedFileSystemInspector {
inspector: EngineFileSystemInspector::from_limiter(limiter),
},
)?))
}
pub struct WrappedFileSystemInspector<T: FileSystemInspector> {
inspector: T,
}
impl<T: FileSystemInspector> DBFileSystemInspector for WrappedFileSystemInspector<T> {
fn read(&self, len: usize) -> Result<usize, String> {
self.inspector.read(len)
}
fn write(&self, len: usize) -> Result<usize, String> {
self.inspector.write(len)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::compat::Compat;
use crate::event_listener::RocksEventListener;
use crate::raw::{ColumnFamilyOptions, DBCompressionType};
use crate::raw_util::{new_engine_opt, CFOptions};
use engine_traits::{CompactExt, CF_DEFAULT};
use file_system::{IOOp, IORateLimiter, IORateLimiterStatistics, IOType};
use keys::data_key;
use rocksdb::Writable;
use rocksdb::{DBOptions, DB};
use std::sync::Arc;
use tempfile::Builder;
fn new_test_db(dir: &str) -> (Arc<DB>, Arc<IORateLimiterStatistics>) {
let limiter = Arc::new(IORateLimiter::new(0, true ));
let mut db_opts = DBOptions::new();
db_opts.add_event_listener(RocksEventListener::new("test_db"));
let env = get_env(None, Some(limiter.clone())).unwrap();
db_opts.set_env(env);
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.set_disable_auto_compactions(true);
cf_opts.compression_per_level(&[DBCompressionType::No; 7]);
let db = Arc::new(
new_engine_opt(dir, db_opts, vec![CFOptions::new(CF_DEFAULT, cf_opts)]).unwrap(),
);
(db, limiter.statistics().unwrap())
}
#[test]
fn test_inspected_compact() {
let value_size = 1024;
let temp_dir = Builder::new()
.prefix("test_inspected_compact")
.tempdir()
.unwrap();
let (db, stats) = new_test_db(temp_dir.path().to_str().unwrap());
let value = vec![b'v'; value_size];
db.put(&data_key(b"a1"), &value).unwrap();
db.put(&data_key(b"a2"), &value).unwrap();
db.flush(true ).unwrap();
assert!(stats.fetch(IOType::Flush, IOOp::Write) > value_size * 2);
assert!(stats.fetch(IOType::Flush, IOOp::Write) < value_size * 3);
stats.reset();
db.put(&data_key(b"a2"), &value).unwrap();
db.put(&data_key(b"a3"), &value).unwrap();
db.flush(true ).unwrap();
assert!(stats.fetch(IOType::Flush, IOOp::Write) > value_size * 2);
assert!(stats.fetch(IOType::Flush, IOOp::Write) < value_size * 3);
stats.reset();
db.c()
.compact_range(
CF_DEFAULT, None,
None,
false,
1,
)
.unwrap();
assert!(stats.fetch(IOType::LevelZeroCompaction, IOOp::Read) > value_size * 4);
assert!(stats.fetch(IOType::LevelZeroCompaction, IOOp::Read) < value_size * 5);
assert!(stats.fetch(IOType::LevelZeroCompaction, IOOp::Write) > value_size * 3);
assert!(stats.fetch(IOType::LevelZeroCompaction, IOOp::Write) < value_size * 4);
}
}