1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use super::*;
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::Context;
use engine_rocks::PerfLevel;
use tidb_query_datatype::codec::Datum;
use tikv::config::CoprReadPoolConfig;
use tikv::coprocessor::{readpool_impl, Endpoint};
use tikv::read_pool::ReadPool;
use tikv::server::Config;
use tikv::storage::kv::RocksEngine;
use tikv::storage::{Engine, TestEngineBuilder};
/// Newtype wrapper around a [`Table`] used as the fixture table for the
/// `init_*` helpers in this file.
///
/// The wrapped table is reachable through `Deref`, so a `&ProductTable`
/// can be used where a `&Table` is expected.
#[derive(Clone)]
pub struct ProductTable(Table);
impl ProductTable {
pub fn new() -> ProductTable {
let id = ColumnBuilder::new()
.col_type(TYPE_LONG)
.primary_key(true)
.build();
let idx_id = next_id();
let name = ColumnBuilder::new()
.col_type(TYPE_VAR_CHAR)
.index_key(idx_id)
.build();
let count = ColumnBuilder::new()
.col_type(TYPE_LONG)
.index_key(idx_id)
.build();
let table = TableBuilder::new()
.add_col("id", id)
.add_col("name", name)
.add_col("count", count)
.build();
ProductTable(table)
}
}
impl std::ops::Deref for ProductTable {
type Target = Table;
fn deref(&self) -> &Table {
&self.0
}
}
/// Same as [`init_data_with_details`], but always uses `Config::default()`
/// as the configuration handed to the endpoint.
///
/// All other arguments are forwarded unchanged.
pub fn init_data_with_engine_and_commit<E: Engine>(
    ctx: Context,
    engine: E,
    tbl: &ProductTable,
    vals: &[(i64, Option<&str>, i64)],
    commit: bool,
) -> (Store<E>, Endpoint<E>) {
    let default_cfg = Config::default();
    init_data_with_details(ctx, engine, tbl, vals, commit, &default_cfg)
}
/// Creates a [`Store`] on top of `engine`, inserts `vals` into `tbl`, and
/// builds a coprocessor [`Endpoint`] backed by the store's engine.
///
/// * `ctx`    - cloned for every row insert, then consumed by the commit.
/// * `vals`   - rows as `(id, name, count)`; `name` may be `None`.
/// * `commit` - when `false`, the rows are written after `store.begin()`
///   but `commit_with_ctx` is never called.
/// * `cfg`    - configuration passed to `Endpoint::new`.
///
/// Returns the store together with the endpoint.
pub fn init_data_with_details<E: Engine>(
    ctx: Context,
    engine: E,
    tbl: &ProductTable,
    vals: &[(i64, Option<&str>, i64)],
    commit: bool,
    cfg: &Config,
) -> (Store<E>, Endpoint<E>) {
    let mut store = Store::from_engine(engine);

    store.begin();
    for row in vals {
        let (id, name, count) = *row;
        // Column lookups stay inside the loop so that an empty `vals`
        // performs no indexing into the table at all.
        store
            .insert_into(tbl)
            .set(&tbl["id"], Datum::I64(id))
            .set(&tbl["name"], name.map(str::as_bytes).into())
            .set(&tbl["count"], Datum::I64(count))
            .execute_with_ctx(ctx.clone());
    }
    if commit {
        store.commit_with_ctx(ctx);
    }

    // Read pool built from the test configuration and the store's engine.
    let built_pool = readpool_impl::build_read_pool_for_test(
        &CoprReadPoolConfig::default_for_test(),
        store.get_engine(),
    );
    let pool = ReadPool::from(built_pool);

    let concurrency_manager = ConcurrencyManager::new(1.into());
    let endpoint = Endpoint::new(
        cfg,
        pool.handle(),
        concurrency_manager,
        PerfLevel::EnableCount,
    );

    (store, endpoint)
}
/// Builds an engine via `TestEngineBuilder` and delegates to
/// [`init_data_with_engine_and_commit`] with a default [`Context`].
///
/// # Panics
///
/// Panics if `TestEngineBuilder::new().build()` returns an error.
pub fn init_data_with_commit(
    tbl: &ProductTable,
    vals: &[(i64, Option<&str>, i64)],
    commit: bool,
) -> (Store<RocksEngine>, Endpoint<RocksEngine>) {
    // The engine is created before the context, as in the original order.
    let rocks = TestEngineBuilder::new().build().unwrap();
    let ctx = Context::default();
    init_data_with_engine_and_commit(ctx, rocks, tbl, vals, commit)
}
/// Convenience wrapper around [`init_data_with_commit`] that always passes
/// `commit = true`.
pub fn init_with_data(
    tbl: &ProductTable,
    vals: &[(i64, Option<&str>, i64)],
) -> (Store<RocksEngine>, Endpoint<RocksEngine>) {
    const COMMIT: bool = true;
    init_data_with_commit(tbl, vals, COMMIT)
}