use super::timestamp::TimeStamp;
use bitflags::bitflags;
use byteorder::{ByteOrder, NativeEndian};
use collections::HashMap;
use kvproto::kvrpcpb;
use std::fmt::{self, Debug, Display, Formatter};
use tikv_util::codec;
use tikv_util::codec::bytes;
use tikv_util::codec::bytes::BytesEncoder;
use tikv_util::codec::number::{self, NumberEncoder};
/// Largest length, in bytes, that [`is_short_value`] still accepts as "short".
pub const SHORT_VALUE_MAX_LEN: usize = 255;
/// Single-byte tag `b'v'` associated with short values.
// NOTE(review): presumably written in front of an inlined short value by the record
// encoders; nothing in this file uses it, so confirm against those encoders.
pub const SHORT_VALUE_PREFIX: u8 = b'v';

/// Returns `true` when `value` is no longer than [`SHORT_VALUE_MAX_LEN`] bytes.
///
/// The bound is inclusive, and an empty slice counts as short.
pub fn is_short_value(value: &[u8]) -> bool {
    SHORT_VALUE_MAX_LEN >= value.len()
}

/// Owned bytes of a value.
pub type Value = Vec<u8>;
/// A pair of owned key bytes and the associated [`Value`].
pub type KvPair = (Vec<u8>, Value);
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Key(Vec<u8>);
impl Key {
#[inline]
pub fn from_raw(key: &[u8]) -> Key {
let len = codec::bytes::max_encoded_bytes_size(key.len()) + codec::number::U64_SIZE;
let mut encoded = Vec::with_capacity(len);
encoded.encode_bytes(key, false).unwrap();
Key(encoded)
}
#[inline]
pub fn from_raw_maybe_unbounded(key: &[u8]) -> Option<Key> {
if key.is_empty() {
None
} else {
Some(Key::from_raw(key))
}
}
#[inline]
pub fn into_raw(self) -> Result<Vec<u8>, codec::Error> {
let mut k = self.0;
bytes::decode_bytes_in_place(&mut k, false)?;
Ok(k)
}
#[inline]
pub fn to_raw(&self) -> Result<Vec<u8>, codec::Error> {
bytes::decode_bytes(&mut self.0.as_slice(), false)
}
#[inline]
pub fn from_encoded(encoded_key: Vec<u8>) -> Key {
Key(encoded_key)
}
#[inline]
pub fn from_encoded_slice(encoded_key: &[u8]) -> Key {
let mut k = Vec::with_capacity(encoded_key.len() + number::U64_SIZE);
k.extend_from_slice(encoded_key);
Key(k)
}
#[inline]
pub fn as_encoded(&self) -> &Vec<u8> {
&self.0
}
#[inline]
pub fn into_encoded(self) -> Vec<u8> {
self.0
}
#[inline]
pub fn append_ts(mut self, ts: TimeStamp) -> Key {
self.0.encode_u64_desc(ts.into_inner()).unwrap();
self
}
#[inline]
pub fn decode_ts(&self) -> Result<TimeStamp, codec::Error> {
Self::decode_ts_from(&self.0)
}
#[inline]
pub fn truncate_ts(mut self) -> Result<Key, codec::Error> {
let len = self.0.len();
if len < number::U64_SIZE {
Err(codec::Error::KeyLength)
} else {
self.0.truncate(len - number::U64_SIZE);
Ok(self)
}
}
#[inline]
pub fn split_on_ts_for(key: &[u8]) -> Result<(&[u8], TimeStamp), codec::Error> {
if key.len() < number::U64_SIZE {
Err(codec::Error::KeyLength)
} else {
let pos = key.len() - number::U64_SIZE;
let k = &key[..pos];
let mut ts = &key[pos..];
Ok((k, number::decode_u64_desc(&mut ts)?.into()))
}
}
#[inline]
pub fn truncate_ts_for(key: &[u8]) -> Result<&[u8], codec::Error> {
let len = key.len();
if len < number::U64_SIZE {
return Err(codec::Error::KeyLength);
}
Ok(&key[..key.len() - number::U64_SIZE])
}
#[inline]
pub fn decode_ts_from(key: &[u8]) -> Result<TimeStamp, codec::Error> {
let len = key.len();
if len < number::U64_SIZE {
return Err(codec::Error::KeyLength);
}
let mut ts = &key[len - number::U64_SIZE..];
Ok(number::decode_u64_desc(&mut ts)?.into())
}
#[inline]
pub fn is_user_key_eq(ts_encoded_key: &[u8], user_key: &[u8]) -> bool {
let user_key_len = user_key.len();
if ts_encoded_key.len() != user_key_len + number::U64_SIZE {
return false;
}
if user_key_len >= number::U64_SIZE {
let left = NativeEndian::read_u64(&ts_encoded_key[user_key_len - 8..]);
let right = NativeEndian::read_u64(&user_key[user_key_len - 8..]);
if left != right {
return false;
}
ts_encoded_key[..user_key_len - 8] == user_key[..user_key_len - 8]
} else {
ts_encoded_key[..user_key_len] == user_key[..]
}
}
pub fn is_encoded_from(&self, raw_key: &[u8]) -> bool {
bytes::is_encoded_from(&self.0, raw_key, false)
}
pub fn gen_hash(&self) -> u64 {
farmhash::fingerprint64(&self.to_raw().unwrap())
}
}
impl Clone for Key {
fn clone(&self) -> Self {
let mut key = Vec::with_capacity(self.0.capacity());
key.extend_from_slice(&self.0);
Key(key)
}
}
impl Debug for Key {
    /// Writes the `{:?}` rendering of the stored bytes wrapped by
    /// `log_wrappers::Value::key`.
    // NOTE(review): the exact text is decided by `log_wrappers`; confirm there how the
    // bytes are rendered.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let wrapped = log_wrappers::Value::key(&self.0);
        write!(f, "{:?}", wrapped)
    }
}
impl Display for Key {
    /// Produces the same text as the `Debug` implementation: the `{:?}` rendering of the
    /// stored bytes wrapped by `log_wrappers::Value::key`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{:?}", log_wrappers::Value::key(&self.0)))
    }
}
/// Payload-free tag describing what kind of `Mutation` an operation is.
///
/// `Mutation::mutation_type` maps `Put`, `Delete`, `Lock` and `Insert` to the variant of
/// the same name and reports every remaining `Mutation` variant as `Other`.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum MutationType {
Put,
Delete,
Lock,
Insert,
// Catch-all for mutations without a dedicated kind (currently `Mutation::CheckNotExists`).
Other,
}
/// One requested operation on a single key.
///
/// `Put` and `Insert` carry a value next to the key; the other variants carry only the key.
#[derive(Debug, Clone)]
pub enum Mutation {
    /// Key together with the value to store.
    Put((Key, Value)),
    /// Key only.
    Delete(Key),
    /// Key only.
    Lock(Key),
    /// Key together with the value to store; `should_not_exists` is `true` for it.
    Insert((Key, Value)),
    /// Key only; both `should_not_exists` and `should_not_write` are `true` for it.
    CheckNotExists(Key),
}

