1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::vec::IntoIter;

use engine_traits::CfName;
use kvproto::metapb::Region;
use kvproto::pdpb::CheckPolicy;
use kvproto::raft_cmdpb::{AdminRequest, AdminResponse, RaftCmdRequest, RaftCmdResponse, Request};
use raft::{eraftpb, StateRole};

pub mod config;
mod consistency_check;
pub mod dispatcher;
mod error;
mod metrics;
pub mod region_info_accessor;
mod split_check;
pub mod split_observer;

pub use self::config::{Config, ConsistencyCheckMethod};
pub use self::consistency_check::{ConsistencyCheckObserver, Raw as RawConsistencyCheckObserver};
pub use self::dispatcher::{
    BoxAdminObserver, BoxApplySnapshotObserver, BoxCmdObserver, BoxConsistencyCheckObserver,
    BoxQueryObserver, BoxRegionChangeObserver, BoxRoleObserver, BoxSplitCheckObserver,
    CoprocessorHost, Registry,
};
pub use self::error::{Error, Result};
pub use self::region_info_accessor::{
    Callback as RegionInfoCallback, RegionCollector, RegionInfo, RegionInfoAccessor,
    RegionInfoProvider, SeekRegionCallback,
};
pub use self::split_check::{
    get_region_approximate_keys, get_region_approximate_middle, get_region_approximate_size,
    HalfCheckObserver, Host as SplitCheckerHost, KeysCheckObserver, SizeCheckObserver,
    TableCheckObserver,
};
pub use crate::store::KeyEntry;

/// Coprocessor is used to provide a convenient way to inject code to
/// KV processing.
///
/// The observer traits declared in this module all extend this trait. Both
/// hooks have empty default bodies, so an implementor only overrides the ones
/// it needs.
pub trait Coprocessor: Send {
    /// Start hook. Does nothing unless overridden.
    fn start(&self) {}
    /// Stop hook. Does nothing unless overridden.
    fn stop(&self) {}
}

/// Context handed to observer hooks.
///
/// It borrows the region it was created for and carries a public `bypass`
/// flag that hooks can read and write.
pub struct ObserverContext<'a> {
    region: &'a Region,
    /// Whether to bypass following observer hook.
    pub bypass: bool,
}

impl<'a> ObserverContext<'a> {
    /// Builds a context that borrows `region`; `bypass` starts out `false`.
    pub fn new(region: &Region) -> ObserverContext<'_> {
        let bypass = false;
        ObserverContext { region, bypass }
    }

    /// Returns the region this context was created with.
    pub fn region(&self) -> &Region {
        self.region
    }
}

/// Observer hooks for admin requests.
///
/// Every hook has a default body, so implementors override only what they need.
pub trait AdminObserver: Coprocessor {
    /// Hook to call before proposing admin request.
    ///
    /// The default implementation returns `Ok(())` without touching its arguments.
    fn pre_propose_admin(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _req: &mut AdminRequest,
    ) -> Result<()> {
        Ok(())
    }

