1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
use std::collections::HashMap;
use crate::{RocksEngine, UserProperties};
use engine_traits::util::get_expire_ts;
use engine_traits::{
DecodeProperties, Range, Result, TableProperties, TablePropertiesCollection,
TablePropertiesExt, TtlProperties, TtlPropertiesExt,
};
use rocksdb::{DBEntryType, TablePropertiesCollector, TablePropertiesCollectorFactory};
use tikv_util::error;
/// User-property key under which the maximum expire timestamp is encoded.
const PROP_MAX_EXPIRE_TS: &str = "tikv.max_expire_ts";
/// User-property key under which the minimum expire timestamp is encoded.
const PROP_MIN_EXPIRE_TS: &str = "tikv.min_expire_ts";
/// Stateless helper that converts [`TtlProperties`] to and from the
/// user-property representation.
pub struct RocksTtlProperties;

impl RocksTtlProperties {
    /// Serializes `ttl_props` into a freshly created [`UserProperties`],
    /// writing the maximum and the minimum expire timestamp under their
    /// respective property keys.
    pub fn encode(ttl_props: &TtlProperties) -> UserProperties {
        let mut encoded = UserProperties::new();
        encoded.encode_u64(PROP_MAX_EXPIRE_TS, ttl_props.max_expire_ts);
        encoded.encode_u64(PROP_MIN_EXPIRE_TS, ttl_props.min_expire_ts);
        encoded
    }

    /// Rebuilds a [`TtlProperties`] from `props`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `decode_u64` when either the maximum
    /// or the minimum expire timestamp cannot be decoded.
    pub fn decode<T: DecodeProperties>(props: &T) -> Result<TtlProperties> {
        // The maximum is decoded first, then the minimum; the first failure
        // is returned to the caller.
        let max_expire_ts = props.decode_u64(PROP_MAX_EXPIRE_TS)?;
        let min_expire_ts = props.decode_u64(PROP_MIN_EXPIRE_TS)?;
        Ok(TtlProperties {
            max_expire_ts,
            min_expire_ts,
        })
    }
}
impl TtlPropertiesExt for RocksEngine {
    /// Returns the TTL properties of the tables in column family `cf` that
    /// are reported for the key range built from `start_key` and `end_key`.
    ///
    /// Each element pairs a table's file name with its decoded
    /// [`TtlProperties`]. Tables whose user-collected properties cannot be
    /// decoded as TTL properties are silently left out of the result.
    ///
    /// # Errors
    ///
    /// Propagates the error from `get_properties_of_tables_in_range`.
    fn get_range_ttl_properties_cf(
        &self,
        cf: &str,
        start_key: &[u8],
        end_key: &[u8],
    ) -> Result<Vec<(String, TtlProperties)>> {
        let key_range = Range::new(start_key, end_key);
        let tables = self.get_properties_of_tables_in_range(cf, &[key_range])?;
        if tables.is_empty() {
            return Ok(Vec::new());
        }

        let ttl_props: Vec<(String, TtlProperties)> = tables
            .iter()
            .filter_map(|(file_name, table)| {
                // A decode failure just means this table contributes nothing.
                RocksTtlProperties::decode(&table.user_collected_properties())
                    .ok()
                    .map(|prop| (file_name.to_string(), prop))
            })
            .collect();
        Ok(ttl_props)
    }
}
/// Table properties collector that tracks the largest and the smallest
/// non-zero expire timestamp among the entries passed to it.
#[derive(Default)]
pub struct TtlPropertiesCollector {
    prop: TtlProperties,
}

