1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
// Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;

use kvproto::pdpb::*;
use pd_client::REQUEST_RECONNECT_INTERVAL;

use super::*;

#[derive(Debug)]
pub struct Retry {
    retry: usize,
    count: AtomicUsize,
}

impl Retry {
    pub fn new(retry: usize) -> Retry {
        info!("[Retry] return Ok(_) every {:?} times", retry);

        Retry {
            retry,
            count: AtomicUsize::new(0),
        }
    }

    fn is_ok(&self) -> bool {
        let count = self.count.fetch_add(1, Ordering::SeqCst);
        if count != 0 && count % self.retry == 0 {
            // it's ok.
            return true;
        }
        // let's sleep awhile, so that client will update its connection.
        thread::sleep(REQUEST_RECONNECT_INTERVAL);
        false
    }
}

impl PdMocker for Retry {
    fn get_region_by_id(&self, _: &GetRegionByIdRequest) -> Option<Result<GetRegionResponse>> {
        if self.is_ok() {
            info!("[Retry] get_region_by_id returns Ok(_)");
            Some(Ok(GetRegionResponse::default()))
        } else {
            info!("[Retry] get_region_by_id returns Err(_)");
            Some(Err("please retry".to_owned()))
        }
    }

    fn get_store(&self, _: &GetStoreRequest) -> Option<Result<GetStoreResponse>> {
        if self.is_ok() {
            info!("[Retry] get_store returns Ok(_)");
            Some(Ok(GetStoreResponse::default()))
        } else {
            info!("[Retry] get_store returns Err(_)");
            Some(Err("please retry".to_owned()))
        }
    }
}

#[derive(Debug)]
pub struct NotRetry {
    is_visited: AtomicBool,
}

impl NotRetry {
    pub fn new() -> NotRetry {
        info!(
            "[NotRetry] return error response for the first time and return Ok() for reset times."
        );

        NotRetry {
            is_visited: AtomicBool::new(false),
        }
    }
}

impl PdMocker for NotRetry {
    fn get_region_by_id(&self, _: &GetRegionByIdRequest) -> Option<Result<GetRegionResponse>> {
        if !self.is_visited.swap(true, Ordering::Relaxed) {
            info!(
                "[NotRetry] get_region_by_id returns Ok(_) with header has IncompatibleVersion error"
            );
            let mut err = Error::default();
            err.set_type(ErrorType::IncompatibleVersion);
            let mut resp = GetRegionResponse::default();
            resp.mut_header().set_error(err);
            Some(Ok(resp))
        } else {
            info!("[NotRetry] get_region_by_id returns Ok()");
            Some(Ok(GetRegionResponse::default()))
        }
    }

    fn get_store(&self, _: &GetStoreRequest) -> Option<Result<GetStoreResponse>> {
        if !self.is_visited.swap(true, Ordering::Relaxed) {
            info!(
                "[NotRetry] get_region_by_id returns Ok(_) with header has IncompatibleVersion error"
            );
            let mut err = Error::default();
            err.set_type(ErrorType::IncompatibleVersion);
            let mut resp = GetStoreResponse::default();
            resp.mut_header().set_error(err);
            Some(Ok(resp))
        } else {
            info!("[NotRetry] get_region_by_id returns Ok()");
            Some(Ok(GetStoreResponse::default()))
        }
    }
}