1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use super::Inner;
use crate::call::{BatchContext, MessageReader, RpcStatusCode};
use crate::error::Error;
/// Kind of batch job a [`Batch`] resolves.
///
/// The variant selects which handler `Batch::resolve` runs once the batch
/// has completed.
#[derive(PartialEq, Debug)]
pub enum BatchType {
/// Resolve from the RPC status alone; no message is extracted.
Finish,
/// Extract one received message without inspecting the RPC status.
Read,
/// Check the RPC status first and extract one message only when it is `OK`.
CheckRead,
}
/// Resolves a batch job: owns the batch context and the shared state that
/// receives the job's result.
pub struct Batch {
/// Selects which handler `resolve` runs.
ty: BatchType,
/// Batch context from which the RPC status and received message are read.
ctx: BatchContext,
/// Shared state that receives the result; it is locked before every write.
inner: Arc<Inner<Option<MessageReader>>>,
}
impl Batch {
    /// Creates a batch job of kind `ty` with a fresh [`BatchContext`].
    ///
    /// The outcome of the job is later stored into `inner` by [`Batch::resolve`].
    pub fn new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch {
        Batch {
            ty,
            ctx: BatchContext::new(),
            inner,
        }
    }

    /// Returns the batch context owned by this job.
    pub fn context(&self) -> &BatchContext {
        &self.ctx
    }

    /// Resolves a `Read` job.
    ///
    /// On success the result is whatever message the context received; a
    /// failed read is stored as `Ok(None)` rather than as an error.
    fn read_one_msg(&mut self, success: bool) {
        let task = {
            let mut guard = self.inner.lock();
            if success {
                guard.set_result(Ok(self.ctx.recv_message()))
            } else {
                // A failed read is reported as "no message", not as an error.
                guard.set_result(Ok(None))
            }
        };
        // `guard` went out of scope with the block above, so the wake-up runs
        // after the guard has been dropped.
        if let Some(task) = task {
            task.wake();
        }
    }

    /// Resolves a `Finish` job from the RPC status.
    ///
    /// * batch succeeded and status is `OK` -> `Ok(None)`
    /// * batch succeeded with any other status -> `Err(Error::RpcFailure(status))`
    /// * batch failed -> `Err(Error::RemoteStopped)`
    fn finish_response(&mut self, succeed: bool) {
        let task = {
            let mut guard = self.inner.lock();
            if succeed {
                let status = self.ctx.rpc_status();
                if status.status == RpcStatusCode::OK {
                    guard.set_result(Ok(None))
                } else {
                    guard.set_result(Err(Error::RpcFailure(status)))
                }
            } else {
                guard.set_result(Err(Error::RemoteStopped))
            }
        };
        // Wake only after the guard has been dropped.
        if let Some(task) = task {
            task.wake();
        }
    }

    /// Resolves a `CheckRead` job.
    ///
    /// The RPC status is checked first: `OK` stores the received message,
    /// any other status stores `Err(Error::RpcFailure(status))`.
    fn handle_unary_response(&mut self) {
        let task = {
            let mut guard = self.inner.lock();
            let status = self.ctx.rpc_status();
            if status.status == RpcStatusCode::OK {
                guard.set_result(Ok(self.ctx.recv_message()))
            } else {
                guard.set_result(Err(Error::RpcFailure(status)))
            }
        };
        // Wake only after the guard has been dropped.
        if let Some(task) = task {
            task.wake();
        }
    }

    /// Consumes the job and stores its outcome according to its [`BatchType`].
    ///
    /// `success` tells whether the underlying batch completed successfully.
    ///
    /// # Panics
    ///
    /// Panics if the job is `BatchType::CheckRead` and `success` is `false`.
    pub fn resolve(mut self, success: bool) {
        match self.ty {
            BatchType::CheckRead => {
                assert!(success);
                self.handle_unary_response();
            }
            BatchType::Finish => self.finish_response(success),
            BatchType::Read => self.read_one_msg(success),
        }
    }
}
/// Debug output is `Batch [<type>]`: only the job's [`BatchType`] is shown.
impl Debug for Batch {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let kind = &self.ty;
        f.write_fmt(format_args!("Batch [{:?}]", kind))
    }
}
/// Resolves an asynchronous action whose only outcome is a success flag.
pub struct Action {
/// Shared state that receives `Ok(success)` when the action is resolved.
inner: Arc<Inner<bool>>,
}
impl Action {
    /// Creates an action that reports its outcome through `inner`.
    pub fn new(inner: Arc<Inner<bool>>) -> Action {
        Action { inner }
    }

    /// Consumes the action, stores `Ok(success)` as its result and wakes the
    /// task handed back by `set_result`, if there is one.
    pub fn resolve(self, success: bool) {
        let task = {
            let mut guard = self.inner.lock();
            guard.set_result(Ok(success))
        };
        // `guard` went out of scope with the block above, so the wake-up runs
        // after the guard has been dropped.
        if let Some(task) = task {
            task.wake();
        }
    }
}