1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::{Arc, Mutex};

use collections::HashMap;
use crossbeam::channel::TrySendError;
use engine_rocks::{RocksEngine, RocksSnapshot};
use kvproto::raft_serverpb::RaftMessage;
use raftstore::errors::{Error as RaftStoreError, Result as RaftStoreResult};
use raftstore::router::{handle_send_error, RaftStoreRouter};
use raftstore::store::msg::{CasualMessage, PeerMsg, SignificantMsg};
use raftstore::store::{CasualRouter, ProposalRouter, RaftCommand, StoreMsg, StoreRouter};
use tikv_util::mpsc::{loose_bounded, LooseBoundedSender, Receiver};

/// A mock raftstore router backed by a per-region table of channel senders.
///
/// Regions are registered through `add_region`, which stores the sending half
/// of a loose-bounded channel under the region id and hands the receiving half
/// back to the caller. Because the table lives behind an `Arc`, every clone of
/// the router observes the same set of registered regions.
#[derive(Clone)]
// The nested `Arc<Mutex<HashMap<..>>>` type trips clippy's type-complexity lint.
#[allow(clippy::type_complexity)]
pub struct MockRaftStoreRouter {
    /// Region id -> sender used to deliver `PeerMsg`s to that region's receiver.
    senders: Arc<Mutex<HashMap<u64, LooseBoundedSender<PeerMsg<RocksEngine>>>>>,
}

impl MockRaftStoreRouter {
    /// Creates a router whose sender table starts out in its default state.
    pub fn new() -> MockRaftStoreRouter {
        let senders = Arc::default();
        Self { senders }
    }

    /// Registers `region_id` with a fresh loose-bounded channel built with
    /// `cap`, and returns the receiving half of that channel.
    ///
    /// The sending half is stored in the router's table under `region_id`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the sender table is poisoned.
    pub fn add_region(&self, region_id: u64, cap: usize) -> Receiver<PeerMsg<RocksEngine>> {
        let (sender, receiver) = loose_bounded(cap);
        let mut table = self.senders.lock().unwrap();
        table.insert(region_id, sender);
        receiver
    }
}

impl StoreRouter<RocksEngine> for MockRaftStoreRouter {
    /// Store-level messages are not supported by this mock.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!`.
    fn send(&self, _msg: StoreMsg<RocksEngine>) -> RaftStoreResult<()> {
        unimplemented!()
    }
}

impl ProposalRouter<RocksSnapshot> for MockRaftStoreRouter {
    /// Raft command proposals are not supported by this mock.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!`.
    fn send(
        &self,
        _cmd: RaftCommand<RocksSnapshot>,
    ) -> std::result::Result<(), TrySendError<RaftCommand<RocksSnapshot>>> {
        unimplemented!()
    }
}

impl CasualRouter<RocksEngine> for MockRaftStoreRouter {
    /// Tries to deliver `msg`, wrapped as `PeerMsg::CasualMessage`, to the
    /// sender registered for `region_id`.
    ///
    /// # Errors
    ///
    /// * `RaftStoreError::RegionNotFound` when no sender is registered for
    ///   `region_id`.
    /// * Whatever `handle_send_error` produces when `try_send` fails.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the sender table is poisoned.
    fn send(&self, region_id: u64, msg: CasualMessage<RocksEngine>) -> RaftStoreResult<()> {
        let mut table = self.senders.lock().unwrap();
        match table.get_mut(&region_id) {
            None => Err(RaftStoreError::RegionNotFound(region_id)),
            Some(sender) => {
                let outcome = sender.try_send(PeerMsg::CasualMessage(msg));
                outcome.map_err(|err| handle_send_error(region_id, err))
            }
        }
    }
}

impl RaftStoreRouter<RocksEngine> for MockRaftStoreRouter {
    /// Raft messages are not supported by this mock.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!`.
    fn send_raft_msg(&self, _msg: RaftMessage) -> RaftStoreResult<()> {
        unimplemented!()
    }

    /// Delivers `msg`, wrapped as `PeerMsg::SignificantMsg`, to the sender
    /// registered for `region_id` using `force_send`.
    ///
    /// # Errors
    ///
    /// Logs the message and returns `RaftStoreError::RegionNotFound` when no
    /// sender is registered for `region_id`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the sender table is poisoned, or if
    /// `force_send` returns an error.
    fn significant_send(
        &self,
        region_id: u64,
        msg: SignificantMsg<RocksSnapshot>,
    ) -> RaftStoreResult<()> {
        let mut table = self.senders.lock().unwrap();
        match table.get_mut(&region_id) {
            Some(sender) => {
                sender.force_send(PeerMsg::SignificantMsg(msg)).unwrap();
                Ok(())
            }
            None => {
                error!("failed to send significant msg"; "msg" => ?msg);
                Err(RaftStoreError::RegionNotFound(region_id))
            }
        }
    }

    /// Broadcasting is a no-op for this mock; the message generator is never
    /// invoked.
    fn broadcast_normal(&self, _msg_gen: impl FnMut() -> PeerMsg<RocksEngine>) {
        // Intentionally empty.
    }
}