use std::collections::VecDeque;
use std::marker::PhantomData;
use std::sync::Arc;
use std::cmp;
use protobuf::Message;
use crate::cache_evict::CacheTracker;
use crate::log_batch::{CompressionType, EntryExt};
use crate::pipe_log::LogQueue;
use crate::util::{slices_in_range, HashMap};
use crate::{Error, GlobalStats, Result};
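// A deque that has grown beyond `SHRINK_CACHE_LIMIT` capacity is shrunk back
// to `SHRINK_CACHE_CAPACITY` once it holds few enough items.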
const SHRINK_CACHE_CAPACITY: usize = 64;
const SHRINK_CACHE_LIMIT: usize = 512;
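/// Where an entry is stored on disk: the queue and file holding it, the
/// position of its log batch (`base_offset`, `compression_type`, `batch_len`),
/// and the entry's own `offset` and `len` within that batch.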
#[derive(Debug, Clone, PartialEq)]
pub struct EntryIndex {
pub index: u64,
pub queue: LogQueue,
pub file_num: u64,
pub base_offset: u64,
pub compression_type: CompressionType,
pub batch_len: u64,
pub offset: u64,
pub len: u64,
pub cache_tracker: Option<CacheTracker>,
}
impl Default for EntryIndex {
fn default() -> EntryIndex {
EntryIndex {
index: 0,
queue: LogQueue::Append,
file_num: 0,
base_offset: 0,
compression_type: CompressionType::None,
batch_len: 0,
offset: 0,
len: 0,
cache_tracker: None,
}
}
}
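/// In-memory index of one raft group's log data: an `EntryIndex` for every
/// live entry, a cache holding a suffix of those entries, and the group's
/// key-value pairs. The first `rewrite_count` entries live in the rewrite
/// queue; the rest are in the append queue.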
pub struct MemTable<E: Message + Clone, W: EntryExt<E>> {
region_id: u64,
entries_cache: VecDeque<E>,
entries_index: VecDeque<EntryIndex>,
rewrite_count: usize,
kvs: HashMap<Vec<u8>, (Vec<u8>, LogQueue, u64)>,
global_stats: Arc<GlobalStats>,
_phantom: PhantomData<W>,
}
impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
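    /// Offset of the first cached entry inside `entries_index`; the cache
    /// always covers a suffix of the indexed entries.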
fn cache_distance(&self) -> usize {
if self.entries_cache.is_empty() {
return self.entries_index.len();
}
self.entries_index.len() - self.entries_cache.len()
}
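    /// Drops cached entries with index >= `index` (they are about to be
    /// overwritten by a conflicting append) and releases their cache trackers.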
fn cut_entries_cache(&mut self, index: u64) {
if self.entries_cache.is_empty() {
return;
}
let last_index = W::index(self.entries_cache.back().unwrap());
let first_index = W::index(self.entries_cache.front().unwrap());
let conflict = if index <= first_index {
0
} else if index <= last_index {
(index - first_index) as usize
} else {
return;
};
let distance = self.cache_distance();
for offset in conflict..self.entries_cache.len() {
let entry_index = &mut self.entries_index[distance + offset];
entry_index.cache_tracker.take();
}
self.entries_cache.truncate(conflict);
}
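    /// Truncates entry indexes with index >= `index`, returning how many were
    /// removed. Panics if `index` precedes the first indexed entry.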
fn cut_entries_index(&mut self, index: u64) -> usize {
if self.entries_index.is_empty() {
return 0;
}
let last_index = self.entries_index.back().unwrap().index;
let first_index = self.entries_index.front().unwrap().index;
assert!(first_index <= index, "corrupted raft {}", self.region_id);
let conflict = if index <= last_index {
(index - first_index) as usize
} else {
return 0;
};
self.entries_index.truncate(conflict);
if self.rewrite_count > self.entries_index.len() {
let diff = self.rewrite_count - self.entries_index.len();
self.rewrite_count = self.entries_index.len();
self.global_stats.add_compacted_rewrite(diff);
}
(last_index - index + 1) as usize
}
fn shrink_entries_cache(&mut self) {
if self.entries_cache.capacity() > SHRINK_CACHE_LIMIT
&& self.entries_cache.len() <= SHRINK_CACHE_CAPACITY
{
self.entries_cache.shrink_to(SHRINK_CACHE_CAPACITY);
}
}
fn shrink_entries_index(&mut self) {
if self.entries_index.capacity() > SHRINK_CACHE_LIMIT
&& self.entries_index.len() <= SHRINK_CACHE_CAPACITY
{
self.entries_index.shrink_to(SHRINK_CACHE_CAPACITY);
}
}
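    /// Merges `rhs`, whose data is strictly newer and entirely in the append
    /// queue, into `self`, which must hold only rewritten (and uncached)
    /// entries. Stale or conflicting entries in `self` are dropped first.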
fn merge_higher_prio(&mut self, rhs: &mut Self) {
debug_assert_eq!(self.rewrite_count, self.entries_index.len());
debug_assert!(self.entries_cache.is_empty());
debug_assert_eq!(rhs.rewrite_count, 0);
if !rhs.entries_index.is_empty() {
if !self.entries_index.is_empty() {
let front = rhs.entries_index[0].index;
let self_back = self.entries_index.back().unwrap().index;
let self_front = self.entries_index.front().unwrap().index;
if front > self_back + 1 {
self.compact_to(self_back + 1);
} else if front >= self_front {
self.cut_entries_index(front);
} else {
unreachable!();
}
}
self.entries_index.reserve(rhs.entries_index.len());
for ei in std::mem::take(&mut rhs.entries_index) {
self.entries_index.push_back(ei);
}
self.entries_cache = std::mem::take(&mut rhs.entries_cache);
}
for (key, (value, queue, file_num)) in std::mem::take(&mut rhs.kvs) {
self.kvs.insert(key, (value, queue, file_num));
}
}
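    /// Merges `rhs`, whose data is strictly older and fully rewritten,
    /// beneath `self`; the combined table ends up in `self`.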
pub fn merge_lower_prio(&mut self, rhs: &mut Self) {
rhs.merge_higher_prio(self);
std::mem::swap(self, rhs);
}
pub fn new(region_id: u64, global_stats: Arc<GlobalStats>) -> MemTable<E, W> {
MemTable::<E, W> {
region_id,
entries_cache: VecDeque::with_capacity(SHRINK_CACHE_CAPACITY),
entries_index: VecDeque::with_capacity(SHRINK_CACHE_CAPACITY),
rewrite_count: 0,
kvs: HashMap::default(),
global_stats,
_phantom: PhantomData,
}
}
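    /// Appends entries and their indexes, first truncating any conflicting
    /// suffix. Panics if the new batch would leave a hole after the last index.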
pub fn append(&mut self, entries: Vec<E>, entries_index: Vec<EntryIndex>) {
assert_eq!(entries.len(), entries_index.len());
if entries.is_empty() {
return;
}
let first_index_to_add = W::index(&entries[0]);
self.cut_entries_cache(first_index_to_add);
self.cut_entries_index(first_index_to_add);
if let Some(index) = self.entries_index.back().map(|e| e.index) {
assert_eq!(
index + 1,
first_index_to_add,
"memtable {} has a hole",
self.region_id
);
}
if entries_index.first().unwrap().cache_tracker.is_some() {
self.entries_cache.extend(entries);
}
self.entries_index.extend(entries_index);
}
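    /// Appends entries that already belong to the rewrite queue. If the batch
    /// is not contiguous with the existing entries, the old entries are
    /// compacted away first so that no hole can form.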
pub fn append_rewrite(&mut self, entries: Vec<E>, entries_index: Vec<EntryIndex>) {
self.global_stats.add_rewrite(entries.len());
match (
self.entries_index.back().map(|x| x.index),
entries_index.first(),
) {
(Some(back_idx), Some(first)) if back_idx + 1 < first.index => {
self.compact_to(back_idx + 1);
}
_ => {}
}
self.append(entries, entries_index);
self.rewrite_count = self.entries_index.len();
}
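    /// Moves entry indexes into the rewrite queue, updating their file
    /// locations. With `latest_rewrite == Some(n)`, only entries from append
    /// files numbered <= `n` are switched; the scan stops at the first entry
    /// from a newer file, since it and its successors have been overwritten by
    /// fresh appends. With `None`, this squeezes entries already rewritten.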
pub fn rewrite(&mut self, entries_index: Vec<EntryIndex>, latest_rewrite: Option<u64>) {
if entries_index.is_empty() {
return;
}
self.global_stats.add_rewrite(entries_index.len());
if self.entries_index.is_empty() {
self.global_stats.add_compacted_rewrite(entries_index.len());
return;
}
let self_ents_len = self.entries_index.len();
let front = self.entries_index[0].index as usize;
let back = self.entries_index[self_ents_len - 1].index as usize;
let ents_len = entries_index.len();
let first = cmp::max(entries_index[0].index as usize, front);
let last = cmp::min(entries_index[ents_len - 1].index as usize, back);
        if last < first {
            // The rewritten range no longer intersects the live entries:
            // everything in it has been compacted or truncated away.
            self.global_stats.add_compacted_rewrite(entries_index.len());
            return;
        }
let strip_count = ents_len - (last + 1 - first);
self.global_stats.add_compacted_rewrite(strip_count);
let (entries_index, ents_len) = {
let diff = first - entries_index[0].index as usize;
let len = last - first + 1;
(&entries_index[diff..diff + len], len)
};
        let distance = first - front;
for (i, ei) in entries_index.iter().enumerate() {
if let Some(latest_rewrite) = latest_rewrite {
debug_assert_eq!(self.entries_index[i + distance].queue, LogQueue::Append);
if self.entries_index[i + distance].file_num > latest_rewrite {
self.compact_cache_to(self.entries_index[i + distance].index);
self.global_stats.add_compacted_rewrite(ents_len - i);
self.rewrite_count = i + distance;
return;
}
} else {
debug_assert_eq!(self.entries_index[i + distance].queue, LogQueue::Rewrite);
}
if self.entries_index[i + distance].queue != LogQueue::Rewrite {
debug_assert_eq!(ei.queue, LogQueue::Rewrite);
self.entries_index[i + distance].queue = LogQueue::Rewrite;
}
self.entries_index[i + distance].file_num = ei.file_num;
self.entries_index[i + distance].base_offset = ei.base_offset;
self.entries_index[i + distance].compression_type = ei.compression_type;
self.entries_index[i + distance].batch_len = ei.batch_len;
self.entries_index[i + distance].offset = ei.offset;
self.entries_index[i + distance].len = ei.len;
}
self.compact_cache_to(self.entries_index[distance + ents_len - 1].index + 1);
self.rewrite_count = distance + ents_len;
}
pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>, file_num: u64) {
if let Some(origin) = self.kvs.insert(key, (value, LogQueue::Append, file_num)) {
if origin.1 == LogQueue::Rewrite {
self.global_stats.add_compacted_rewrite(1);
}
}
}
pub fn put_rewrite(&mut self, key: Vec<u8>, value: Vec<u8>, file_num: u64) {
self.kvs.insert(key, (value, LogQueue::Rewrite, file_num));
self.global_stats.add_rewrite(1);
}
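    /// Moves a key to the rewrite queue. With `latest_rewrite == Some(n)`, the
    /// key is switched only if its value was appended in a file numbered <=
    /// `n`; with `None`, this squeezes a key already in the rewrite queue.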
pub fn rewrite_key(&mut self, key: Vec<u8>, latest_rewrite: Option<u64>, file_num: u64) {
self.global_stats.add_rewrite(1);
if let Some(value) = self.kvs.get_mut(&key) {
if value.1 == LogQueue::Append {
if let Some(latest_rewrite) = latest_rewrite {
if value.2 <= latest_rewrite {
value.1 = LogQueue::Rewrite;
value.2 = file_num;
}
} else {
self.global_stats.add_compacted_rewrite(1);
}
} else {
assert!(value.2 <= file_num);
value.2 = file_num;
}
} else {
self.global_stats.add_compacted_rewrite(1);
}
}
pub fn delete(&mut self, key: &[u8]) {
if let Some(value) = self.kvs.remove(key) {
if value.1 == LogQueue::Rewrite {
self.global_stats.add_compacted_rewrite(1);
}
}
}
pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
self.kvs.get(key).map(|v| v.0.clone())
}
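    /// Removes all entries with index < `idx` and returns how many were
    /// removed.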
pub fn compact_to(&mut self, mut idx: u64) -> u64 {
self.compact_cache_to(idx);
let first_idx = match self.entries_index.front() {
Some(e) if e.index < idx => e.index,
_ => return 0,
};
let last_idx = self.entries_index.back().unwrap().index;
idx = cmp::min(last_idx + 1, idx);
let drain_end = (idx - first_idx) as usize;
self.entries_index.drain(..drain_end);
self.shrink_entries_index();
let rewrite_sub = cmp::min(drain_end, self.rewrite_count);
self.rewrite_count -= rewrite_sub;
self.global_stats.add_compacted_rewrite(rewrite_sub);
drain_end as u64
}
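    /// Evicts cached entries with index < `idx`, releasing their cache
    /// trackers; the entry indexes themselves are kept.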
pub fn compact_cache_to(&mut self, mut idx: u64) {
let first_idx = match self.entries_cache.front() {
Some(e) if W::index(e) < idx => W::index(e),
_ => return,
};
let last_index = W::index(self.entries_cache.back().unwrap());
        assert_eq!(last_index, self.entries_index.back().unwrap().index);
idx = cmp::min(last_index + 1, idx);
let distance = self.cache_distance();
let drain_end = (idx - first_idx) as usize;
self.entries_cache.drain(0..drain_end);
for i in 0..drain_end {
let entry_index = &mut self.entries_index[distance + i];
entry_index.cache_tracker.take();
}
self.shrink_entries_cache();
}
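    /// Returns the entry at `index` from the cache, or its `EntryIndex` so
    /// that the caller can read it from the log file on a cache miss.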
pub fn get_entry(&self, index: u64) -> (Option<E>, Option<EntryIndex>) {
if self.entries_index.is_empty() {
return (None, None);
}
let first_index = self.entries_index.front().unwrap().index;
let last_index = self.entries_index.back().unwrap().index;
if index < first_index || index > last_index {
return (None, None);
}
let ioffset = (index - first_index) as usize;
let cache_distance = self.cache_distance();
if ioffset < cache_distance {
self.global_stats.add_cache_miss(1);
let entry_index = self.entries_index[ioffset].clone();
(None, Some(entry_index))
} else {
self.global_stats.add_cache_hit(1);
let coffset = ioffset - cache_distance;
let entry = self.entries_cache[coffset].clone();
(Some(entry), None)
}
}
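    /// Fetches entries in `[begin, end)`. Cached entries are cloned into
    /// `vec`; indexes of uncached entries go into `vec_idx` for the caller to
    /// read from disk. `max_size` is a soft byte budget: at least one entry is
    /// always fetched.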
pub fn fetch_entries_to(
&self,
begin: u64,
end: u64,
max_size: Option<usize>,
vec: &mut Vec<E>,
vec_idx: &mut Vec<EntryIndex>,
) -> Result<()> {
assert!(end > begin, "fetch_entries_to({}, {})", begin, end);
let (vec_len, vec_idx_len) = (vec.len(), vec_idx.len());
if self.entries_index.is_empty() {
return Err(Error::StorageUnavailable);
}
let first_index = self.entries_index.front().unwrap().index;
if begin < first_index {
return Err(Error::StorageCompacted);
}
let last_index = self.entries_index.back().unwrap().index;
if end > last_index + 1 {
return Err(Error::StorageUnavailable);
}
let start_pos = (begin - first_index) as usize;
let mut end_pos = (end - begin) as usize + start_pos;
if let Some(max_size) = max_size {
let count_limit = self.count_limit(start_pos, end_pos, max_size);
end_pos = start_pos + count_limit;
}
let cache_offset = self.cache_distance();
if cache_offset < end_pos {
if start_pos >= cache_offset {
let low = start_pos - cache_offset;
let high = end_pos - cache_offset;
let (first, second) = slices_in_range(&self.entries_cache, low, high);
vec.extend_from_slice(first);
vec.extend_from_slice(second);
} else {
let high = end_pos - cache_offset;
let (first, second) = slices_in_range(&self.entries_cache, 0, high);
vec.extend_from_slice(first);
vec.extend_from_slice(second);
let (first, second) = slices_in_range(&self.entries_index, start_pos, cache_offset);
vec_idx.extend_from_slice(first);
vec_idx.extend_from_slice(second);
}
} else {
let (first, second) = slices_in_range(&self.entries_index, start_pos, end_pos);
vec_idx.extend_from_slice(first);
vec_idx.extend_from_slice(second);
}
let (hit, miss) = (vec.len() - vec_len, vec_idx.len() - vec_idx_len);
self.global_stats.add_cache_hit(hit);
self.global_stats.add_cache_miss(miss);
Ok(())
}
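    /// Collects append-queue entries located in files numbered <=
    /// `latest_rewrite`, so that they can be moved into the rewrite queue.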
pub fn fetch_rewrite_entries(
&self,
latest_rewrite: u64,
vec: &mut Vec<E>,
vec_idx: &mut Vec<EntryIndex>,
) -> Result<()> {
let begin = self
.entries_index
.iter()
.find(|e| e.queue == LogQueue::Append);
let end = self
.entries_index
.iter()
.rev()
.find(|e| e.file_num <= latest_rewrite);
if let (Some(begin), Some(end)) = (begin, end) {
if begin.index <= end.index {
return self.fetch_entries_to(begin.index, end.index + 1, None, vec, vec_idx);
}
}
Ok(())
}
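    /// Collects all entries currently in the rewrite queue.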
pub fn fetch_entries_from_rewrite(
&self,
vec: &mut Vec<E>,
vec_idx: &mut Vec<EntryIndex>,
) -> Result<()> {
if self.rewrite_count > 0 {
let end = self.entries_index[self.rewrite_count - 1].index + 1;
let first = self.entries_index.front().unwrap().index;
return self.fetch_entries_to(first, end, None, vec, vec_idx);
}
Ok(())
}
pub fn fetch_rewrite_kvs(&self, latest_rewrite: u64, vec: &mut Vec<(Vec<u8>, Vec<u8>)>) {
for (key, (value, queue, file_num)) in &self.kvs {
if *queue == LogQueue::Append && *file_num <= latest_rewrite {
vec.push((key.clone(), value.clone()));
}
}
}
pub fn fetch_kvs_from_rewrite(&self, vec: &mut Vec<(Vec<u8>, Vec<u8>)>) {
for (key, (value, queue, _)) in &self.kvs {
if *queue == LogQueue::Rewrite {
vec.push((key.clone(), value.clone()));
}
}
}
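    /// The smallest file number in `queue` still referenced by this memtable,
    /// considering both entries and key-values.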
pub fn min_file_num(&self, queue: LogQueue) -> Option<u64> {
let entry = match queue {
LogQueue::Append => self.entries_index.get(self.rewrite_count),
LogQueue::Rewrite if self.rewrite_count == 0 => None,
LogQueue::Rewrite => self.entries_index.front(),
};
let ents_min = entry.map(|e| e.file_num);
let kvs_min = self.kvs_min_file_num(queue);
match (ents_min, kvs_min) {
(Some(ents_min), Some(kvs_min)) => Some(cmp::min(ents_min, kvs_min)),
(Some(ents_min), None) => Some(ents_min),
(None, Some(kvs_min)) => Some(kvs_min),
(None, None) => None,
}
}
pub fn entries_count(&self) -> usize {
self.entries_index.len()
}
pub fn region_id(&self) -> u64 {
self.region_id
}
pub fn first_index(&self) -> Option<u64> {
self.entries_index.front().map(|e| e.index)
}
pub fn last_index(&self) -> Option<u64> {
self.entries_index.back().map(|e| e.index)
}
pub fn kvs_min_file_num(&self, queue: LogQueue) -> Option<u64> {
self.kvs
.values()
.filter(|v| v.1 == queue)
.fold(None, |min, v| Some(cmp::min(min.unwrap_or(u64::MAX), v.2)))
}
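    /// Returns `(needs_rewrite, needs_compact)`: a small table lagging in old
    /// append files should be rewritten, while a large one should be compacted
    /// instead.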
pub fn needs_rewrite_or_compact(
&self,
latest_rewrite: u64,
latest_compact: u64,
rewrite_count_limit: usize,
) -> (bool, bool) {
debug_assert!(latest_compact <= latest_rewrite);
let min_file_num = match self.min_file_num(LogQueue::Append) {
Some(file_num) if file_num <= latest_rewrite => file_num,
_ => return (false, false),
};
let entries_count = self.entries_count();
if min_file_num < latest_compact && entries_count > rewrite_count_limit {
return (false, true);
}
let needs_rewrite = entries_count < rewrite_count_limit;
(needs_rewrite, false)
}
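    /// Number of entries in `[start_idx, end_idx)` that fit within `max_size`
    /// bytes, but always at least one.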
fn count_limit(&self, start_idx: usize, end_idx: usize, max_size: usize) -> usize {
assert!(start_idx < end_idx);
let (first, second) = slices_in_range(&self.entries_index, start_idx, end_idx);
let (mut count, mut total_size) = (0, 0);
for i in first.iter().chain(second) {
count += 1;
total_size += i.len;
if total_size as usize > max_size {
return cmp::max(count - 1, 1);
}
}
count
}
}
#[cfg(test)]
mod tests {
use super::*;
use raft::eraftpb::Entry;
use std::sync::atomic::AtomicUsize;
impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
pub fn max_file_num(&self, queue: LogQueue) -> Option<u64> {
let entry = match queue {
LogQueue::Append if self.rewrite_count == self.entries_index.len() => None,
LogQueue::Append => self.entries_index.back(),
LogQueue::Rewrite if self.rewrite_count == 0 => None,
LogQueue::Rewrite => self.entries_index.get(self.rewrite_count - 1),
};
let ents_max = entry.map(|e| e.file_num);
let kvs_max = self.kvs_max_file_num(queue);
match (ents_max, kvs_max) {
(Some(ents_max), Some(kvs_max)) => Some(cmp::max(ents_max, kvs_max)),
(Some(ents_max), None) => Some(ents_max),
(None, Some(kvs_max)) => Some(kvs_max),
(None, None) => None,
}
}
pub fn kvs_max_file_num(&self, queue: LogQueue) -> Option<u64> {
self.kvs
.values()
.filter(|v| v.1 == queue)
.fold(None, |max, v| Some(cmp::max(max.unwrap_or(0), v.2)))
}
pub fn fetch_all(&self, vec: &mut Vec<E>, vec_idx: &mut Vec<EntryIndex>) {
if self.entries_index.is_empty() {
return;
}
let begin = self.entries_index.front().unwrap().index;
let end = self.entries_index.back().unwrap().index + 1;
self.fetch_entries_to(begin, end, None, vec, vec_idx)
.unwrap();
}
fn entries_size(&self) -> usize {
self.entries_index.iter().fold(0, |acc, e| acc + e.len) as usize
}
fn check_entries_index_and_cache(&self) {
if self.entries_index.is_empty() {
assert!(self.entries_cache.is_empty());
return;
}
let ei_first = self.entries_index.front().unwrap();
let ei_last = self.entries_index.back().unwrap();
assert_eq!(
ei_last.index - ei_first.index + 1,
self.entries_index.len() as u64
);
if !self.entries_cache.is_empty() {
let distance = self.cache_distance();
let cache_first = self.entries_cache.front().unwrap();
let cache_last = self.entries_cache.back().unwrap();
assert_eq!(W::index(cache_last), ei_last.index);
assert_eq!(self.entries_index[distance].index, W::index(cache_first));
}
}
}
#[test]
fn test_memtable_append() {
let region_id = 8;
let stats = Arc::new(GlobalStats::default());
let mut memtable = MemTable::<Entry, Entry>::new(region_id, stats.clone());
let (ents, ents_idx) = generate_ents(10, 20, LogQueue::Append, 1, &stats);
memtable.append(ents, ents_idx);
assert_eq!(memtable.entries_size(), 10);
assert_eq!(stats.cache_size(), 10);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 1);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 1);
memtable.check_entries_index_and_cache();
let (ents, ents_idx) = generate_ents(20, 30, LogQueue::Append, 2, &stats);
memtable.append(ents, ents_idx);
assert_eq!(memtable.entries_size(), 20);
assert_eq!(stats.cache_size(), 20);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 1);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 2);
memtable.check_entries_index_and_cache();
let (ents, ents_idx) = generate_ents(25, 35, LogQueue::Append, 3, &stats);
memtable.append(ents, ents_idx);
assert_eq!(memtable.entries_size(), 25);
assert_eq!(stats.cache_size(), 25);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 1);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 3);
memtable.check_entries_index_and_cache();
let (ents, ents_idx) = generate_ents(10, 40, LogQueue::Append, 4, &stats);
memtable.append(ents, ents_idx);
assert_eq!(memtable.entries_size(), 30);
assert_eq!(stats.cache_size(), 30);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 4);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 4);
memtable.check_entries_index_and_cache();
}
#[test]
fn test_memtable_compact() {
let region_id = 8;
let stats = Arc::new(GlobalStats::default());
let mut memtable = MemTable::<Entry, Entry>::new(region_id, stats.clone());
let (ents, ents_idx) = generate_ents(0, 10, LogQueue::Append, 1, &stats);
memtable.append(ents, ents_idx);
let (ents, ents_idx) = generate_ents(10, 15, LogQueue::Append, 2, &stats);
memtable.append(ents, ents_idx);
let (ents, ents_idx) = generate_ents(15, 20, LogQueue::Append, 2, &stats);
memtable.append(ents, ents_idx);
let (ents, ents_idx) = generate_ents(20, 25, LogQueue::Append, 3, &stats);
memtable.append(ents, ents_idx);
assert_eq!(memtable.entries_size(), 25);
assert_eq!(stats.cache_size(), 25);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 1);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 3);
memtable.check_entries_index_and_cache();
assert_eq!(memtable.compact_to(5), 5);
assert_eq!(memtable.entries_size(), 20);
assert_eq!(stats.cache_size(), 20);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 1);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 3);
memtable.check_entries_index_and_cache();
assert_eq!(memtable.compact_to(20), 15);
assert_eq!(memtable.entries_size(), 5);
assert_eq!(stats.cache_size(), 5);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 3);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 3);
memtable.check_entries_index_and_cache();
assert_eq!(memtable.compact_to(20), 0);
assert_eq!(memtable.compact_to(15), 0);
assert_eq!(memtable.entries_size(), 5);
assert_eq!(stats.cache_size(), 5);
memtable.check_entries_index_and_cache();
}
#[test]
fn test_memtable_compact_cache() {
let region_id = 8;
let stats = Arc::new(GlobalStats::default());
let mut memtable = MemTable::<Entry, Entry>::new(region_id, stats.clone());
let (ents, ents_idx) = generate_ents(0, 10, LogQueue::Append, 1, &stats);
memtable.append(ents, ents_idx);
let (ents, ents_idx) = generate_ents(10, 15, LogQueue::Append, 2, &stats);
memtable.append(ents, ents_idx);
let (ents, ents_idx) = generate_ents(15, 20, LogQueue::Append, 2, &stats);
memtable.append(ents, ents_idx);
let (ents, ents_idx) = generate_ents(20, 25, LogQueue::Append, 3, &stats);
memtable.append(ents, ents_idx);
assert_eq!(memtable.entries_size(), 25);
assert_eq!(stats.cache_size(), 25);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 1);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 3);
memtable.check_entries_index_and_cache();
memtable.compact_cache_to(15);
assert_eq!(stats.cache_size(), 10);
memtable.check_entries_index_and_cache();
memtable.compact_to(20);
assert_eq!(stats.cache_size(), 5);
memtable.check_entries_index_and_cache();
memtable.compact_cache_to(25);
assert_eq!(stats.cache_size(), 0);
memtable.check_entries_index_and_cache();
}
#[test]
fn test_memtable_fetch() {
let region_id = 8;
let stats = Arc::new(GlobalStats::default());
let mut memtable = MemTable::<Entry, Entry>::new(region_id, stats.clone());
let (ents, ents_idx) = generate_ents(0, 10, LogQueue::Append, 1, &stats);
memtable.append(ents, ents_idx);
let (ents, ents_idx) = generate_ents(10, 20, LogQueue::Append, 2, &stats);
memtable.append(ents, ents_idx);
let (ents, ents_idx) = generate_ents(20, 25, LogQueue::Append, 3, &stats);
memtable.append(ents, ents_idx);
memtable.compact_cache_to(15);
let mut ents = vec![];
let mut ents_idx = vec![];
memtable.fetch_all(&mut ents, &mut ents_idx);
assert_eq!(ents.len(), 10);
assert_eq!(ents[0].get_index(), 15);
assert_eq!(ents[9].get_index(), 24);
assert_eq!(ents_idx.len(), 15);
assert_eq!(ents_idx[0].index, 0);
assert_eq!(ents_idx[14].index, 14);
assert_eq!(stats.cache_hit(), 10);
assert_eq!(stats.cache_miss(), 15);
assert_eq!(memtable.compact_to(10), 10);
ents.clear();
ents_idx.clear();
assert!(memtable
.fetch_entries_to(5, 15, None, &mut ents, &mut ents_idx)
.is_err());
ents.clear();
ents_idx.clear();
assert!(memtable
.fetch_entries_to(20, 30, None, &mut ents, &mut ents_idx)
.is_err());
ents.clear();
ents_idx.clear();
stats.reset_cache();
memtable
.fetch_entries_to(20, 25, None, &mut ents, &mut ents_idx)
.unwrap();
assert_eq!(ents.len(), 5);
assert_eq!(ents[0].get_index(), 20);
assert_eq!(ents[4].get_index(), 24);
assert!(ents_idx.is_empty());
assert_eq!(stats.cache_hit(), 5);
ents.clear();
ents_idx.clear();
stats.reset_cache();
memtable
.fetch_entries_to(10, 15, None, &mut ents, &mut ents_idx)
.unwrap();
assert!(ents.is_empty());
assert_eq!(ents_idx.len(), 5);
assert_eq!(ents_idx[0].index, 10);
assert_eq!(ents_idx[4].index, 14);
assert_eq!(stats.cache_miss(), 5);
ents.clear();
ents_idx.clear();
stats.reset_cache();
memtable
.fetch_entries_to(10, 25, None, &mut ents, &mut ents_idx)
.unwrap();
assert_eq!(ents.len(), 10);
assert_eq!(ents[0].get_index(), 15);
assert_eq!(ents[9].get_index(), 24);
assert_eq!(ents_idx.len(), 5);
assert_eq!(ents_idx[0].index, 10);
assert_eq!(ents_idx[4].index, 14);
assert_eq!(stats.cache_hit(), 10);
assert_eq!(stats.cache_miss(), 5);
ents.clear();
ents_idx.clear();
let max_size = Some(10);
stats.reset_cache();
memtable
.fetch_entries_to(10, 25, max_size, &mut ents, &mut ents_idx)
.unwrap();
assert_eq!(ents.len(), 5);
assert_eq!(ents[0].get_index(), 15);
assert_eq!(ents[4].get_index(), 19);
assert_eq!(ents_idx.len(), 5);
assert_eq!(ents_idx[0].index, 10);
assert_eq!(ents_idx[4].index, 14);
assert_eq!(stats.cache_hit(), 5);
assert_eq!(stats.cache_miss(), 5);
ents.clear();
ents_idx.clear();
stats.reset_cache();
memtable
.fetch_entries_to(20, 25, Some(0), &mut ents, &mut ents_idx)
.unwrap();
assert_eq!(ents.len(), 1);
assert_eq!(ents[0].get_index(), 20);
assert!(ents_idx.is_empty());
assert_eq!(stats.cache_hit(), 1);
}
#[test]
fn test_memtable_fetch_rewrite() {
let region_id = 8;
let stats = Arc::new(GlobalStats::default());
let mut memtable = MemTable::<Entry, Entry>::new(region_id, stats.clone());
let (ents, ents_idx) = generate_ents(0, 10, LogQueue::Append, 1, &stats);
memtable.append(ents, ents_idx);
memtable.put(b"k1".to_vec(), b"v1".to_vec(), 1);
let (ents, ents_idx) = generate_ents(10, 20, LogQueue::Append, 2, &stats);
memtable.append(ents, ents_idx);
memtable.put(b"k2".to_vec(), b"v2".to_vec(), 2);
let (ents, ents_idx) = generate_ents(20, 25, LogQueue::Append, 3, &stats);
memtable.append(ents, ents_idx);
memtable.put(b"k3".to_vec(), b"v3".to_vec(), 3);
let (_, ents_idx) = generate_ents(0, 10, LogQueue::Rewrite, 50, &stats);
memtable.rewrite(ents_idx, Some(1));
memtable.rewrite_key(b"k1".to_vec(), Some(1), 50);
assert_eq!(memtable.entries_size(), 25);
assert_eq!(stats.cache_size(), 15);
memtable.compact_cache_to(15);
let (mut ents, mut ents_idx) = (vec![], vec![]);
assert!(memtable
.fetch_rewrite_entries(2, &mut ents, &mut ents_idx)
.is_ok());
assert_eq!(ents.len(), 5);
assert_eq!(ents_idx.len(), 5);
        assert_eq!(ents.first().unwrap().get_index(), 15);
assert_eq!(ents_idx.last().unwrap().index, 14);
ents.clear();
ents_idx.clear();
assert!(memtable
.fetch_entries_from_rewrite(&mut ents, &mut ents_idx)
.is_ok());
assert_eq!(ents_idx.len(), 10);
assert_eq!(ents_idx.first().unwrap().index, 0);
assert_eq!(ents_idx.last().unwrap().index, 9);
}
#[test]
fn test_memtable_kv_operations() {
let region_id = 8;
let stats = Arc::new(GlobalStats::default());
let mut memtable = MemTable::<Entry, Entry>::new(region_id, stats);
let (k1, v1) = (b"key1", b"value1");
let (k5, v5) = (b"key5", b"value5");
memtable.put(k1.to_vec(), v1.to_vec(), 1);
memtable.put(k5.to_vec(), v5.to_vec(), 5);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 1);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 5);
assert_eq!(memtable.get(k1.as_ref()), Some(v1.to_vec()));
assert_eq!(memtable.get(k5.as_ref()), Some(v5.to_vec()));
memtable.delete(k5.as_ref());
assert_eq!(memtable.get(k5.as_ref()), None);
}
#[test]
fn test_memtable_get_entry() {
let region_id = 8;
let stats = Arc::new(GlobalStats::default());
let mut memtable = MemTable::<Entry, Entry>::new(region_id, stats.clone());
let (ents, ents_idx) = generate_ents(5, 10, LogQueue::Append, 1, &stats);
memtable.append(ents, ents_idx);
let (ents, ents_idx) = generate_ents(10, 20, LogQueue::Append, 2, &stats);
memtable.append(ents, ents_idx);
memtable.compact_cache_to(10);
assert_eq!(memtable.get_entry(2), (None, None));
assert_eq!(memtable.get_entry(25), (None, None));
let (entry, _) = memtable.get_entry(10);
assert_eq!(entry.unwrap().get_index(), 10);
let (_, entry_idx) = memtable.get_entry(5);
assert_eq!(entry_idx.unwrap().index, 5);
}
#[test]
fn test_memtable_rewrite() {
let region_id = 8;
let stats = Arc::new(GlobalStats::default());
let mut memtable = MemTable::<Entry, Entry>::new(region_id, stats.clone());
let (ents, ents_idx) = generate_ents(10, 20, LogQueue::Append, 2, &stats);
memtable.append(ents, ents_idx);
memtable.put(b"kk1".to_vec(), b"vv1".to_vec(), 2);
let (ents, ents_idx) = generate_ents(20, 30, LogQueue::Append, 3, &stats);
memtable.append(ents, ents_idx);
memtable.put(b"kk2".to_vec(), b"vv2".to_vec(), 3);
let (ents, ents_idx) = generate_ents(30, 40, LogQueue::Append, 4, &stats);
memtable.append(ents, ents_idx);
memtable.put(b"kk3".to_vec(), b"vv3".to_vec(), 4);
memtable.compact_cache_to(25);
assert_eq!(memtable.entries_size(), 30);
assert_eq!(stats.cache_size(), 15);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 2);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 4);
memtable.check_entries_index_and_cache();
let (_, ents_idx) = generate_ents(0, 10, LogQueue::Rewrite, 50, &stats);
memtable.rewrite(ents_idx, Some(1));
memtable.rewrite_key(b"kk0".to_vec(), Some(1), 50);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 2);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 4);
assert!(memtable.min_file_num(LogQueue::Rewrite).is_none());
assert!(memtable.max_file_num(LogQueue::Rewrite).is_none());
assert_eq!(memtable.rewrite_count, 0);
assert_eq!(memtable.get(b"kk0"), None);
assert_eq!(memtable.global_stats.rewrite_operations(), 11);
assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 11);
let (_, ents_idx) = generate_ents(0, 20, LogQueue::Rewrite, 100, &stats);
memtable.rewrite(ents_idx, Some(2));
assert_eq!(memtable.global_stats.rewrite_operations(), 31);
memtable.rewrite_key(b"kk0".to_vec(), Some(1), 50);
memtable.rewrite_key(b"kk1".to_vec(), Some(2), 100);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 3);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 4);
assert_eq!(memtable.min_file_num(LogQueue::Rewrite).unwrap(), 100);
assert_eq!(memtable.max_file_num(LogQueue::Rewrite).unwrap(), 100);
assert_eq!(memtable.rewrite_count, 10);
assert_eq!(memtable.get(b"kk1"), Some(b"vv1".to_vec()));
assert_eq!(memtable.global_stats.rewrite_operations(), 33);
assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 22);
let (_, ents_idx) = generate_ents(20, 30, LogQueue::Rewrite, 101, &stats);
memtable.rewrite(ents_idx, Some(3));
memtable.rewrite_key(b"kk2".to_vec(), Some(3), 101);
assert_eq!(stats.cache_size(), 10);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 4);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 4);
assert_eq!(memtable.min_file_num(LogQueue::Rewrite).unwrap(), 100);
assert_eq!(memtable.max_file_num(LogQueue::Rewrite).unwrap(), 101);
assert_eq!(memtable.rewrite_count, 20);
assert_eq!(memtable.get(b"kk2"), Some(b"vv2".to_vec()));
assert_eq!(memtable.global_stats.rewrite_operations(), 44);
assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 22);
let (ents, ents_idx) = generate_ents(35, 36, LogQueue::Append, 5, &stats);
memtable.append(ents, ents_idx);
memtable.put(b"kk3".to_vec(), b"vv33".to_vec(), 5);
assert_eq!(stats.cache_size(), 6);
assert_eq!(memtable.entries_index.back().unwrap().index, 35);
let (_, ents_idx) = generate_ents(30, 40, LogQueue::Rewrite, 102, &stats);
memtable.rewrite(ents_idx, Some(4));
memtable.rewrite_key(b"kk3".to_vec(), Some(4), 102);
assert_eq!(stats.cache_size(), 1);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 5);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 5);
assert_eq!(memtable.min_file_num(LogQueue::Rewrite).unwrap(), 100);
assert_eq!(memtable.max_file_num(LogQueue::Rewrite).unwrap(), 102);
assert_eq!(memtable.rewrite_count, 25);
assert_eq!(memtable.get(b"kk3"), Some(b"vv33".to_vec()));
assert_eq!(memtable.global_stats.rewrite_operations(), 55);
assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 27);
let (ents, ents_idx) = generate_ents(35, 50, LogQueue::Append, 6, &stats);
memtable.append(ents, ents_idx);
memtable.compact_to(30);
assert_eq!(memtable.entries_index.back().unwrap().index, 49);
assert_eq!(memtable.rewrite_count, 5);
assert_eq!(memtable.global_stats.rewrite_operations(), 55);
assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 47);
assert_eq!(stats.cache_size(), 15);
memtable.compact_to(40);
assert_eq!(memtable.rewrite_count, 0);
assert_eq!(memtable.global_stats.rewrite_operations(), 55);
assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 52);
assert_eq!(stats.cache_size(), 10);
let (_, ents_idx) = generate_ents(30, 36, LogQueue::Rewrite, 103, &stats);
memtable.rewrite(ents_idx, Some(5));
memtable.rewrite_key(b"kk3".to_vec(), Some(5), 103);
assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 6);
assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 6);
assert_eq!(memtable.min_file_num(LogQueue::Rewrite).unwrap(), 100);
assert_eq!(memtable.max_file_num(LogQueue::Rewrite).unwrap(), 103);
assert_eq!(memtable.rewrite_count, 0);
assert_eq!(memtable.global_stats.rewrite_operations(), 62);
assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 58);
assert_eq!(stats.cache_size(), 10);
let (ents, ents_idx) = generate_ents(50, 55, LogQueue::Append, 7, &stats);
memtable.append(ents, ents_idx);
let (_, ents_idx) = generate_ents(30, 50, LogQueue::Rewrite, 104, &stats);
memtable.rewrite(ents_idx, Some(6));
assert_eq!(memtable.rewrite_count, 10);
assert_eq!(memtable.global_stats.rewrite_operations(), 82);
assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 68);
assert_eq!(stats.cache_size(), 5);
let (ents, ents_idx) = generate_ents(45, 50, LogQueue::Append, 7, &stats);
memtable.append(ents, ents_idx);
assert_eq!(memtable.rewrite_count, 5);
assert_eq!(memtable.global_stats.rewrite_operations(), 82);
assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 73);
assert_eq!(stats.cache_size(), 5);
let (ents, ents_idx) = generate_ents(40, 50, LogQueue::Append, 7, &stats);
memtable.append(ents, ents_idx);
assert_eq!(memtable.rewrite_count, 0);
assert_eq!(memtable.global_stats.rewrite_operations(), 82);
assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 78);
assert_eq!(stats.cache_size(), 10);
}
fn generate_ents(
begin_idx: u64,
end_idx: u64,
queue: LogQueue,
file_num: u64,
stats: &Arc<GlobalStats>,
) -> (Vec<Entry>, Vec<EntryIndex>) {
assert!(end_idx >= begin_idx);
let (mut ents, mut ents_idx) = (vec![], vec![]);
for idx in begin_idx..end_idx {
let mut ent = Entry::new();
ent.set_index(idx);
ents.push(ent);
            let mut ent_idx = EntryIndex {
                index: idx,
                queue,
                file_num,
                offset: idx,
                len: 1,
                ..Default::default()
            };
if queue != LogQueue::Rewrite {
let mut tracker = CacheTracker::new(stats.clone(), Arc::new(AtomicUsize::new(1)));
tracker.global_stats.add_mem_change(1);
tracker.sub_on_drop = 1;
ent_idx.cache_tracker = Some(tracker);
}
ents_idx.push(ent_idx);
}
(ents, ents_idx)
}
}