impl Mutation {
    /// Borrows the key this mutation targets, whatever the variant.
    pub fn key(&self) -> &Key {
        match self {
            Mutation::Put((key, _)) | Mutation::Insert((key, _)) => key,
            Mutation::Delete(key) | Mutation::Lock(key) | Mutation::CheckNotExists(key) => key,
        }
    }

    /// Returns the [`MutationType`] tag for this mutation.
    ///
    /// `CheckNotExists` has no tag of its own and is reported as `MutationType::Other`.
    pub fn mutation_type(&self) -> MutationType {
        match self {
            Mutation::Put(_) => MutationType::Put,
            Mutation::Delete(_) => MutationType::Delete,
            Mutation::Lock(_) => MutationType::Lock,
            Mutation::Insert(_) => MutationType::Insert,
            Mutation::CheckNotExists(_) => MutationType::Other,
        }
    }

    /// Consumes the mutation and returns its key plus the value, if the variant has one.
    pub fn into_key_value(self) -> (Key, Option<Value>) {
        match self {
            Mutation::Put((key, value)) | Mutation::Insert((key, value)) => (key, Some(value)),
            Mutation::Delete(key) | Mutation::Lock(key) | Mutation::CheckNotExists(key) => {
                (key, None)
            }
        }
    }

    /// `true` for `Insert` and `CheckNotExists`, `false` for every other variant.
    pub fn should_not_exists(&self) -> bool {
        match self {
            Mutation::Insert(_) | Mutation::CheckNotExists(_) => true,
            Mutation::Put(_) | Mutation::Delete(_) | Mutation::Lock(_) => false,
        }
    }

