1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use kvproto::pdpb::*;
use pd_client::REQUEST_RECONNECT_INTERVAL;
use super::*;
#[derive(Debug)]
pub struct Retry {
retry: usize,
count: AtomicUsize,
}
impl Retry {
pub fn new(retry: usize) -> Retry {
info!("[Retry] return Ok(_) every {:?} times", retry);
Retry {
retry,
count: AtomicUsize::new(0),
}
}
fn is_ok(&self) -> bool {
let count = self.count.fetch_add(1, Ordering::SeqCst);
if count != 0 && count % self.retry == 0 {
return true;
}
thread::sleep(REQUEST_RECONNECT_INTERVAL);
false
}
}
impl PdMocker for Retry {
fn get_region_by_id(&self, _: &GetRegionByIdRequest) -> Option<Result<GetRegionResponse>> {
if self.is_ok() {
info!("[Retry] get_region_by_id returns Ok(_)");
Some(Ok(GetRegionResponse::default()))
} else {
info!("[Retry] get_region_by_id returns Err(_)");
Some(Err("please retry".to_owned()))
}
}
fn get_store(&self, _: &GetStoreRequest) -> Option<Result<GetStoreResponse>> {
if self.is_ok() {
info!("[Retry] get_store returns Ok(_)");
Some(Ok(GetStoreResponse::default()))
} else {
info!("[Retry] get_store returns Err(_)");
Some(Err("please retry".to_owned()))
}
}
}
#[derive(Debug)]
pub struct NotRetry {
is_visited: AtomicBool,
}
impl NotRetry {
pub fn new() -> NotRetry {
info!(
"[NotRetry] return error response for the first time and return Ok() for reset times."
);
NotRetry {
is_visited: AtomicBool::new(false),
}
}
}
impl PdMocker for NotRetry {
fn get_region_by_id(&self, _: &GetRegionByIdRequest) -> Option<Result<GetRegionResponse>> {
if !self.is_visited.swap(true, Ordering::Relaxed) {
info!(
"[NotRetry] get_region_by_id returns Ok(_) with header has IncompatibleVersion error"
);
let mut err = Error::default();
err.set_type(ErrorType::IncompatibleVersion);
let mut resp = GetRegionResponse::default();
resp.mut_header().set_error(err);
Some(Ok(resp))
} else {
info!("[NotRetry] get_region_by_id returns Ok()");
Some(Ok(GetRegionResponse::default()))
}
}
fn get_store(&self, _: &GetStoreRequest) -> Option<Result<GetStoreResponse>> {
if !self.is_visited.swap(true, Ordering::Relaxed) {
info!(
"[NotRetry] get_region_by_id returns Ok(_) with header has IncompatibleVersion error"
);
let mut err = Error::default();
err.set_type(ErrorType::IncompatibleVersion);
let mut resp = GetStoreResponse::default();
resp.mut_header().set_error(err);
Some(Ok(resp))
} else {
info!("[NotRetry] get_region_by_id returns Ok()");
Some(Ok(GetStoreResponse::default()))
}
}
}