1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use super::{Error, Result};
use futures::channel::mpsc::{self, UnboundedSender};
use futures::future::{self, BoxFuture};
use futures::sink::SinkExt;
use futures::stream::{StreamExt, TryStreamExt};
use grpcio::{ChannelBuilder, EnvBuilder, Environment, WriteFlags};
use kvproto::deadlock::*;
use security::SecurityManager;
use std::sync::Arc;
use std::time::Duration;

type DeadlockFuture<T> = BoxFuture<'static, Result<T>>;

pub type Callback = Box<dyn Fn(DeadlockResponse) + Send>;

const CQ_COUNT: usize = 1;
const CLIENT_PREFIX: &str = "deadlock";

/// Builds the `Environment` of deadlock clients. All clients should use the same instance.
pub fn env() -> Arc<Environment> {
    Arc::new(
        EnvBuilder::new()
            .cq_count(CQ_COUNT)
            .name_prefix(thd_name!(CLIENT_PREFIX))
            .build(),
    )
}

#[derive(Clone)]
pub struct Client {
    addr: String,
    client: DeadlockClient,
    sender: Option<UnboundedSender<DeadlockRequest>>,
}

impl Client {
    pub fn new(env: Arc<Environment>, security_mgr: Arc<SecurityManager>, addr: &str) -> Self {
        let cb = ChannelBuilder::new(env)
            .keepalive_time(Duration::from_secs(10))
            .keepalive_timeout(Duration::from_secs(3));
        let channel = security_mgr.connect(cb, addr);
        let client = DeadlockClient::new(channel);
        Self {
            addr: addr.to_owned(),
            client,
            sender: None,
        }
    }

    pub fn register_detect_handler(
        &mut self,
        cb: Callback,
    ) -> (DeadlockFuture<()>, DeadlockFuture<()>) {
        let (tx, rx) = mpsc::unbounded();
        let (sink, receiver) = self.client.detect().unwrap();
        let send_task = Box::pin(async move {
            let mut sink = sink.sink_map_err(Error::Grpc);
            let res = sink
                .send_all(&mut rx.map(|r| Ok((r, WriteFlags::default()))))
                .await
                .map(|_| {
                    info!("cancel detect sender");
                    sink.get_mut().cancel();
                });
            res
        });
        self.sender = Some(tx);

        let recv_task = Box::pin(receiver.map_err(Error::Grpc).try_for_each(move |resp| {
            cb(resp);
            future::ok(())
        }));

        (send_task, recv_task)
    }

    pub fn detect(&self, req: DeadlockRequest) -> Result<()> {
        self.sender
            .as_ref()
            .unwrap()
            .unbounded_send(req)
            .map_err(|e| Error::Other(box_err!(e)))
    }
}