    /// `true` only for `CheckNotExists`.
    pub fn should_not_write(&self) -> bool {
        matches!(self, Mutation::CheckNotExists(_))
    }
}
impl From<kvrpcpb::Mutation> for Mutation {
    /// Converts a protobuf mutation into a [`Mutation`].
    ///
    /// The key is encoded with `Key::from_raw`; for `Put` and `Insert` the value is moved
    /// out of the message with `take_value`.
    ///
    /// # Panics
    ///
    /// Panics with "mismatch Op in prewrite mutations" when the op is anything other than
    /// `Put`, `Del`, `Lock`, `Insert` or `CheckNotExists`.
    fn from(mut m: kvrpcpb::Mutation) -> Mutation {
        match m.get_op() {
            // Variants that carry a value: encode the key first, then take the value.
            kvrpcpb::Op::Put => {
                let key = Key::from_raw(m.get_key());
                Mutation::Put((key, m.take_value()))
            }
            kvrpcpb::Op::Insert => {
                let key = Key::from_raw(m.get_key());
                Mutation::Insert((key, m.take_value()))
            }
            // Key-only variants.
            kvrpcpb::Op::Del => Mutation::Delete(Key::from_raw(m.get_key())),
            kvrpcpb::Op::Lock => Mutation::Lock(Key::from_raw(m.get_key())),
            kvrpcpb::Op::CheckNotExists => Mutation::CheckNotExists(Key::from_raw(m.get_key())),
            _ => panic!("mismatch Op in prewrite mutations"),
        }
    }
}
/// The previous value of a key, or a marker describing why no value is carried.
///
/// The default is [`OldValue::Unspecified`], which is the only variant for which
/// [`OldValue::valid`] returns `false`.
#[derive(Debug, Clone, PartialEq)]
pub enum OldValue {
    /// The previous value itself.
    Value { value: Value },
    /// A timestamp in place of the value.
    // NOTE(review): presumably the start_ts under which the old value can be looked up;
    // confirm with the code that produces this variant.
    ValueTimeStamp { start_ts: TimeStamp },
    /// No value is carried; still counted as valid by `valid`.
    None,
    /// Default state; reported as not valid by `valid`.
    Unspecified,
}

impl Default for OldValue {
    /// Returns [`OldValue::Unspecified`].
    fn default() -> Self {
        Self::Unspecified
    }
}

impl OldValue {
    /// Returns `false` for `Unspecified` and `true` for every other variant.
    pub fn valid(&self) -> bool {
        match self {
            OldValue::Unspecified => false,
            OldValue::Value { .. } | OldValue::ValueTimeStamp { .. } | OldValue::None => true,
        }
    }

    /// Returns `size_of::<OldValue>()` plus, for the `Value` variant, the length of the
    /// carried bytes.
    pub fn size(&self) -> usize {
        let payload = if let OldValue::Value { value } = self {
            value.len()
        } else {
            0
        };
        payload + std::mem::size_of::<OldValue>()
    }
}
/// Map from a [`Key`] to its [`OldValue`] and an optional [`MutationType`].
pub type OldValues = HashMap<Key, (OldValue, Option<MutationType>)>;

/// Extra data accompanying a transaction: collected old values and a `one_pc` flag.
///
/// `Default` gives an empty map and `one_pc == false`.
#[derive(Default, Debug, Clone)]
pub struct TxnExtra {
    /// Old values keyed by [`Key`].
    pub old_values: OldValues,
    // NOTE(review): presumably marks a one-phase-commit transaction; this file only
    // stores the flag, so confirm the meaning at the call sites.
    pub one_pc: bool,
}

