1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use tidb_query_common::storage::{
IntervalRange, OwnedKvPair, PointRange, Result as QEResult, Storage,
};
use crate::coprocessor::Error;
use crate::storage::mvcc::NewerTsCheckState;
use crate::storage::Statistics;
use crate::storage::{Scanner, Store};
use txn_types::Key;
/// Adapter that implements the query engine's `Storage` trait on top of a
/// `Store`, keeping at most one active scanner at a time.
pub struct TiKVStorage<S: Store> {
/// Underlying store used to create scanners and to serve point gets.
store: S,
/// Scanner created by the most recent successful `begin_scan`; `None` until
/// `begin_scan` has succeeded at least once.
scanner: Option<S::Scanner>,
/// Statistics accumulated from replaced scanners and point gets that have
/// not yet been handed out through `collect_statistics`.
cf_stats_backlog: Statistics,
/// Newer-ts check state carried across scanners: it is switched to `Met`
/// when a scanner being replaced reports `Met`.
met_newer_ts_data_backlog: NewerTsCheckState,
}
impl<S: Store> TiKVStorage<S> {
    /// Builds a storage adapter around `store` with no active scanner and
    /// empty statistics.
    ///
    /// When `check_can_be_cached` is `true` the newer-ts tracking starts in
    /// the `NotMetYet` state; otherwise it starts as `Unknown`, in which case
    /// `met_uncacheable_data` reports `None` unless a scanner or the store
    /// itself reports `Met`.
    pub fn new(store: S, check_can_be_cached: bool) -> Self {
        // Decide the starting check state up front so the struct literal
        // below stays flat.
        let initial_check_state = if check_can_be_cached {
            NewerTsCheckState::NotMetYet
        } else {
            NewerTsCheckState::Unknown
        };

        Self {
            store,
            scanner: None,
            cf_stats_backlog: Statistics::default(),
            met_newer_ts_data_backlog: initial_check_state,
        }
    }
}
impl<S: Store> Storage for TiKVStorage<S> {
type Statistics = Statistics;
fn begin_scan(
&mut self,
is_backward_scan: bool,
is_key_only: bool,
range: IntervalRange,
) -> QEResult<()> {
if let Some(scanner) = &mut self.scanner {
self.cf_stats_backlog.add(&scanner.take_statistics());
if scanner.met_newer_ts_data() == NewerTsCheckState::Met {
self.met_newer_ts_data_backlog = NewerTsCheckState::Met;
}
}
let lower = Some(Key::from_raw(&range.lower_inclusive));
let upper = Some(Key::from_raw(&range.upper_exclusive));
self.scanner = Some(
self.store
.scanner(
is_backward_scan,
is_key_only,
self.met_newer_ts_data_backlog == NewerTsCheckState::NotMetYet,
lower,
upper,
)
.map_err(Error::from)?,
);
Ok(())
}
fn scan_next(&mut self) -> QEResult<Option<OwnedKvPair>> {
let kv = self.scanner.as_mut().unwrap().next().map_err(Error::from)?;
Ok(kv.map(|(k, v)| (k.into_raw().unwrap(), v)))
}
fn get(&mut self, _is_key_only: bool, range: PointRange) -> QEResult<Option<OwnedKvPair>> {
let key = range.0;
let value = self
.store
.incremental_get(&Key::from_raw(&key))
.map_err(Error::from)?;
Ok(value.map(move |v| (key, v)))
}
#[inline]
fn met_uncacheable_data(&self) -> Option<bool> {
if let Some(scanner) = &self.scanner {
if scanner.met_newer_ts_data() == NewerTsCheckState::Met {
return Some(true);
}
}
if self.store.incremental_get_met_newer_ts_data() == NewerTsCheckState::Met {
return Some(true);
}
match self.met_newer_ts_data_backlog {
NewerTsCheckState::Unknown => None,
NewerTsCheckState::Met => Some(true),
NewerTsCheckState::NotMetYet => Some(false),
}
}
fn collect_statistics(&mut self, dest: &mut Statistics) {
self.cf_stats_backlog
.add(&self.store.incremental_get_take_statistics());
if let Some(scanner) = &mut self.scanner {
self.cf_stats_backlog.add(&scanner.take_statistics());
}
dest.add(&self.cf_stats_backlog);
self.cf_stats_backlog = Statistics::default();
}
}