    /// Hook to call before applying admin request. Does nothing by default.
    fn pre_apply_admin(&self, _ctx: &mut ObserverContext<'_>, _req: &AdminRequest) {}

    /// Hook to call after applying admin request. Does nothing by default.
    fn post_apply_admin(&self, _ctx: &mut ObserverContext<'_>, _resp: &mut AdminResponse) {}
}

/// Observer hooks for write (query) requests.
///
/// Every hook has a default body, so implementors override only what they need.
pub trait QueryObserver: Coprocessor {
    /// Hook to call before proposing write request.
    ///
    /// We don't propose read request, hence there is no hook for it yet.
    /// The default implementation returns `Ok(())` without touching its arguments.
    fn pre_propose_query(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _reqs: &mut Vec<Request>,
    ) -> Result<()> {
        Ok(())
    }

    /// Hook to call before applying write request. Does nothing by default.
    fn pre_apply_query(&self, _ctx: &mut ObserverContext<'_>, _reqs: &[Request]) {}

    /// Hook to call after applying write request. Does nothing by default.
    fn post_apply_query(&self, _ctx: &mut ObserverContext<'_>, _cmd: &mut Cmd) {}
}

/// Observer hooks for applying snapshots.
pub trait ApplySnapshotObserver: Coprocessor {
    /// Hook to call after applying key from plain file.
    ///
    /// This may be invoked multiple times for each plain file, and each time a
    /// batch of key-value pairs will be passed to the function.
    /// Does nothing by default.
    fn apply_plain_kvs(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _cf: CfName,
        _kvs: &[(Vec<u8>, Vec<u8>)],
    ) {
    }

    /// Hook to call after applying sst file.
    ///
    /// Currently the content of the snapshot can't be passed to the observer;
    /// only a column family name and a path string are given.
    /// Does nothing by default.
    fn apply_sst(&self, _ctx: &mut ObserverContext<'_>, _cf: CfName, _path: &str) {}
}

/// SplitChecker is invoked during a split check scan, and decides which keys
/// to use to split a region.
///
/// `split_keys` and `policy` must be provided by the implementor; the other
/// two methods have defaults.
pub trait SplitChecker<E> {
    /// Hook to call for every kv scanned during split.
    ///
    /// Return true to abort scan early. The default never aborts.
    fn on_kv(&mut self, _ctx: &mut ObserverContext<'_>, _entry: &KeyEntry) -> bool {
        false
    }

    /// Get the desired split keys.
    fn split_keys(&mut self) -> Vec<Vec<u8>>;

    /// Get approximate split keys without scan.
    ///
    /// The default returns an empty list wrapped in `Ok`.
    fn approximate_split_keys(&mut self, _region: &Region, _: &E) -> Result<Vec<Vec<u8>>> {
        Ok(Vec::new())
    }

    /// Get split policy.
    fn policy(&self) -> CheckPolicy;
}

/// Observer that contributes checkers to a split check.
pub trait SplitCheckObserver<E>: Coprocessor {
    /// Add a checker for a split scan.
    ///
    /// Receives the observer context, a mutable `SplitCheckerHost`, a shared
    /// reference to an `E`, and the `CheckPolicy` in effect.
    fn add_checker(
        &self,
        ctx: &mut ObserverContext<'_>,
        host: &mut SplitCheckerHost<'_, E>,
        _: &E,
        policy: CheckPolicy,
    );
}

/// Observer for role changes of a peer.
pub trait RoleObserver: Coprocessor {
    /// Hook to call when role of a peer changes.
    ///
    /// Please note that this hook is not called in real time: there may be a
    /// situation where the role of some peers has already changed but the hook
    /// has not been called yet.
    ///
    /// Does nothing by default.
    fn on_role_change(&self, _ctx: &mut ObserverContext<'_>, _role: StateRole) {}
}

/// Kind of region change passed to `RegionChangeObserver::on_region_changed`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RegionChangeEvent {
    Create,
    Update,
    Destroy,
}

/// Observer for region changes.
pub trait RegionChangeObserver: Coprocessor {
    /// Hook to call when a region changed on this TiKV.
    ///
    /// Receives the kind of change and a `StateRole`. Does nothing by default.
    fn on_region_changed(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _event: RegionChangeEvent,
        _role: StateRole,
    ) {
    }
}

/// A `RaftCmdRequest` / `RaftCmdResponse` pair tagged with a `u64` index.
#[derive(Clone, Debug)]
pub struct Cmd {
    pub index: u64,
    pub request: RaftCmdRequest,
    pub response: RaftCmdResponse,
}

impl Cmd {
    /// Bundles `index`, `request` and `response` into a `Cmd`, taking ownership
    /// of both messages.
    pub fn new(index: u64, request: RaftCmdRequest, response: RaftCmdResponse) -> Cmd {
        Self {
            response,
            request,
            index,
        }
    }
}

/// Global counter from which `ObserveID::new` draws its values.
static OBSERVE_ID_ALLOC: AtomicUsize = AtomicUsize::new(0);

/// A unique identifier for checking stale observed commands.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct ObserveID(usize);

impl ObserveID {
    /// Allocates the next id by atomically incrementing the global counter.
    ///
    /// The returned id wraps the counter value from before the increment.
    pub fn new() -> ObserveID {
        let next = OBSERVE_ID_ALLOC.fetch_add(1, Ordering::SeqCst);
        Self(next)
    }
}

/// ObserveHandle is the status of a term of observing. It contains the
/// `ObserveID` and the `observing` flag, which indicates whether the observing
/// is ongoing.
///
/// The flag lives behind an `Arc`, so clones of a handle share it: stopping
/// one clone stops them all. Note that the derived `Default` produces a handle
/// whose flag is `false`, unlike `new` and `with_id`.
#[derive(Clone, Default, Debug)]
pub struct ObserveHandle {
    pub id: ObserveID,
    observing: Arc<AtomicBool>,
}

impl ObserveHandle {
    /// Creates a handle with a freshly allocated `ObserveID` and the flag set.
    pub fn new() -> ObserveHandle {
        let id = ObserveID::new();
        let observing = Arc::new(AtomicBool::new(true));
        Self { id, observing }
    }

    /// Creates a handle wrapping the given raw `id`, with the flag set.
    pub fn with_id(id: usize) -> ObserveHandle {
        let observing = Arc::new(AtomicBool::new(true));
        Self {
            id: ObserveID(id),
            observing,
        }
    }

    /// Reports whether the flag is still set (acquire load).
    pub fn is_observing(&self) -> bool {
        let flag: &AtomicBool = &self.observing;
        flag.load(Ordering::Acquire)
    }

    /// Clears the flag (release store); every clone of this handle sees it.
    pub fn stop_observing(&self) {
        let flag: &AtomicBool = &self.observing;
        flag.store(false, Ordering::Release);
    }
}

#[derive(Clone, Debug)]
pub struct CmdBatch {
    pub cdc_id: ObserveID,
    pub rts_id: ObserveID,
    pub region_id: u64,
    pub cmds: Vec<Cmd>,
}

impl CmdBatch {
    pub fn new(cdc_id: ObserveID, rts_id: ObserveID, region_id: u64) -> CmdBatch {
        CmdBatch {
            cdc_id,
            rts_id,
            region_id,
            cmds: Vec::new(),
        }
    }

    pub fn push(&mut self, cdc_id: ObserveID, rts_id: ObserveID, region_id: u64, cmd: Cmd) {
        assert_eq!(region_id, self.region_id);
        assert_eq!(cdc_id, self.cdc_id);
        assert_eq!(rts_id, self.rts_id);
        self.cmds.push(cmd)
    }

    pub fn into_iter(self, region_id: u64) -> IntoIter<Cmd> {
        assert_eq!(self.region_id, region_id);
        self.cmds.into_iter()
    }

    pub fn len(&self) -> usize {
        self.cmds.len()
    }

    pub fn is_empty(&self) -> bool {
        self.cmds.is_empty()
    }

    pub fn size(&self) -> usize {
        let mut cmd_bytes = 0;
        for cmd in self.cmds.iter() {
            let Cmd {
                ref request,
                ref response,
                ..
            } = cmd;
            if !response.get_header().has_error() && !request.has_admin_request() {
                for req in request.requests.iter() {
                    let put = req.get_put();
                    cmd_bytes += put.get_key().len();
                    cmd_bytes += put.get_value().len();
                }
            }
        }
        cmd_bytes
    }
}

/// Observer hooks around applying write requests.
///
/// All three hooks are required; `E` is the type of the value handed to
/// `on_flush_apply`.
pub trait CmdObserver<E>: Coprocessor {
    /// Hook to call after preparing for applying write requests.
    ///
    /// Receives two `ObserveHandle`s (`cdc_id`, `rts_id`) and the region id.
    fn on_prepare_for_apply(&self, cdc_id: &ObserveHandle, rts_id: &ObserveHandle, region_id: u64);
    /// Hook to call after applying a write request.
    ///
    /// Receives two `ObserveID`s (`cdc_id`, `rts_id`), the region id and the
    /// applied `Cmd`.
    fn on_apply_cmd(&self, cdc_id: ObserveID, rts_id: ObserveID, region_id: u64, cmd: &Cmd);
    /// Hook to call after flushing writes to db.
    fn on_flush_apply(&self, engine: E);
}

/// Observer for raft read index messages.
pub trait ReadIndexObserver: Coprocessor {
    /// Hook to call when stepping in raft and the message is a read index message.
    ///
    /// The message is passed mutably. Does nothing by default.
    fn on_step(&self, _msg: &mut eraftpb::Message) {}
}