1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use coprocessor_plugin_api::*;
use kvproto::kvrpcpb;
use semver::VersionReq;
use std::future::Future;
use std::ops::Not;
use std::sync::Arc;
use super::config::Config;
use super::plugin_registry::PluginRegistry;
use super::raw_storage_impl::RawStorageImpl;
use crate::storage::{self, lock_manager::LockManager, Engine, Storage};
enum CoprocessorError {
RegionError(kvproto::errorpb::Error),
Other(String),
}
#[derive(Clone)]
pub struct Endpoint {
plugin_registry: Arc<PluginRegistry>,
}
impl tikv_util::AssertSend for Endpoint {}
impl Endpoint {
pub fn new(copr_cfg: &Config) -> Self {
let mut plugin_registry = PluginRegistry::new();
if let Some(plugin_directory) = &copr_cfg.coprocessor_plugin_directory {
let r = plugin_registry.start_hot_reloading(plugin_directory);
if let Err(err) = r {
warn!("unable to start hot-reloading for coprocessor plugins.";
"coprocessor_directory" => plugin_directory.display(),
"error" => ?err);
}
}
Self {
plugin_registry: Arc::new(plugin_registry),
}
}
#[inline]
pub fn handle_request<E: Engine, L: LockManager>(
&self,
storage: &Storage<E, L>,
req: kvrpcpb::RawCoprocessorRequest,
) -> impl Future<Output = kvrpcpb::RawCoprocessorResponse> {
let mut response = kvrpcpb::RawCoprocessorResponse::default();
let coprocessor_result = self.handle_request_impl(storage, req);
match coprocessor_result {
Ok(data) => response.set_data(data),
Err(CoprocessorError::RegionError(region_err)) => response.set_region_error(region_err),
Err(CoprocessorError::Other(o)) => response.set_error(o),
}
std::future::ready(response)
}
#[inline]
fn handle_request_impl<E: Engine, L: LockManager>(
&self,
storage: &Storage<E, L>,
mut req: kvrpcpb::RawCoprocessorRequest,
) -> Result<RawResponse, CoprocessorError> {
let plugin = self
.plugin_registry
.get_plugin(&req.copr_name)
.ok_or_else(|| {
CoprocessorError::Other(format!(
"No registered coprocessor with name '{}'",
req.copr_name
))
})?;
let version_req = VersionReq::parse(&req.copr_version_req)
.map_err(|e| CoprocessorError::Other(format!("{}", e)))?;
let plugin_version = plugin.version();
version_req
.matches(&plugin_version)
.not()
.then(|| {})
.ok_or_else(|| {
CoprocessorError::Other(format!(
"The plugin '{}' with version '{}' does not satisfy the version constraint '{}'",
plugin.name(),
plugin_version,
version_req,
))
})?;
let raw_storage_api = RawStorageImpl::new(req.take_context(), storage);
let ranges = req
.take_ranges()
.into_iter()
.map(|range| range.start_key..range.end_key)
.collect();
let plugin_result = plugin.on_raw_coprocessor_request(ranges, req.data, &raw_storage_api);
plugin_result.map_err(|err| {
if let Some(region_err) = extract_region_error(&err) {
CoprocessorError::RegionError(region_err)
} else {
CoprocessorError::Other(format!("{}", err))
}
})
}
}
fn extract_region_error(error: &PluginError) -> Option<kvproto::errorpb::Error> {
match error {
PluginError::Other(other_err) => other_err
.downcast_ref::<storage::Result<()>>()
.and_then(|e| storage::errors::extract_region_error::<()>(e)),
_ => None,
}
}