1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use std::fmt;
use std::sync::Arc;
use kvproto::import_sstpb::SstMeta;
use crate::store::util::is_epoch_stale;
use crate::store::{StoreMsg, StoreRouter};
use engine_traits::KvEngine;
use pd_client::PdClient;
use sst_importer::SSTImporter;
use std::marker::PhantomData;
use tikv_util::error;
use tikv_util::worker::Runnable;
pub enum Task {
DeleteSST { ssts: Vec<SstMeta> },
ValidateSST { ssts: Vec<SstMeta> },
}
impl fmt::Display for Task {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Task::DeleteSST { ref ssts } => write!(f, "Delete {} ssts", ssts.len()),
Task::ValidateSST { ref ssts } => write!(f, "Validate {} ssts", ssts.len()),
}
}
}
pub struct Runner<EK, C, S>
where
EK: KvEngine,
S: StoreRouter<EK>,
{
store_id: u64,
store_router: S,
importer: Arc<SSTImporter>,
pd_client: Arc<C>,
_engine: PhantomData<EK>,
}
impl<EK, C, S> Runner<EK, C, S>
where
EK: KvEngine,
C: PdClient,
S: StoreRouter<EK>,
{
pub fn new(
store_id: u64,
store_router: S,
importer: Arc<SSTImporter>,
pd_client: Arc<C>,
) -> Runner<EK, C, S> {
Runner {
store_id,
store_router,
importer,
pd_client,
_engine: PhantomData,
}
}
fn handle_delete_sst(&self, ssts: Vec<SstMeta>) {
for sst in &ssts {
let _ = self.importer.delete(sst);
}
}
fn handle_validate_sst(&self, ssts: Vec<SstMeta>) {
let store_id = self.store_id;
let mut invalid_ssts = Vec::new();
for sst in ssts {
match self.pd_client.get_region(sst.get_range().get_start()) {
Ok(r) => {
if is_epoch_stale(r.get_region_epoch(), sst.get_region_epoch()) {
continue;
}
if r.get_id() == sst.get_region_id()
&& r.get_peers().iter().any(|p| p.get_store_id() == store_id)
{
continue;
}
invalid_ssts.push(sst);
}
Err(e) => {
error!(%e; "get region failed");
}
}
}
let msg = StoreMsg::ValidateSSTResult { invalid_ssts };
if let Err(e) = self.store_router.send(msg) {
error!(%e; "send validate sst result failed");
}
}
}
impl<EK, C, S> Runnable for Runner<EK, C, S>
where
EK: KvEngine,
C: PdClient,
S: StoreRouter<EK>,
{
type Task = Task;
fn run(&mut self, task: Task) {
match task {
Task::DeleteSST { ssts } => {
self.handle_delete_sst(ssts);
}
Task::ValidateSST { ssts } => {
self.handle_validate_sst(ssts);
}
}
}
}