1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
use std::sync::{Arc, Mutex};
use collections::HashMap;
use crossbeam::channel::TrySendError;
use engine_rocks::{RocksEngine, RocksSnapshot};
use kvproto::raft_serverpb::RaftMessage;
use raftstore::errors::{Error as RaftStoreError, Result as RaftStoreResult};
use raftstore::router::{handle_send_error, RaftStoreRouter};
use raftstore::store::msg::{CasualMessage, PeerMsg, SignificantMsg};
use raftstore::store::{CasualRouter, ProposalRouter, RaftCommand, StoreMsg, StoreRouter};
use tikv_util::mpsc::{loose_bounded, LooseBoundedSender, Receiver};
/// A mock router that keeps one `LooseBoundedSender` per region id.
///
/// The sender map sits behind an `Arc<Mutex<..>>`, so every clone of the
/// router shares (and mutates) the same set of registered regions.
///
/// `Default` is derived so the type can be built with
/// `MockRaftStoreRouter::default()` as well as `MockRaftStoreRouter::new()`.
// The nested generic type of `senders` trips clippy's `type_complexity` lint;
// it is spelled out inline on purpose rather than hidden behind an alias.
#[derive(Clone, Default)]
#[allow(clippy::type_complexity)]
pub struct MockRaftStoreRouter {
    /// Region id -> sender half of that region's peer-message channel.
    senders: Arc<Mutex<HashMap<u64, LooseBoundedSender<PeerMsg<RocksEngine>>>>>,
}
impl MockRaftStoreRouter {
    /// Builds a router with an empty region-to-sender map.
    pub fn new() -> MockRaftStoreRouter {
        let senders = Arc::default();
        Self { senders }
    }

    /// Registers a channel for `region_id` and returns its receiving half.
    ///
    /// The channel is created with `loose_bounded(cap)`; the sending half is
    /// stored in the shared map, replacing any sender previously stored under
    /// the same `region_id`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the sender map is poisoned.
    pub fn add_region(&self, region_id: u64, cap: usize) -> Receiver<PeerMsg<RocksEngine>> {
        let (sender, receiver) = loose_bounded(cap);
        let mut registry = self.senders.lock().unwrap();
        registry.insert(region_id, sender);
        receiver
    }
}
impl StoreRouter<RocksEngine> for MockRaftStoreRouter {
    /// Store-level messages are not supported by this mock.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    fn send(&self, _msg: StoreMsg<RocksEngine>) -> RaftStoreResult<()> {
        unimplemented!()
    }
}
impl ProposalRouter<RocksSnapshot> for MockRaftStoreRouter {
    /// Raft command proposals are not supported by this mock.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    fn send(
        &self,
        _cmd: RaftCommand<RocksSnapshot>,
    ) -> Result<(), TrySendError<RaftCommand<RocksSnapshot>>> {
        unimplemented!()
    }
}
impl CasualRouter<RocksEngine> for MockRaftStoreRouter {
    /// Delivers a casual message to the channel registered for `region_id`.
    ///
    /// The message is wrapped in `PeerMsg::CasualMessage` and handed to the
    /// sender with `try_send`. The sender map stays locked for the duration of
    /// the call.
    ///
    /// # Errors
    ///
    /// * `RaftStoreError::RegionNotFound(region_id)` if no sender is
    ///   registered for the region.
    /// * Whatever `handle_send_error` produces when `try_send` fails.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the sender map is poisoned.
    fn send(&self, region_id: u64, msg: CasualMessage<RocksEngine>) -> RaftStoreResult<()> {
        let mut senders = self.senders.lock().unwrap();
        // The lookup key must be a borrow of `region_id` (`&region_id`).
        match senders.get_mut(&region_id) {
            Some(tx) => tx
                .try_send(PeerMsg::CasualMessage(msg))
                .map_err(|e| handle_send_error(region_id, e)),
            None => Err(RaftStoreError::RegionNotFound(region_id)),
        }
    }
}
impl RaftStoreRouter<RocksEngine> for MockRaftStoreRouter {
    /// Raft messages are not supported by this mock.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    fn send_raft_msg(&self, _: RaftMessage) -> RaftStoreResult<()> {
        unimplemented!()
    }

    /// Delivers a significant message to the channel registered for
    /// `region_id`.
    ///
    /// The message is wrapped in `PeerMsg::SignificantMsg` and pushed with
    /// `force_send`. The sender map stays locked for the duration of the call.
    ///
    /// # Errors
    ///
    /// Returns `RaftStoreError::RegionNotFound(region_id)` (after logging the
    /// undelivered message) if no sender is registered for the region.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the sender map is poisoned, or if
    /// `force_send` returns an error.
    fn significant_send(
        &self,
        region_id: u64,
        msg: SignificantMsg<RocksSnapshot>,
    ) -> RaftStoreResult<()> {
        let mut senders = self.senders.lock().unwrap();
        // The lookup key must be a borrow of `region_id` (`&region_id`).
        match senders.get_mut(&region_id) {
            Some(tx) => {
                tx.force_send(PeerMsg::SignificantMsg(msg)).unwrap();
                Ok(())
            }
            None => {
                error!("failed to send significant msg"; "msg" => ?msg);
                Err(RaftStoreError::RegionNotFound(region_id))
            }
        }
    }

    /// Broadcasting is a no-op in this mock: the message factory is never
    /// invoked and nothing is sent.
    fn broadcast_normal(&self, _: impl FnMut() -> PeerMsg<RocksEngine>) {}
}