use codec::prelude::NumberDecoder;
use std::mem::size_of;
use tikv_util::codec::number::{self, NumberEncoder, MAX_VAR_U64_LEN};
use crate::lock::LockType;
use crate::timestamp::TimeStamp;
use crate::types::{Value, SHORT_VALUE_PREFIX};
use crate::{Error, ErrorInner, Result};
/// Kind of a write record.
///
/// Each variant is serialized as a single flag byte; see `WriteType::to_u8` and
/// `WriteType::from_u8` for the mapping.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WriteType {
/// Counterpart of `LockType::Put`; encoded as `FLAG_PUT`.
Put,
/// Counterpart of `LockType::Delete`; encoded as `FLAG_DELETE`.
Delete,
/// Counterpart of `LockType::Lock`; encoded as `FLAG_LOCK`.
Lock,
/// Encoded as `FLAG_ROLLBACK`. `WriteType::from_lock_type` never produces this variant.
Rollback,
}
/// Flag byte that encodes `WriteType::Put`.
const FLAG_PUT: u8 = b'P';
/// Flag byte that encodes `WriteType::Delete`.
const FLAG_DELETE: u8 = b'D';
/// Flag byte that encodes `WriteType::Lock`.
const FLAG_LOCK: u8 = b'L';
/// Flag byte that encodes `WriteType::Rollback`.
const FLAG_ROLLBACK: u8 = b'R';
/// Marker byte appended by `WriteRef::to_bytes` when `has_overlapped_rollback` is set.
///
/// It has the same value as `FLAG_ROLLBACK`. `WriteRef::parse` only matches it among the
/// trailing fields, after the type byte and `start_ts` have already been consumed, so the two
/// uses occupy different positions in the encoding.
const FLAG_OVERLAPPED_ROLLBACK: u8 = b'R';
/// Prefix byte that precedes the gc fence timestamp in the serialized form.
const GC_FENCE_PREFIX: u8 = b'F';
/// Short value stored by `Write::new_rollback` when `protected` is true;
/// `WriteRef::is_protected` checks for exactly this content.
const PROTECTED_ROLLBACK_SHORT_VALUE: &[u8] = b"p";
impl WriteType {
    /// Converts a lock type into the matching write type.
    ///
    /// Returns `None` for `LockType::Pessimistic`, which has no write-type counterpart here.
    pub fn from_lock_type(tp: LockType) -> Option<WriteType> {
        let write_type = match tp {
            LockType::Pessimistic => return None,
            LockType::Put => Self::Put,
            LockType::Delete => Self::Delete,
            LockType::Lock => Self::Lock,
        };
        Some(write_type)
    }

    /// Decodes a flag byte into a write type.
    ///
    /// Returns `None` when `b` is not one of the four known flag bytes.
    pub fn from_u8(b: u8) -> Option<WriteType> {
        let write_type = match b {
            FLAG_PUT => Self::Put,
            FLAG_DELETE => Self::Delete,
            FLAG_LOCK => Self::Lock,
            FLAG_ROLLBACK => Self::Rollback,
            _ => return None,
        };
        Some(write_type)
    }

    /// Returns the flag byte that represents this write type in the serialized form.
    fn to_u8(self) -> u8 {
        match self {
            Self::Put => FLAG_PUT,
            Self::Delete => FLAG_DELETE,
            Self::Lock => FLAG_LOCK,
            Self::Rollback => FLAG_ROLLBACK,
        }
    }
}
/// Owned form of a write record.
///
/// `Write::as_ref` produces the borrowed `WriteRef` view, which carries the
/// serialization and parsing logic.
#[derive(PartialEq, Clone)]
pub struct Write {
/// Kind of this record.
pub write_type: WriteType,
/// Timestamp serialized right after the type byte.
/// NOTE(review): presumably the start timestamp of the originating transaction; confirm.
pub start_ts: TimeStamp,
/// Optional value bytes embedded in the record. `Write::new_rollback` uses this slot to
/// store the protected-rollback marker.
pub short_value: Option<Value>,
/// Flag set through `set_overlapped_rollback` and serialized as a single marker byte.
pub has_overlapped_rollback: bool,
/// Optional gc fence timestamp, consulted by
/// `WriteRef::check_gc_fence_as_latest_version`.
pub gc_fence: Option<TimeStamp>,
}
impl std::fmt::Debug for Write {
    /// Formats all fields of the record as a debug struct.
    ///
    /// The short value is not printed as raw bytes: it is rendered through
    /// `log_wrappers::Value` into a string first, and a missing value is rendered as the
    /// string `"None"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Write");
        out.field("write_type", &self.write_type);
        out.field("start_ts", &self.start_ts);