impl TxnExtra {
    /// Returns `true` when `old_values` holds no entries.
    ///
    /// The `one_pc` flag is not taken into account.
    pub fn is_empty(&self) -> bool {
        let TxnExtra { old_values, .. } = self;
        old_values.is_empty()
    }
}
/// A receiver of `TxnExtra` values.
///
/// Implementors must be `Send + Sync`, and `schedule` only needs a shared reference.
pub trait TxnExtraScheduler: Send + Sync {
/// Hands `txn_extra` over to the scheduler, transferring ownership of it.
// NOTE(review): what happens to the data afterwards is up to the implementor and is not
// visible in this file.
fn schedule(&self, txn_extra: TxnExtra);
}
bitflags! {
    /// Bit set stored in a `u64`; only the two bits below are defined.
    pub struct WriteBatchFlags: u64 {
        /// Bit 0.
        const ONE_PC = 0b0000_0001;
        /// Bit 1.
        const STALE_READ = 0b0000_0010;
    }
}

impl WriteBatchFlags {
    /// Converts raw `bits` into flags, refusing any bit that is not defined above.
    ///
    /// # Panics
    ///
    /// Panics with "unrecognized flags: <bits in binary>" when `bits` contains an
    /// undefined bit.
    pub fn from_bits_check(bits: u64) -> WriteBatchFlags {
        WriteBatchFlags::from_bits(bits)
            .unwrap_or_else(|| panic!("unrecognized flags: {:b}", bits))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The empty set and each defined flag survive a round trip through
    /// `from_bits_check`.
    #[test]
    fn test_flags() {
        let empty = WriteBatchFlags::from_bits_check(0);
        assert!(empty.is_empty());
        for flag in &[WriteBatchFlags::ONE_PC, WriteBatchFlags::STALE_READ] {
            assert_eq!(WriteBatchFlags::from_bits_check(flag.bits()), *flag);
        }
    }

    /// Random bit patterns made only of undefined bits must make `from_bits_check` panic.
    #[test]
    fn test_flags_panic() {
        for _ in 0..100 {
            let outcome = panic_hook::recover_safe(|| {
                // Every defined bit is cleared, so `r` is either zero or invalid.
                let r = rand::random::<u64>() & !WriteBatchFlags::all().bits();
                WriteBatchFlags::from_bits_check(r);
                // Zero is valid (empty set); panic by hand so the closure always panics.
                if r == 0 {
                    panic!("panic for zero");
                }
            });
            assert!(outcome.is_err());
        }
    }

    #[test]
    fn test_is_user_key_eq() {
        /// Asserts that `ts_key` is accepted as `user_key` plus a timestamp suffix.
        fn same(ts_key: &[u8], user_key: &[u8]) {
            assert!(Key::is_user_key_eq(ts_key, user_key));
        }
        /// Asserts that `ts_key` is rejected for `user_key`.
        fn differ(ts_key: &[u8], user_key: &[u8]) {
            assert!(!Key::is_user_key_eq(ts_key, user_key));
        }

        // Empty user key.
        differ(b"", b"");
        differ(b"12345", b"");
        same(b"12345678", b"");

        // One-byte user key.
        same(b"x12345678", b"x");
        differ(b"x12345", b"x");

        // Three-byte user key.
        same(b"xyz12345678", b"xyz");
        same(b"xyz________", b"xyz");
        differ(b"xyy12345678", b"xyz");
        differ(b"yyz12345678", b"xyz");
        differ(b"xyz12345678", b"xy");
        differ(b"xyy12345678", b"xy");
        differ(b"yyz12345678", b"xy");

        // Seven-byte user key.
        same(b"abcdefg12345678", b"abcdefg");
        same(b"abcdefgzzzzzzzz", b"abcdefg");
        differ(b"abcdefg12345678", b"abcdef");
        differ(b"abcdefg12345678", b"bcdefg");
        differ(b"abcdefv12345678", b"abcdefg");
        differ(b"vbcdefg12345678", b"abcdefg");
        differ(b"abccefg12345678", b"abcdefg");

        // Eight-byte user key.
        same(b"abcdefgh12345678", b"abcdefgh");
        same(b"abcdefghyyyyyyyy", b"abcdefgh");
        differ(b"abcdefgh12345678", b"abcdefg");
        differ(b"abcdefgh12345678", b"bcdefgh");
        differ(b"abcdefgz12345678", b"abcdefgh");
        differ(b"zbcdefgh12345678", b"abcdefgh");
        differ(b"abcddfgh12345678", b"abcdefgh");

        // Nine-byte user key.
        same(b"abcdefghi12345678", b"abcdefghi");
        same(b"abcdefghixxxxxxxx", b"abcdefghi");
        differ(b"abcdefghi12345678", b"abcdefgh");
        differ(b"abcdefghi12345678", b"bcdefghi");
        differ(b"abcdefghy12345678", b"abcdefghi");
        differ(b"ybcdefghi12345678", b"abcdefghi");
        differ(b"abcddfghi12345678", b"abcdefghi");

        // Eleven-byte user key.
        same(b"abcdefghijk87654321", b"abcdefghijk");
        same(b"abcdefghijkabcdefgh", b"abcdefghijk");
        differ(b"abcdefghijk87654321", b"abcdefghij");
        differ(b"abcdefghijk87654321", b"bcdefghijk");
        differ(b"abcdefghijx87654321", b"abcdefghijk");
        differ(b"xbcdefghijk87654321", b"abcdefghijk");
        differ(b"abxdefghijk87654321", b"abcdefghijk");
        differ(b"axcdefghijk87654321", b"abcdefghijk");
        differ(b"abcdeffhijk87654321", b"abcdefghijk");
    }

    #[test]
    fn test_is_encoded_from() {
        for raw_len in 0..=24 {
            let raw: Vec<u8> = (0..raw_len).collect();
            let encoded = Key::from_raw(&raw);
            assert!(encoded.is_encoded_from(&raw));
            let encoded_len = encoded.as_encoded().len();

            // Changing any single raw byte must break the match.
            for pos in 0..raw.len() {
                let mut tampered_raw = raw.clone();
                tampered_raw[pos] = raw[pos].wrapping_add(1);
                assert!(!encoded.is_encoded_from(&tampered_raw));
            }

            // Changing any single encoded byte must break the match.
            for pos in 0..encoded_len {
                let mut tampered_key = encoded.clone();
                tampered_key.0[pos] = encoded.0[pos].wrapping_add(1);
                assert!(!tampered_key.is_encoded_from(&raw));
            }

            // Encoded form with its last byte removed.
            let mut chopped = encoded.clone();
            chopped.0.pop();
            assert!(!chopped.is_encoded_from(&raw));

            // Encoded form with its last nine bytes removed.
            let shorter_encoded = Key::from_encoded_slice(&encoded.0[..encoded_len - 9]);
            assert!(!shorter_encoded.is_encoded_from(&raw));

            // Encoded form with nine extra bytes appended.
            let mut longer_bytes = encoded.as_encoded().clone();
            longer_bytes.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0xFF]);
            let longer_encoded = Key::from_encoded(longer_bytes);
            assert!(!longer_encoded.is_encoded_from(&raw));

            // Raw key one byte shorter (only possible when it is not empty).
            if let Some((_, shorter_raw)) = raw.split_last() {
                assert!(!encoded.is_encoded_from(shorter_raw));
            }

            // Raw key one byte longer.
            let mut longer_raw = raw.clone();
            longer_raw.push(0);
            assert!(!encoded.is_encoded_from(&longer_raw));
        }
    }

    /// Only `Unspecified` is reported as not valid.
    #[test]
    fn test_old_value_valid() {
        assert!(!OldValue::Unspecified.valid());
        assert!(OldValue::None.valid());
        assert!(OldValue::Value { value: vec![] }.valid());
        assert!(OldValue::ValueTimeStamp { start_ts: 0.into() }.valid());
    }
}