1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use std::time::Instant;
use async_trait::async_trait;
use kvproto::coprocessor::{KeyRange, Response};
use protobuf::Message;
use tidb_query_common::storage::scanner::{RangesScanner, RangesScannerOptions};
use tidb_query_common::storage::Range;
use tidb_query_executors::runner::MAX_TIME_SLICE;
use tidb_query_expr::BATCH_MAX_SIZE;
use tipb::{ChecksumAlgorithm, ChecksumRequest, ChecksumResponse};
use yatp::task::future::reschedule;
use crate::coprocessor::dag::TiKVStorage;
use crate::coprocessor::*;
use crate::storage::{Snapshot, SnapshotStore, Statistics};
/// State needed to serve one checksum request: the request itself and a
/// scanner over the key ranges it covers.
pub struct ChecksumContext<S>
where
    S: Snapshot,
{
    /// The checksum request being served (algorithm and optional prefix rule).
    req: ChecksumRequest,
    /// Scanner that yields the key/value pairs to be checksummed.
    scanner: RangesScanner<TiKVStorage<SnapshotStore<S>>>,
}
impl<S: Snapshot> ChecksumContext<S> {
    /// Creates a checksum context that scans `ranges` forward, reading both
    /// keys and values from `snap` at timestamp `start_ts`.
    ///
    /// The isolation level, the cache-fill flag (the negation of the
    /// request's `not_fill_cache`) and the set of bypassed locks are taken
    /// from `req_ctx`.
    ///
    /// # Errors
    ///
    /// The visible body has no failing path and always returns `Ok`; the
    /// `Result` return type is kept for the callers' sake.
    pub fn new(
        req: ChecksumRequest,
        ranges: Vec<KeyRange>,
        start_ts: u64,
        snap: S,
        req_ctx: &ReqContext,
    ) -> Result<Self> {
        let ctx = &req_ctx.context;
        let fill_cache = !ctx.get_not_fill_cache();
        let store = SnapshotStore::new(
            snap,
            start_ts.into(),
            ctx.get_isolation_level(),
            fill_cache,
            req_ctx.bypass_locks.clone(),
            false,
        );

        // Convert the protobuf ranges into the scanner's own range type.
        let scan_ranges = ranges
            .into_iter()
            .map(|pb_range| Range::from_pb_range(pb_range, false))
            .collect();

        let options = RangesScannerOptions {
            storage: TiKVStorage::new(store, false),
            ranges: scan_ranges,
            scan_backward_in_range: false,
            is_key_only: false,
            is_scanned_range_aware: false,
        };

        Ok(Self {
            req,
            scanner: RangesScanner::new(options),
        })
    }
}
#[async_trait]
impl<S: Snapshot> RequestHandler for ChecksumContext<S> {
    /// Scans every key/value pair in the configured ranges and returns a
    /// `Response` whose data is a serialized `ChecksumResponse` carrying the
    /// XOR-combined CRC64 checksum, the number of pairs and the total bytes.
    ///
    /// When the request carries a rule, every scanned key must start with
    /// the rule's new prefix. The checksum and byte count are then computed
    /// as if that prefix were replaced by the rule's old prefix.
    ///
    /// # Errors
    ///
    /// Fails if the requested algorithm is not `Crc64Xor`, if the scanner
    /// reports an error, if a key does not start with the new prefix, or if
    /// the response cannot be serialized.
    async fn handle_request(&mut self) -> Result<Response> {
        let algorithm = self.req.get_algorithm();
        if algorithm != ChecksumAlgorithm::Crc64Xor {
            return Err(box_err!("unknown checksum algorithm {:?}", algorithm));
        }

        let (old_prefix, new_prefix) = if self.req.has_rule() {
            // Work on a copy so the prefixes can be moved out of the rule.
            let mut rule = self.req.get_rule().clone();
            (rule.take_old_prefix(), rule.take_new_prefix())
        } else {
            (Vec::new(), Vec::new())
        };

        // Digest pre-fed with the old prefix; each pair starts from a clone
        // of it so the prefix is not re-hashed from scratch every time.
        let mut seeded_digest = crc64fast::Digest::new();
        seeded_digest.write(&old_prefix);

        let mut checksum: u64 = 0;
        let mut total_kvs = 0;
        let mut total_bytes: usize = 0;

        let mut rows_since_check = 0;
        let mut slice_began = Instant::now();

        loop {
            let (key, value) = match self.scanner.next()? {
                Some(pair) => pair,
                None => break,
            };

            // Only look at the clock once per batch of rows, and yield to
            // the scheduler if the current time slice has been used up.
            rows_since_check += 1;
            if rows_since_check >= BATCH_MAX_SIZE {
                if slice_began.elapsed() > MAX_TIME_SLICE {
                    reschedule().await;
                    slice_began = Instant::now();
                }
                rows_since_check = 0;
            }

            if !key.starts_with(&new_prefix) {
                return Err(box_err!("Wrong prefix expect: {:?}", new_prefix));
            }
            // Safe to slice: the key was just verified to start with `new_prefix`.
            let key_suffix = &key[new_prefix.len()..];

            checksum = checksum_crc64_xor(checksum, seeded_digest.clone(), key_suffix, &value);
            total_kvs += 1;
            // Size of the pair with `new_prefix` swapped for `old_prefix`.
            total_bytes += old_prefix.len() + key_suffix.len() + value.len();
        }

        let mut checksum_resp = ChecksumResponse::default();
        checksum_resp.set_checksum(checksum);
        checksum_resp.set_total_kvs(total_kvs);
        checksum_resp.set_total_bytes(total_bytes as u64);
        let payload = box_try!(checksum_resp.write_to_bytes());

        let mut response = Response::default();
        response.set_data(payload);
        Ok(response)
    }

    /// Adds the storage statistics gathered by the scanner into `dest`.
    fn collect_scan_statistics(&mut self, dest: &mut Statistics) {
        self.scanner.collect_storage_stats(dest);
    }
}
/// Folds one key/value pair into a running XOR checksum.
///
/// `digest` is taken by value and may already contain data (for example a
/// key prefix). `k_suffix` and then `v` are appended to it, and the
/// resulting CRC64 is XOR-ed with `checksum` to produce the return value.
pub fn checksum_crc64_xor(
    checksum: u64,
    mut digest: crc64fast::Digest,
    k_suffix: &[u8],
    v: &[u8],
) -> u64 {
    // Order matters: the key suffix is hashed before the value.
    for chunk in &[k_suffix, v] {
        digest.write(chunk);
    }
    let pair_crc = digest.sum64();
    pair_crc ^ checksum
}