1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use super::range::Range;

#[derive(PartialEq, Eq, Clone, Debug)]
pub enum IterStatus {
    /// All ranges are consumed.
    Drained,

    /// Last range is drained or this iteration is a fresh start so that caller should scan
    /// on a new range.
    NewRange(Range),

    /// Last interval range is not drained and the caller should continue scanning without changing
    /// the scan range.
    Continue,
}

/// An iterator like structure that produces user key ranges.
///
/// For each `next()`, it produces one of the following:
/// - a new range
/// - a flag indicating continuing last interval range
/// - a flag indicating that all ranges are consumed
///
/// If a new range is returned, caller can then scan unknown amount of key(s) within this new range.
/// The caller must inform the structure so that it will emit a new range next time by calling
/// `notify_drained()` after current range is drained. Multiple `notify_drained()` without `next()`
/// will have no effect.
pub struct RangesIterator {
    /// Whether or not we are processing a valid range. If we are not processing a range, or there
    /// is no range any more, this field is `false`.
    in_range: bool,

    iter: std::vec::IntoIter<Range>,
}

impl RangesIterator {
    #[inline]
    pub fn new(user_key_ranges: Vec<Range>) -> Self {
        Self {
            in_range: false,
            iter: user_key_ranges.into_iter(),
        }
    }

    /// Continues iterating.
    #[inline]
    pub fn next(&mut self) -> IterStatus {
        if self.in_range {
            return IterStatus::Continue;
        }
        match self.iter.next() {
            None => IterStatus::Drained,
            Some(range) => {
                self.in_range = true;
                IterStatus::NewRange(range)
            }
        }
    }

    /// Notifies that current range is drained.
    #[inline]
    pub fn notify_drained(&mut self) {
        self.in_range = false;
    }
}

#[cfg(test)]
mod tests {
    use super::super::range::IntervalRange;
    use super::*;

    use std::sync::atomic;

    static RANGE_INDEX: atomic::AtomicU64 = atomic::AtomicU64::new(1);

    fn new_range() -> Range {
        use byteorder::{BigEndian, WriteBytesExt};

        let v = RANGE_INDEX.fetch_add(2, atomic::Ordering::SeqCst);
        let mut r = IntervalRange::from(("", ""));
        r.lower_inclusive.write_u64::<BigEndian>(v).unwrap();
        r.upper_exclusive.write_u64::<BigEndian>(v + 2).unwrap();
        Range::Interval(r)
    }

    #[test]
    fn test_basic() {
        // Empty
        let mut c = RangesIterator::new(vec![]);
        assert_eq!(c.next(), IterStatus::Drained);
        assert_eq!(c.next(), IterStatus::Drained);
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::Drained);
        assert_eq!(c.next(), IterStatus::Drained);

        // Non-empty
        let ranges = vec![new_range(), new_range(), new_range()];
        let mut c = RangesIterator::new(ranges.clone());
        assert_eq!(c.next(), IterStatus::NewRange(ranges[0].clone()));
        assert_eq!(c.next(), IterStatus::Continue);
        assert_eq!(c.next(), IterStatus::Continue);
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::NewRange(ranges[1].clone()));
        assert_eq!(c.next(), IterStatus::Continue);
        assert_eq!(c.next(), IterStatus::Continue);
        c.notify_drained();
        c.notify_drained(); // multiple consumes will not take effect
        assert_eq!(c.next(), IterStatus::NewRange(ranges[2].clone()));
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::Drained);
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::Drained);
    }
}