1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use crate::storage::mvcc::{ErrorInner, Result as MvccResult, SnapshotReader};
use crate::storage::Snapshot;
use txn_types::{Key, TimeStamp, Write, WriteType};
/// Enforces the "key must not already exist" constraint against `write`.
///
/// The check passes with `Ok(())`, without consulting `reader`, when any of the
/// following holds:
/// - `should_not_exist` is `false`, so there is nothing to enforce;
/// - `write` is a [`WriteType::Delete`];
/// - `write.gc_fence` is `Some` with a non-zero timestamp, in which case the record
///   is treated as invalid by this check.
///
/// Otherwise the key is considered to exist when `write` is a [`WriteType::Put`], or,
/// for any remaining write type, when `reader.key_exist` reports the key at
/// `write_commit_ts.prev()`.
///
/// NOTE(review): the GC-fence shortcut presumably relies on the caller passing the
/// latest write record of `key` — confirm against the call sites.
///
/// # Errors
///
/// Returns [`ErrorInner::AlreadyExist`] carrying the raw key when the key is found to
/// exist. Failures from `reader.key_exist` and `key.to_raw` are propagated unchanged.
pub(crate) fn check_data_constraint<S: Snapshot>(
    reader: &mut SnapshotReader<S>,
    should_not_exist: bool,
    write: &Write,
    write_commit_ts: TimeStamp,
    key: &Key,
) -> MvccResult<()> {
    // A record fenced by a non-zero GC fence timestamp is not used as evidence that
    // the key exists.
    let write_is_invalid = matches!(write.gc_fence, Some(gc_fence_ts) if !gc_fence_ts.is_zero());

    if !should_not_exist || write.write_type == WriteType::Delete || write_is_invalid {
        return Ok(());
    }

    // `||` short-circuits: the reader is only queried when `write` itself is not a
    // `Put` (e.g. `Rollback` or `Lock`), looking strictly before this write's commit
    // timestamp.
    if write.write_type == WriteType::Put || reader.key_exist(key, write_commit_ts.prev())? {
        return Err(ErrorInner::AlreadyExist { key: key.to_raw()? }.into());
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::mvcc::tests::write;
    use crate::storage::mvcc::MvccTxn;
    use crate::storage::{Engine, TestEngineBuilder};
    use concurrency_manager::ConcurrencyManager;
    use kvproto::kvrpcpb::Context;

    /// Builds the `AlreadyExist` outcome expected for key `a`.
    fn already_exist() -> MvccResult<()> {
        Err(ErrorInner::AlreadyExist { key: b"a".to_vec() }.into())
    }

    /// Runs `check_data_constraint` over a table of write records against an engine
    /// that holds a single `Put` write record for key `a`.
    #[test]
    fn test_check_data_constraint() {
        let engine = TestEngineBuilder::new().build().unwrap();
        let cm = ConcurrencyManager::new(42.into());

        // Seed the engine: one `Put` write record for key `a`, stored under
        // timestamp 5 (the record itself is built with timestamp 2).
        let mut txn = MvccTxn::new(TimeStamp::new(2), cm);
        let seeded = Write::new(WriteType::Put, TimeStamp::new(2), None);
        txn.put_write(
            Key::from_raw(b"a"),
            TimeStamp::new(5),
            seeded.as_ref().to_bytes(),
        );
        write(&engine, &Context::default(), txn.into_modifies());

        let snapshot = engine.snapshot(Default::default()).unwrap();
        let mut reader = SnapshotReader::new(TimeStamp::new(3), snapshot, true);

        // Every case targets the same key.
        let key = Key::from_raw(b"a");

        // Columns: (expected result, should_not_exist, write record, write commit ts).
        let table: Vec<(MvccResult<()>, bool, Write, TimeStamp)> = vec![
            // Constraint disabled: passes regardless of the record.
            (
                Ok(()),
                false,
                Write::new(WriteType::Put, TimeStamp::new(3), None),
                TimeStamp::default(),
            ),
            // A `Delete` record passes the check.
            (
                Ok(()),
                true,
                Write::new(WriteType::Delete, TimeStamp::new(3), None),
                TimeStamp::default(),
            ),
            // A `Put` record is rejected.
            (
                already_exist(),
                true,
                Write::new(WriteType::Put, TimeStamp::new(3), None),
                TimeStamp::default(),
            ),
            // `Rollback` with commit ts 6 is rejected.
            (
                already_exist(),
                true,
                Write::new(WriteType::Rollback, TimeStamp::new(3), None),
                TimeStamp::new(6),
            ),
            // `Lock` with commit ts 15 is rejected.
            (
                already_exist(),
                true,
                Write::new(WriteType::Lock, TimeStamp::new(10), None),
                TimeStamp::new(15),
            ),
        ];

        for (expected, should_not_exist, record, commit_ts) in table {
            let result =
                check_data_constraint(&mut reader, should_not_exist, &record, commit_ts, &key);
            // Outcomes are compared via their `Debug` rendering.
            assert_eq!(format!("{:?}", expected), format!("{:?}", result));
        }
    }
}