impl TablePropertiesCollector for TtlPropertiesCollector {
    /// Folds the expire timestamp of one entry into the running maximum and
    /// minimum.
    ///
    /// Entries are ignored unless they are `Put` entries whose key starts
    /// with `keys::DATA_PREFIX_KEY`. If the expire timestamp cannot be read
    /// from `value`, the failure is logged and the entry is treated as having
    /// a timestamp of 0; entries with a timestamp of 0 do not affect the
    /// collected properties.
    fn add(&mut self, key: &[u8], value: &[u8], entry_type: DBEntryType, _: u64, _: u64) {
        if entry_type != DBEntryType::Put || !key.starts_with(keys::DATA_PREFIX_KEY) {
            return;
        }

        let expire_ts = get_expire_ts(value).unwrap_or_else(|e| {
            error!("failed to get expire ts";
                "key" => log_wrappers::Value::key(key),
                "value" => log_wrappers::Value::value(value),
                "err" => %e,
            );
            0
        });
        if expire_ts == 0 {
            return;
        }

        self.prop.max_expire_ts = self.prop.max_expire_ts.max(expire_ts);
        // A stored minimum of 0 means no timestamp has been recorded yet, so
        // the first non-zero timestamp becomes the minimum unconditionally.
        self.prop.min_expire_ts = match self.prop.min_expire_ts {
            0 => expire_ts,
            current_min => current_min.min(expire_ts),
        };
    }

    /// Returns the encoded TTL properties, or an empty map when neither a
    /// maximum nor a minimum expire timestamp has been recorded.
    fn finish(&mut self) -> HashMap<Vec<u8>, Vec<u8>> {
        let has_ttl = self.prop.max_expire_ts != 0 || self.prop.min_expire_ts != 0;
        if has_ttl {
            RocksTtlProperties::encode(&self.prop).0
        } else {
            HashMap::default()
        }
    }
}
/// Factory that produces a fresh [`TtlPropertiesCollector`] on every request.
pub struct TtlPropertiesCollectorFactory {}

impl TablePropertiesCollectorFactory for TtlPropertiesCollectorFactory {
    /// Returns a boxed, default-initialized [`TtlPropertiesCollector`]; the
    /// `u32` argument is ignored.
    fn create_table_properties_collector(&mut self, _: u32) -> Box<dyn TablePropertiesCollector> {
        let collector = TtlPropertiesCollector::default();
        Box::new(collector)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use engine_traits::util::append_expire_ts;
    use tikv_util::time::UnixSecs;

    /// Runs a fresh collector over `case` and decodes whatever it produced.
    ///
    /// Every `(key, expire_ts)` pair is first added as a `Put` entry whose
    /// value has `expire_ts` appended, then every key is added again as an
    /// `Other` entry with a plain value.
    fn collect_properties(case: &[(&'static str, u64)]) -> Result<TtlProperties> {
        let mut collector = TtlPropertiesCollector::default();
        for (key, expire_ts) in case.iter().copied() {
            let mut value = vec![0u8; 10];
            append_expire_ts(&mut value, expire_ts);
            collector.add(key.as_bytes(), &value, DBEntryType::Put, 0, 0);
        }
        for (key, _) in case.iter().copied() {
            let value = vec![0u8; 10];
            collector.add(key.as_bytes(), &value, DBEntryType::Other, 0, 0);
        }
        let collected = UserProperties(collector.finish());
        RocksTtlProperties::decode(&collected)
    }

    #[test]
    fn test_ttl_properties() {
        // Mixed timestamps: the zero entries must not influence min or max.
        let mixed = [
            ("za", 0),
            ("zb", UnixSecs::now().into_inner()),
            ("zc", 1),
            ("zd", u64::MAX),
            ("ze", 0),
        ];
        let props = collect_properties(&mixed).unwrap();
        assert_eq!(props.max_expire_ts, u64::MAX);
        assert_eq!(props.min_expire_ts, 1);

        // Only a zero timestamp: nothing is collected, so decoding fails.
        let only_zero = [("za", 0)];
        assert!(collect_properties(&only_zero).is_err());

        // No entries at all: decoding fails as well.
        let empty: [(&'static str, u64); 0] = [];
        assert!(collect_properties(&empty).is_err());

        // A single non-zero timestamp is both the minimum and the maximum.
        let single = [("za", 1)];
        let props = collect_properties(&single).unwrap();
        assert_eq!(props.max_expire_ts, 1);
        assert_eq!(props.min_expire_ts, 1);
    }
}