use std::cmp::Ordering;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::sync::{mpsc, Arc};
use std::thread::{self, Builder as ThreadBuilder, JoinHandle};
use std::time::{Duration, Instant};

use pd_client::FeatureGate;
use raftstore::coprocessor::RegionInfoProvider;
use raftstore::store::util::find_peer;
use tikv_util::worker::FutureScheduler;
use txn_types::{Key, TimeStamp};

use crate::server::metrics::*;

use super::config::GcWorkerConfigManager;
use super::gc_worker::{sync_gc, GcSafePointProvider, GcTask};
use super::{is_compaction_filter_allowed, Result};
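
/// How often (in seconds) the safe point is polled from the
/// `GcSafePointProvider` while waiting for it to advance.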
const POLL_SAFE_POINT_INTERVAL_SECS: u64 = 10;
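/// The smallest key, used as the initial position when scanning regions.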
const BEGIN_KEY: &[u8] = b"";
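// Labels for `AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC`: regions GC-ed in the
// current round vs. regions scanned while seeking the next one to GC.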
const PROCESS_TYPE_GC: &str = "gc";
const PROCESS_TYPE_SCAN: &str = "scan";
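/// The configurations of automatic GC.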
pub struct AutoGcConfig<S: GcSafePointProvider, R: RegionInfoProvider> {
pub safe_point_provider: S,
pub region_info_provider: R,
pub self_store_id: u64,
pub poll_safe_point_interval: Duration,
pub always_check_safe_point: bool,
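    /// Called each time a round of GC finishes, before the manager goes back
    /// to waiting for the next safe point.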
pub post_a_round_of_gc: Option<Box<dyn Fn() + Send>>,
}
impl<S: GcSafePointProvider, R: RegionInfoProvider> AutoGcConfig<S, R> {
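    /// Creates a new config.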
pub fn new(safe_point_provider: S, region_info_provider: R, self_store_id: u64) -> Self {
Self {
safe_point_provider,
region_info_provider,
self_store_id,
poll_safe_point_interval: Duration::from_secs(POLL_SAFE_POINT_INTERVAL_SECS),
always_check_safe_point: false,
post_a_round_of_gc: None,
}
}
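    /// Creates a config for test purposes: the safe point is polled every
    /// 100 ms and re-checked before GC-ing each region.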
pub fn new_test_cfg(
safe_point_provider: S,
region_info_provider: R,
self_store_id: u64,
) -> Self {
Self {
safe_point_provider,
region_info_provider,
self_store_id,
poll_safe_point_interval: Duration::from_millis(100),
always_check_safe_point: true,
post_a_round_of_gc: None,
}
}
}
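/// Error returned by `GcManager` internals when a stop signal is received.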
#[derive(Debug)]
enum GcManagerError {
Stopped,
}
type GcManagerResult<T> = std::result::Result<T, GcManagerError>;
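/// Context a running `GcManager` carries to check, while working or sleeping,
/// whether it has been asked to stop.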
pub(super) struct GcManagerContext {
stop_signal_receiver: Option<mpsc::Receiver<()>>,
is_stopped: bool,
}
impl GcManagerContext {
pub fn new() -> Self {
Self {
stop_signal_receiver: None,
is_stopped: false,
}
}
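    /// Binds a receiver for the stop signal. Without one, `sleep_or_stop`
    /// simply sleeps and `check_stopped` always passes.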
pub fn set_stop_signal_receiver(&mut self, rx: mpsc::Receiver<()>) {
self.stop_signal_receiver = Some(rx);
}
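    /// Sleeps for `timeout`, returning `Err(GcManagerError::Stopped)` as soon
    /// as a stop signal arrives. Panics if the signal sender was dropped.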
fn sleep_or_stop(&mut self, timeout: Duration) -> GcManagerResult<()> {
if self.is_stopped {
return Err(GcManagerError::Stopped);
}
match self.stop_signal_receiver.as_ref() {
Some(rx) => match rx.recv_timeout(timeout) {
Ok(_) => {
self.is_stopped = true;
Err(GcManagerError::Stopped)
}
Err(mpsc::RecvTimeoutError::Timeout) => Ok(()),
Err(mpsc::RecvTimeoutError::Disconnected) => {
panic!("stop_signal_receiver unexpectedly disconnected")
}
},
None => {
thread::sleep(timeout);
Ok(())
}
}
}
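    /// Checks, without blocking, whether a stop signal has been sent. Treats a
    /// dropped sender as a stop, after logging an error.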
fn check_stopped(&mut self) -> GcManagerResult<()> {
if self.is_stopped {
return Err(GcManagerError::Stopped);
}
match self.stop_signal_receiver.as_ref() {
Some(rx) => match rx.try_recv() {
Ok(_) => {
self.is_stopped = true;
Err(GcManagerError::Stopped)
}
Err(mpsc::TryRecvError::Empty) => Ok(()),
Err(mpsc::TryRecvError::Disconnected) => {
error!("stop_signal_receiver unexpectedly disconnected, gc_manager will stop");
Err(GcManagerError::Stopped)
}
},
None => Ok(()),
}
}
}
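/// The state of the `GcManager`, exported through the auto-GC status metrics.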
#[derive(PartialEq)]
enum GcManagerState {
None,
Init,
Idle,
Working,
}
impl GcManagerState {
pub fn tag(&self) -> &str {
match self {
GcManagerState::None => "",
GcManagerState::Init => "initializing",
GcManagerState::Idle => "idle",
GcManagerState::Working => "working",
}
}
}
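/// Sets the auto-GC status gauges: the given state's gauge reads 1 and all
/// others read 0 (passing `GcManagerState::None` zeroes them all).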
#[inline]
fn set_status_metrics(state: GcManagerState) {
for s in &[
GcManagerState::Init,
GcManagerState::Idle,
GcManagerState::Working,
] {
AUTO_GC_STATUS_GAUGE_VEC
.with_label_values(&[s.tag()])
.set(if state == *s { 1 } else { 0 });
}
}
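/// A handle to a running `GcManager`, used to stop it and join its thread.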
pub(super) struct GcManagerHandle {
join_handle: JoinHandle<()>,
stop_signal_sender: mpsc::Sender<()>,
}
impl GcManagerHandle {
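    /// Stops the `GcManager` and waits for its thread to exit.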
pub fn stop(self) -> Result<()> {
let res: Result<()> = self
.stop_signal_sender
.send(())
.map_err(|e| box_err!("failed to send stop signal to gc worker thread: {:?}", e));
        res?;
self.join_handle
.join()
.map_err(|e| box_err!("failed to join gc worker thread: {:?}", e))
}
}
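/// Controls how GC runs automatically on this TiKV store: it polls the safe
/// point periodically and, every time the safe point advances, scans all
/// regions whose leader is on this store and schedules a GC task for each.
///
/// A minimal sketch of how a caller might start and stop it (`provider`,
/// `region_info_accessor`, `scheduler`, `cfg_tracker` and `feature_gate` are
/// assumed to exist in the caller's context; they are not defined here):
///
/// ```ignore
/// let cfg = AutoGcConfig::new(provider, region_info_accessor, store_id);
/// let handle = GcManager::new(
///     cfg,
///     Arc::new(AtomicU64::new(0)), // shared safe point
///     scheduler,                   // FutureScheduler<GcTask> of the GC worker
///     cfg_tracker,                 // GcWorkerConfigManager
///     feature_gate,                // pd_client::FeatureGate
/// )
/// .start()
/// .unwrap();
/// // ... on shutdown:
/// handle.stop().unwrap();
/// ```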
pub(super) struct GcManager<S: GcSafePointProvider, R: RegionInfoProvider> {
cfg: AutoGcConfig<S, R>,
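    /// The current safe point, shared through an `Arc` so other components
    /// holding it observe the newest value.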
safe_point: Arc<AtomicU64>,
safe_point_last_check_time: Instant,
worker_scheduler: FutureScheduler<GcTask>,
gc_manager_ctx: GcManagerContext,
cfg_tracker: GcWorkerConfigManager,
feature_gate: FeatureGate,
}
impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static> GcManager<S, R> {
pub fn new(
cfg: AutoGcConfig<S, R>,
safe_point: Arc<AtomicU64>,
worker_scheduler: FutureScheduler<GcTask>,
cfg_tracker: GcWorkerConfigManager,
feature_gate: FeatureGate,
) -> GcManager<S, R> {
GcManager {
cfg,
safe_point,
safe_point_last_check_time: Instant::now(),
worker_scheduler,
gc_manager_ctx: GcManagerContext::new(),
cfg_tracker,
feature_gate,
}
}
fn curr_safe_point(&self) -> TimeStamp {
let ts = self.safe_point.load(AtomicOrdering::Relaxed);
TimeStamp::new(ts)
}
fn save_safe_point(&self, ts: TimeStamp) {
self.safe_point
.store(ts.into_inner(), AtomicOrdering::Relaxed);
}
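    /// Starts working in another thread. This function moves the `GcManager`
    /// and returns a handle that can be used to stop it.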
pub fn start(mut self) -> Result<GcManagerHandle> {
set_status_metrics(GcManagerState::Init);
self.initialize();
let (tx, rx) = mpsc::channel();
self.gc_manager_ctx.set_stop_signal_receiver(rx);
let res: Result<_> = ThreadBuilder::new()
.name(thd_name!("gc-manager"))
.spawn(move || {
tikv_alloc::add_thread_memory_accessor();
self.run();
tikv_alloc::remove_thread_memory_accessor();
})
.map_err(|e| box_err!("failed to start gc manager: {:?}", e));
res.map(|join_handle| GcManagerHandle {
join_handle,
stop_signal_sender: tx,
})
}
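    /// Polls the safe point and runs GC in a loop, over and over again, until
    /// stopped via `GcManagerHandle::stop`.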
fn run(&mut self) {
debug!("gc-manager is started");
self.run_impl().unwrap_err();
set_status_metrics(GcManagerState::None);
debug!("gc-manager is stopped");
}
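    /// The main loop: resets the per-round metrics, waits for the safe point
    /// to advance, and runs a round of GC, unless GC is delegated to the
    /// compaction filter. Returns only with `Err(GcManagerError::Stopped)`.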
fn run_impl(&mut self) -> GcManagerResult<()> {
loop {
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_GC])
.set(0);
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_SCAN])
.set(0);
set_status_metrics(GcManagerState::Idle);
self.wait_for_next_safe_point()?;
if !is_compaction_filter_allowed(&*self.cfg_tracker.value(), &self.feature_gate) {
set_status_metrics(GcManagerState::Working);
self.gc_a_round()?;
if let Some(on_finished) = self.cfg.post_a_round_of_gc.as_ref() {
on_finished();
}
}
}
}
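    /// Sets the initial state: the current safe point is fetched once as the
    /// starting value. No GC is triggered until the safe point is later seen
    /// to advance past it.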
fn initialize(&mut self) {
debug!("gc-manager is initializing");
self.save_safe_point(TimeStamp::zero());
self.try_update_safe_point();
debug!("gc-manager started"; "safe_point" => self.curr_safe_point());
}
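    /// Waits, polling periodically, until the safe point advances, and returns
    /// the new value.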
fn wait_for_next_safe_point(&mut self) -> GcManagerResult<TimeStamp> {
loop {
if self.try_update_safe_point() {
return Ok(self.curr_safe_point());
}
self.gc_manager_ctx
.sleep_or_stop(self.cfg.poll_safe_point_interval)?;
}
}
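    /// Tries to refresh the safe point from the provider. Returns `true` if it
    /// advanced, `false` if it is unchanged or fetching failed. Panics if the
    /// provider returns a smaller safe point than the current one.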
fn try_update_safe_point(&mut self) -> bool {
self.safe_point_last_check_time = Instant::now();
let safe_point = match self.cfg.safe_point_provider.get_safe_point() {
Ok(res) => res,
Err(e) => {
error!(?e; "failed to get safe point from pd");
return false;
}
};
let old_safe_point = self.curr_safe_point();
match safe_point.cmp(&old_safe_point) {
Ordering::Less => {
panic!(
"got new safe point {} which is less than current safe point {}. \
there must be something wrong.",
safe_point, old_safe_point,
);
}
Ordering::Equal => false,
Ordering::Greater => {
debug!("gc_worker: update safe point"; "safe_point" => safe_point);
self.save_safe_point(safe_point);
AUTO_GC_SAFE_POINT_GAUGE.set(safe_point.into_inner() as i64);
true
}
}
}
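    /// Scans all regions whose leader is on this TiKV store and schedules GC
    /// on each of them, in order of their encoded keys.
    ///
    /// While a round is running, the safe point is re-checked periodically. If
    /// it advances mid-round, the scan first continues to the end of the key
    /// space and then *rewinds*: it starts over from the beginning and runs up
    /// to the position where the update was observed, so every region gets
    /// GC-ed with the newest safe point at least once. The round only finishes
    /// after a full pass completes without the safe point changing.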
fn gc_a_round(&mut self) -> GcManagerResult<()> {
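        // `need_rewind`: the safe point advanced mid-round, so after reaching
        // the end of the key space the scan must start over.
        // `end`: where the rewound pass stops; `None` means scan to the end of
        // the key space.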
let mut need_rewind = false;
let mut end = None;
let mut progress = Some(Key::from_encoded(BEGIN_KEY.to_vec()));
let mut processed_regions = 0;
info!("gc_worker: auto gc starts"; "safe_point" => self.curr_safe_point());
loop {
self.gc_manager_ctx.check_stopped()?;
if is_compaction_filter_allowed(&*self.cfg_tracker.value(), &self.feature_gate) {
return Ok(());
}
if need_rewind {
if progress.is_none() {
progress = Some(Key::from_encoded(BEGIN_KEY.to_vec()));
need_rewind = false;
info!("gc_worker: auto gc rewinds"; "processed_regions" => processed_regions);
processed_regions = 0;
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_GC])
.set(0);
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_SCAN])
.set(0);
}
} else {
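                // The round is finished when there are no more regions
                // (`progress` is `None`) or the rewound pass has caught up
                // with the rewind point.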
let finished = match (progress.as_ref(), end.as_ref()) {
(None, _) => true,
(Some(p), Some(e)) => p >= e,
_ => false,
};
if finished {
info!("gc_worker: auto gc finishes"; "processed_regions" => processed_regions);
return Ok(());
}
}
assert!(progress.is_some());
self.check_if_need_rewind(&progress, &mut need_rewind, &mut end);
progress = self.gc_next_region(progress.unwrap(), &mut processed_regions)?;
}
}
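    /// Checks whether the safe point has advanced and, if so, decides whether
    /// this round needs to rewind. The check is skipped when the safe point
    /// was polled recently, unless `always_check_safe_point` is set. Only used
    /// in `gc_a_round`.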
fn check_if_need_rewind(
&mut self,
progress: &Option<Key>,
need_rewind: &mut bool,
end: &mut Option<Key>,
) {
if self.safe_point_last_check_time.elapsed() < self.cfg.poll_safe_point_interval
&& !self.cfg.always_check_safe_point
{
return;
}
if !self.try_update_safe_point() {
return;
}
if progress.as_ref().unwrap().as_encoded().is_empty() {
*need_rewind = false;
*end = None;
info!(
"gc_worker: auto gc will go to the end"; "safe_point" => self.curr_safe_point()
);
} else {
*need_rewind = true;
*end = progress.clone();
info!(
"gc_worker: auto gc will go to rewind"; "safe_point" => self.curr_safe_point(),
"next_rewind_key" => %(end.as_ref().unwrap())
);
}
}
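    /// Does GC on the next region after `from_key`. Returns the end key of the
    /// region processed, or `None` once the end of the key space is reached. A
    /// failed GC task is logged and skipped rather than aborting the round.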
fn gc_next_region(
&mut self,
from_key: Key,
processed_regions: &mut usize,
) -> GcManagerResult<Option<Key>> {
let (range, next_key) = self.get_next_gc_context(from_key);
let (region_id, start, end) = match range {
Some((r, s, e)) => (r, s, e),
None => return Ok(None),
};
let hex_start = format!("{:?}", log_wrappers::Value::key(&start));
let hex_end = format!("{:?}", log_wrappers::Value::key(&end));
debug!("trying gc"; "start_key" => &hex_start, "end_key" => &hex_end);
if let Err(e) = sync_gc(
&self.worker_scheduler,
region_id,
start,
end,
self.curr_safe_point(),
) {
warn!("failed gc"; "start_key" => &hex_start, "end_key" => &hex_end, "err" => ?e);
}
*processed_regions += 1;
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_GC])
.inc();
Ok(next_key)
}
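    /// Seeks the first region at or after `key` whose leader is on this store.
    /// Returns the region's `(id, start_key, end_key)` (or `None` if there is
    /// no such region) together with the key to resume seeking from, which is
    /// `None` when the region's end key is empty, i.e. it is the last region.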
#[allow(clippy::type_complexity)]
fn get_next_gc_context(&mut self, key: Key) -> (Option<(u64, Vec<u8>, Vec<u8>)>, Option<Key>) {
let (tx, rx) = mpsc::channel();
let store_id = self.cfg.self_store_id;
let res = self.cfg.region_info_provider.seek_region(
key.as_encoded(),
Box::new(move |iter| {
let mut scanned_regions = 0;
for info in iter {
scanned_regions += 1;
if find_peer(&info.region, store_id).is_some() {
let _ = tx.send((Some(info.region.clone()), scanned_regions));
return;
}
}
let _ = tx.send((None, scanned_regions));
}),
);
if let Err(e) = res {
error!(?e; "gc_worker: failed to get next region information");
return (None, None);
};
let seek_region_res = rx.recv().map(|(region, scanned_regions)| {
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_SCAN])
.add(scanned_regions);
region
});
match seek_region_res {
Ok(Some(mut region)) => {
let r = region.get_id();
let (s, e) = (region.take_start_key(), region.take_end_key());
let next_key = if e.is_empty() {
None
} else {
Some(Key::from_encoded_slice(&e))
};
(Some((r, s, e)), next_key)
}
Ok(None) => (None, None),
Err(e) => {
error!("failed to get next region information"; "err" => ?e);
(None, None)
}
}
}
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::mem;
    use std::sync::mpsc::{channel, Receiver, Sender};

    use kvproto::metapb;
    use raft::StateRole;
    use raftstore::coprocessor::{RegionInfo, Result as CopResult, SeekRegionCallback};
    use raftstore::store::util::new_peer;
    use tikv_util::worker::{FutureRunnable, FutureWorker};

    use super::*;
    use crate::storage::Callback;
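    /// Takes the callback out of a `GcTask`, replacing it with a no-op, so the
    /// task itself can be sent elsewhere for inspection.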
fn take_callback(t: &mut GcTask) -> Callback<()> {
let callback = match t {
GcTask::Gc {
ref mut callback, ..
} => callback,
GcTask::UnsafeDestroyRange {
ref mut callback, ..
} => callback,
GcTask::GcKeys { .. } => unreachable!(),
GcTask::PhysicalScanLock { .. } => unreachable!(),
GcTask::OrphanVersions { .. } => unreachable!(),
GcTask::Validate(_) => unreachable!(),
};
mem::replace(callback, Box::new(|_| {}))
}
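    /// A safe point provider fed through a channel; it errors when the channel
    /// is empty.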
struct MockSafePointProvider {
rx: Receiver<TimeStamp>,
}
impl GcSafePointProvider for MockSafePointProvider {
fn get_safe_point(&self) -> Result<TimeStamp> {
self.rx.try_recv().map_err(|e| box_err!(e))
}
}
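    /// A region info provider backed by a `BTreeMap` keyed by region start
    /// key.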
#[derive(Clone)]
struct MockRegionInfoProvider {
regions: BTreeMap<Vec<u8>, RegionInfo>,
}
impl RegionInfoProvider for MockRegionInfoProvider {
fn seek_region(&self, from: &[u8], callback: SeekRegionCallback) -> CopResult<()> {
let from = from.to_vec();
callback(&mut self.regions.range(from..).map(|(_, v)| v));
Ok(())
}
}
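    /// A GC task runner that forwards each task to a channel and immediately
    /// calls its callback with `Ok(())`.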
struct MockGcRunner {
tx: Sender<GcTask>,
}
impl FutureRunnable<GcTask> for MockGcRunner {
fn run(&mut self, mut t: GcTask) {
let cb = take_callback(&mut t);
self.tx.send(t).unwrap();
cb(Ok(()));
}
}
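    /// Bundles a `GcManager` with mocked dependencies: a channel-fed safe
    /// point provider, a fixed region map, and a worker that records the GC
    /// tasks it is asked to run.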
struct GcManagerTestUtil {
gc_manager: Option<GcManager<MockSafePointProvider, MockRegionInfoProvider>>,
worker: FutureWorker<GcTask>,
safe_point_sender: Sender<TimeStamp>,
gc_task_receiver: Receiver<GcTask>,
}
impl GcManagerTestUtil {
pub fn new(regions: BTreeMap<Vec<u8>, RegionInfo>) -> Self {
let mut worker = FutureWorker::new("test-gc-worker");
let (gc_task_sender, gc_task_receiver) = channel();
worker.start(MockGcRunner { tx: gc_task_sender }).unwrap();
let (safe_point_sender, safe_point_receiver) = channel();
let mut cfg = AutoGcConfig::new(
MockSafePointProvider {
rx: safe_point_receiver,
},
MockRegionInfoProvider { regions },
1,
);
cfg.poll_safe_point_interval = Duration::from_millis(100);
cfg.always_check_safe_point = true;
let gc_manager = GcManager::new(
cfg,
Arc::new(AtomicU64::new(0)),
worker.scheduler(),
GcWorkerConfigManager::default(),
Default::default(),
);
Self {
gc_manager: Some(gc_manager),
worker,
safe_point_sender,
gc_task_receiver,
}
}
pub fn collect_scheduled_tasks(&self) -> Vec<GcTask> {
self.gc_task_receiver.try_iter().collect()
}
pub fn add_next_safe_point(&self, safe_point: impl Into<TimeStamp>) {
self.safe_point_sender.send(safe_point.into()).unwrap();
}
pub fn stop(&mut self) {
self.worker.stop().unwrap().join().unwrap();
}
}
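    /// Runs `initialize` and one `gc_a_round` over the given regions, feeding
    /// `safe_points` through the mock provider, and asserts the scheduled GC
    /// tasks equal `expected_gc_tasks` as ordered `(region_id, safe_point)`
    /// pairs.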
fn test_auto_gc(
regions: Vec<(Vec<u8>, Vec<u8>, u64)>,
safe_points: Vec<impl Into<TimeStamp> + Copy>,
expected_gc_tasks: Vec<(u64, impl Into<TimeStamp>)>,
) {
let regions: BTreeMap<_, _> = regions
.into_iter()
.map(|(start_key, end_key, id)| {
let mut r = metapb::Region::default();
r.set_id(id);
r.set_start_key(start_key.clone());
r.set_end_key(end_key);
r.mut_peers().push(new_peer(1, 1));
let info = RegionInfo::new(r, StateRole::Leader);
(start_key, info)
})
.collect();
let mut test_util = GcManagerTestUtil::new(regions);
for safe_point in &safe_points {
test_util.add_next_safe_point(*safe_point);
}
test_util.gc_manager.as_mut().unwrap().initialize();
test_util.gc_manager.as_mut().unwrap().gc_a_round().unwrap();
test_util.stop();
let gc_tasks: Vec<_> = test_util
.collect_scheduled_tasks()
.iter()
.map(|task| match task {
GcTask::Gc {
region_id,
safe_point,
..
} => (*region_id, *safe_point),
_ => unreachable!(),
})
.collect();
assert_eq!(gc_tasks.len(), expected_gc_tasks.len());
let all_passed = gc_tasks.into_iter().zip(expected_gc_tasks.into_iter()).all(
|((region, safe_point), (expect_region, expect_safe_point))| {
region == expect_region && safe_point == expect_safe_point.into()
},
);
assert!(all_passed);
}
#[test]
fn test_update_safe_point() {
let mut test_util = GcManagerTestUtil::new(BTreeMap::new());
let mut gc_manager = test_util.gc_manager.take().unwrap();
assert_eq!(gc_manager.curr_safe_point(), TimeStamp::zero());
test_util.add_next_safe_point(233);
assert!(gc_manager.try_update_safe_point());
assert_eq!(gc_manager.curr_safe_point(), 233.into());
let (tx, rx) = channel();
ThreadBuilder::new()
.spawn(move || {
let safe_point = gc_manager.wait_for_next_safe_point().unwrap();
tx.send(safe_point).unwrap();
})
.unwrap();
test_util.add_next_safe_point(233);
test_util.add_next_safe_point(233);
test_util.add_next_safe_point(234);
assert_eq!(rx.recv().unwrap(), 234.into());
test_util.stop();
}
#[test]
fn test_gc_manager_initialize() {
let mut test_util = GcManagerTestUtil::new(BTreeMap::new());
let mut gc_manager = test_util.gc_manager.take().unwrap();
assert_eq!(gc_manager.curr_safe_point(), TimeStamp::zero());
test_util.add_next_safe_point(0);
test_util.add_next_safe_point(5);
gc_manager.initialize();
assert_eq!(gc_manager.curr_safe_point(), TimeStamp::zero());
assert!(gc_manager.try_update_safe_point());
assert_eq!(gc_manager.curr_safe_point(), 5.into());
}
#[test]
fn test_auto_gc_a_round_without_rewind() {
let regions = vec![
(b"".to_vec(), b"1".to_vec(), 1),
(b"1".to_vec(), b"2".to_vec(), 2),
(b"3".to_vec(), b"4".to_vec(), 3),
(b"7".to_vec(), b"".to_vec(), 4),
];
test_auto_gc(
regions,
vec![233],
vec![(1, 233), (2, 233), (3, 233), (4, 233)],
);
let regions = vec![
(b"0".to_vec(), b"1".to_vec(), 1),
(b"1".to_vec(), b"2".to_vec(), 2),
(b"3".to_vec(), b"4".to_vec(), 3),
(b"7".to_vec(), b"8".to_vec(), 4),
];
test_auto_gc(
regions,
vec![233],
vec![(1, 233), (2, 233), (3, 233), (4, 233)],
);
}
#[test]
fn test_auto_gc_rewinding() {
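        // Cover both a last region whose end key is empty (reaching the end of
        // the key space) and one whose end key is not.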
for regions in vec![
vec![
(b"".to_vec(), b"1".to_vec(), 1),
(b"1".to_vec(), b"2".to_vec(), 2),
(b"3".to_vec(), b"4".to_vec(), 3),
(b"7".to_vec(), b"".to_vec(), 4),
],
vec![
(b"0".to_vec(), b"1".to_vec(), 1),
(b"1".to_vec(), b"2".to_vec(), 2),
(b"3".to_vec(), b"4".to_vec(), 3),
(b"7".to_vec(), b"8".to_vec(), 4),
],
] {
test_auto_gc(
regions.clone(),
vec![233, 234],
vec![(1, 234), (2, 234), (3, 234), (4, 234)],
);
test_auto_gc(
regions.clone(),
vec![233, 233, 234],
vec![(1, 233), (2, 234), (3, 234), (4, 234), (1, 234)],
);
test_auto_gc(
regions.clone(),
vec![233, 233, 233, 233, 234],
vec![
(1, 233),
(2, 233),
(3, 233),
(4, 234),
(1, 234),
(2, 234),
(3, 234),
],
);
test_auto_gc(
regions.clone(),
vec![233, 233, 233, 234, 235],
vec![
(1, 233),
(2, 233),
(3, 234),
(4, 235),
(1, 235),
(2, 235),
(3, 235),
],
);
let mut safe_points = vec![233, 233, 233, 234, 234, 234, 235];
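            // When the last region's end key is not empty, `gc_a_round` makes
            // one extra seek past it before rewinding, consuming one extra
            // safe point check, so feed an extra 234 to keep the observed safe
            // point sequence the same for both layouts.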
if !regions.last().unwrap().1.is_empty() {
safe_points.insert(5, 234);
}
test_auto_gc(
regions.clone(),
safe_points,
vec![
(1, 233),
(2, 233),
(3, 234),
(4, 234),
(1, 234),
(2, 235),
(3, 235),
(4, 235),
(1, 235),
],
);
}
}
}