        let short_value = match &self.short_value {
            Some(bytes) => format!("{:?}", log_wrappers::Value::value(&bytes[..])),
            None => "None".to_owned(),
        };
        out.field("short_value", &short_value);

        out.field("has_overlapped_rollback", &self.has_overlapped_rollback);
        out.field("gc_fence", &self.gc_fence);
        out.finish()
    }
}
impl Write {
#[inline]
pub fn new(write_type: WriteType, start_ts: TimeStamp, short_value: Option<Value>) -> Write {
Write {
write_type,
start_ts,
short_value,
has_overlapped_rollback: false,
gc_fence: None,
}
}
#[inline]
pub fn new_rollback(start_ts: TimeStamp, protected: bool) -> Write {
let short_value = if protected {
Some(PROTECTED_ROLLBACK_SHORT_VALUE.to_vec())
} else {
None
};
Write {
write_type: WriteType::Rollback,
start_ts,
short_value,
has_overlapped_rollback: false,
gc_fence: None,
}
}
#[inline]
pub fn set_overlapped_rollback(
mut self,
has_overlapped_rollback: bool,
gc_fence: Option<TimeStamp>,
) -> Self {
self.has_overlapped_rollback = has_overlapped_rollback;
self.gc_fence = gc_fence;
self
}
#[inline]
pub fn parse_type(mut b: &[u8]) -> Result<WriteType> {
let write_type_bytes = b
.read_u8()
.map_err(|_| Error::from(ErrorInner::BadFormatWrite))?;
WriteType::from_u8(write_type_bytes).ok_or_else(|| Error::from(ErrorInner::BadFormatWrite))
}
#[inline]
pub fn as_ref(&self) -> WriteRef<'_> {
WriteRef {
write_type: self.write_type,
start_ts: self.start_ts,
short_value: self.short_value.as_deref(),
has_overlapped_rollback: self.has_overlapped_rollback,
gc_fence: self.gc_fence,
}
}
}
/// Borrowed form of a write record: same fields as `Write`, but the short value is a
/// slice with lifetime `'a` instead of an owned buffer.
#[derive(PartialEq, Clone)]
pub struct WriteRef<'a> {
/// Kind of this record.
pub write_type: WriteType,
/// Timestamp serialized right after the type byte.
/// NOTE(review): presumably the start timestamp of the originating transaction; confirm.
pub start_ts: TimeStamp,
/// Optional value bytes; when produced by `parse` this slice points into the parsed input.
pub short_value: Option<&'a [u8]>,
/// Whether the overlapped-rollback marker byte is present.
pub has_overlapped_rollback: bool,
/// Optional gc fence timestamp, consulted by `check_gc_fence_as_latest_version`.
pub gc_fence: Option<TimeStamp>,
}
impl WriteRef<'_> {
pub fn parse(mut b: &[u8]) -> Result<WriteRef<'_>> {
let write_type_bytes = b
.read_u8()
.map_err(|_| Error::from(ErrorInner::BadFormatWrite))?;
let write_type = WriteType::from_u8(write_type_bytes)
.ok_or_else(|| Error::from(ErrorInner::BadFormatWrite))?;
let start_ts = b
.read_var_u64()
.map_err(|_| Error::from(ErrorInner::BadFormatWrite))?
.into();
let mut short_value = None;
let mut has_overlapped_rollback = false;
let mut gc_fence = None;
while !b.is_empty() {
match b
.read_u8()
.map_err(|_| Error::from(ErrorInner::BadFormatWrite))?
{
SHORT_VALUE_PREFIX => {
let len = b
.read_u8()
.map_err(|_| Error::from(ErrorInner::BadFormatWrite))?;
if b.len() < len as usize {
panic!(
"content len [{}] shorter than short value len [{}]",
b.len(),
len,
);
}
short_value = Some(&b[..len as usize]);
b = &b[len as usize..];
}
FLAG_OVERLAPPED_ROLLBACK => {
has_overlapped_rollback = true;
}
GC_FENCE_PREFIX => gc_fence = Some(number::decode_u64(&mut b)?.into()),
_ => {
break;
}
}
}
Ok(WriteRef {
write_type,
start_ts,
short_value,
has_overlapped_rollback,
gc_fence,
})
}
pub fn to_bytes(&self) -> Vec<u8> {
let mut b = Vec::with_capacity(self.pre_allocate_size());
b.push(self.write_type.to_u8());
b.encode_var_u64(self.start_ts.into_inner()).unwrap();
if let Some(v) = self.short_value {
b.push(SHORT_VALUE_PREFIX);
b.push(v.len() as u8);
b.extend_from_slice(v);
}
if self.has_overlapped_rollback {
b.push(FLAG_OVERLAPPED_ROLLBACK);
}
if let Some(ts) = self.gc_fence {
b.push(GC_FENCE_PREFIX);
b.encode_u64(ts.into_inner()).unwrap();
}
b
}
fn pre_allocate_size(&self) -> usize {
let mut size = 1 + MAX_VAR_U64_LEN + self.has_overlapped_rollback as usize;
if let Some(v) = &self.short_value {
size += 2 + v.len();
}
if self.gc_fence.is_some() {
size += 1 + size_of::<u64>();
}
size
}
pub fn check_gc_fence_as_latest_version(&self, read_ts: TimeStamp) -> bool {
if let Some(gc_fence_ts) = self.gc_fence {
if !gc_fence_ts.is_zero() && gc_fence_ts <= read_ts {
return false;
}
}
true
}
#[inline]
pub fn is_protected(&self) -> bool {
self.write_type == WriteType::Rollback
&& self
.short_value
.as_ref()
.map(|v| *v == PROTECTED_ROLLBACK_SHORT_VALUE)
.unwrap_or_default()
}
#[inline]
pub fn to_owned(&self) -> Write {
Write::new(
self.write_type,
self.start_ts,
self.short_value.map(|v| v.to_owned()),
)
.set_overlapped_rollback(self.has_overlapped_rollback, self.gc_fence)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Conversions between lock types, write types and flag bytes agree with each other.
    #[test]
    fn test_write_type() {
        let cases = vec![
            (Some(LockType::Put), WriteType::Put, FLAG_PUT),
            (Some(LockType::Delete), WriteType::Delete, FLAG_DELETE),
            (Some(LockType::Lock), WriteType::Lock, FLAG_LOCK),
            (None, WriteType::Rollback, FLAG_ROLLBACK),
        ];
        for (i, (lock_type, write_type, flag)) in cases.into_iter().enumerate() {
            if let Some(lock_type) = lock_type {
                let converted = WriteType::from_lock_type(lock_type).unwrap();
                assert_eq!(
                    converted, write_type,
                    "#{}, expect from_lock_type({:?}) returns {:?}, but got {:?}",
                    i, lock_type, write_type, converted
                );
            }

            let encoded = write_type.to_u8();
            assert_eq!(
                encoded, flag,
                "#{}, expect {:?}.to_u8() returns {:?}, but got {:?}",
                i, write_type, flag, encoded
            );

            let decoded = WriteType::from_u8(flag).unwrap();
            assert_eq!(
                decoded, write_type,
                "#{}, expect from_u8({:?}) returns {:?}, but got {:?}",
                i, flag, write_type, decoded
            );
        }
    }

    // Records survive a to_bytes/parse round trip; malformed and extended inputs are handled.
    #[test]
    fn test_write() {
        let short = || Some(b"short_value".to_vec());
        let put_123 = || Write::new(WriteType::Put, 123.into(), None);
        let put_456 = || Write::new(WriteType::Put, 456.into(), short());

        let writes = vec![
            Write::new(WriteType::Put, 0.into(), short()),
            Write::new(WriteType::Delete, (1 << 20).into(), None),
            Write::new_rollback((1 << 40).into(), true),
            Write::new(WriteType::Rollback, (1 << 41).into(), None),
            put_123().set_overlapped_rollback(true, None),
            put_123().set_overlapped_rollback(true, Some(1234567.into())),
            put_456().set_overlapped_rollback(true, None),
            put_456().set_overlapped_rollback(true, Some(0.into())),
            put_456().set_overlapped_rollback(true, Some(2345678.into())),
            put_456().set_overlapped_rollback(true, Some(421397468076048385.into())),
        ];
        for (i, write) in writes.into_iter().enumerate() {
            let encoded = write.as_ref().to_bytes();
            assert!(encoded.len() <= write.as_ref().pre_allocate_size());

            let decoded = match WriteRef::parse(&encoded[..]) {
                Ok(parsed) => parsed.to_owned(),
                Err(e) => panic!("#{} parse() err: {:?}", i, e),
            };
            assert_eq!(
                decoded, write,
                "#{} expect {:?}, but got {:?}",
                i, write, decoded
            );
            assert_eq!(Write::parse_type(&encoded).unwrap(), decoded.write_type);
        }

        // Empty input is rejected.
        assert!(WriteRef::parse(b"").is_err());

        let lock = Write::new(WriteType::Lock, 1.into(), short());
        let mut encoded = lock.as_ref().to_bytes();
        // A lone type byte has no timestamp and must be rejected by the full parser...
        assert!(WriteRef::parse(&encoded[..1]).is_err());
        // ...while the type-only parser is satisfied with it.
        assert_eq!(Write::parse_type(&encoded).unwrap(), lock.write_type);

        // Trailing bytes with an unknown tag are ignored.
        encoded.extend(b"unknown");
        let reparsed = WriteRef::parse(&encoded).unwrap().to_owned();
        assert_eq!(reparsed, lock);
    }

    // Only rollback records carrying the protected marker count as protected.
    #[test]
    fn test_is_protected() {
        let protected_rollback = Write::new_rollback(1.into(), true);
        assert!(protected_rollback.as_ref().is_protected());

        let plain_rollback = Write::new_rollback(2.into(), false);
        assert!(!plain_rollback.as_ref().is_protected());

        let put_with_marker = Write::new(
            WriteType::Put,
            3.into(),
            Some(PROTECTED_ROLLBACK_SHORT_VALUE.to_vec()),
        );
        assert!(!put_with_marker.as_ref().is_protected());
    }

    // Each case is (gc fence, read timestamp, expected result).
    #[test]
    fn test_check_gc_fence() {
        let cases: Vec<(Option<u64>, u64, bool)> = vec![
            (None, 10, true),
            (None, 100, true),
            (None, u64::max_value(), true),
            (Some(0), 100, true),
            (Some(0), u64::max_value(), true),
            (Some(100), 50, true),
            (Some(100), 100, false),
            (Some(100), 150, false),
            (Some(100), u64::max_value(), false),
        ];
        for (gc_fence, read_ts, expected) in cases {
            let write = Write::new(WriteType::Put, 5.into(), None)
                .set_overlapped_rollback(true, gc_fence.map(Into::into));
            let actual = write
                .as_ref()
                .check_gc_fence_as_latest_version(read_ts.into());
            assert_eq!(actual, expected);
        }
    }
}