Introduction

TiKV is a distributed, transactional key-value database. It has been widely adopted in many critical production environments — see the TiKV adopters. It has also been accepted by the Cloud Native Computing Foundation as a Sandbox project in August, 2018.

TiKV is fully ACID compliant and features automatic horizontal scalability, global data consistency, geo-replication, and many other features. It can be used as a building block for other high-level services. For example, we have already used TiKV to support TiDB - a next-generation HTAP database.

In this book, we will introduce everything about TiKV, including why we built it and how we continue to improve it, what problems we have met, what the core technologies are and why, etc. We hope that through this book, you can develop a deep understanding of TiKV, build your knowledge of distributed programming, or even get inspired to build your own distributed system. :-)

History

In the middle of 2015, we decided to build a database which solved MySQL's scaling problems. At that time, the most common way to increase MySQL's scalability was to build a proxy on top of MySQL that distributes the load more efficiently, but we don't think that's the best way.

As far as we knew, proxy-based solutions have following problems:

  • Building a proxy on top of the MySQL servers cannot guarantee ACID compliance. Notably, the cross-node transactions are not supported natively.
  • It poses great challenges for business flexibility because the users have to worry about the data distribution and design their sharding keys carefully to avoid inefficient queries.
  • The high availability and data consistency of MySQL can't be guaranteed easily based on the traditional Master-Slave replication.

Although building a proxy based on MySQL directly might be easy at the beginning, we still decided to chose another way, a more difficult path — to build a distributed, MySQL compatible database from scratch.

Fortunately, Google met the same problem and had already published some papers to describe how they built Spanner and F1 to solve it. Spanner is a globally distributed, externally consistent database and F1 is a distributed SQL database based on Spanner. Inspired by Spanner and F1, we knew we could do the same thing. So we started to build TiDB - a stateless MySQL layer like F1. After we released TiDB, we knew we needed an underlying Spanner-like database so we began to develop TiKV.

Architecture

Following is the architecture of TiKV:

Architecture

In this illustration there are three TiKV instances in the cluster and each instance uses one RocksDB to save data. On top of RocksDB, we use the Raft consensus algorithm to replicate the data. In practice we use at least three replicas to keep data safe and consistent, and these replicas form a Raft group.

We use the traditional MVCC mechanism and build a distributed transaction layer above the Raft layer. We also provide a Coprocessor framework so users can push down their computing logic to the storage layer.

All the network communications are through gRPC so that contributors can develop their own clients easily.

The whole cluster is managed and scheduled by a central service — Placement Driver(PD).

As you can see, the hierarchy of TiKV is clear and easy to understand, and we will give more detailed explanation later.

Consensus Algorithm

When building a distributed system one principal goal is often to build in fault-tolerance. That is, if one particular node in a network goes down, or if there is a network partition, the systems continues to operate. The cluster of nodes taking part in a distributed consensus protocol must come to agreement regarding values, and once that decision is reached, that choice is final, even if some nodes were in a faulty state at the time.

Distributed consensus algorithms often take the form of a replicated state machine and log. Each state machine accepts inputs from its log, and represents the value(s) to be replicated, for example, a change to a hash table. They allow a collection of machines to work as a coherent group that can survive the failures of some of its members.

Two well known distributed consensus algorithms are Paxos and Raft. Paxos is used in systems like Chubby by Google, and Raft is used in systems like TiKV or etcd. Raft is generally seen as a more understandable and simpler to implement than Paxos.

In TiKV we harness Raft for distributed consensus. We found it much easier to understand both the algorithm, and how it will behave in even truly perverse scenarios.

Consistency, Availability, & Partitioning

In 2000, Eric Brewer presented “Towards Robust Distributed Systems” which detailed the CAP Theorem. Succinctly, the theorem declares that a distributed system may only choose two of the following three attributes:

  • Consistency: Every read receives the most recent write or an error
  • Availability: Every request receives a (non-error) response – without necessarily considering the most recent write
  • Partitioning: The system continues to operate despite an arbitrary number of messages being dropped (or delayed) between nodes

Traditional RDBMS, like PostgreSQL, that provide ACID guarantees, favor consistency over availability. BASE (Basic Availability, Soft-state, Eventual consistency) systems, like MongoDB and other NoSQL systems, favor availability over consistency.

In 2012, Daniel Abadi proposed that CAP was not sufficient to describe the trade-offs which occur when choosing the attributes of a distributed system. They described an expanded PACELC Theorem:

Availability (A) and consistency (C) (as per the CAP theorem), but else (E), even when the system is running normally in the absence of partitions, one has to choose between latency (L) and consistency (C).

In order to support greater availability of data, most systems will replicate data between multiple peers. The system may also replicate a write ahead log offsite. In order to fulfill these availability guarantees the system must ensure a certain number of replications have occured before confirming an action. More replication means more consistency but also more latency.

Byzantine Failure

Consensus algorithms are typically either Byzantine Fault Tolerant, or not. Succinctly, systems which can withstand Byzantine faults are able to withstand misbehaving peers. Most distributed systems you would use inside of a VLAN, such as Kafka, TiKV, and etcd, are not Byzantine Fault Tolerant.

In order to withstand Byzantine faults, the system must tolerate peers:

  • actively spreading incorrect information,
  • deliberately not spreading correct information,
  • modifying information that would otherwise be correct.

This extends far beyond situations where the network link degrades and starts corrupting packets at the TCP layer. Those kinds of issues are easily tractable compared to a system being able to withstand active internal subversion.

In order to better understand Byzantine Fault Tolerance it helps to imagine the Byzantine Generals Problem:

Several Byzantine generals and their armies have surrounded an enemy army inside a deep forest. Separate, they are not strong enough to defeat the enemy, but if they attack in a coordinated fashion they will succeed. They must all agree on a time to attack the enemy.

In order to communicate, the generals can send messengers through the forest. These messages may or may not reach their destination. They could be kidnapped and replaced with imposters, converted to the enemy cause, or outright killed.

How can the generals confidently coordinate a time to attack?

After thinking on the problem for a time, you can conclude that tackling such a problem introduces a tremendous amount of complexity and overhead to a system.

Separating Byzantine tolerant systems from non-tolerant systems helps with evaluation of systems. A non-tolerant system will almost always outperform a tolerant system.

Paxos

Paxos is a protocol that Leslie Lamport and others have written extensively about. The most succinct paper describing Paxos is "Paxos Made Easy" published by Lamport in 2001. The original paper "The Part-Time Parliment" was published in 1989.

Paxos defines several roles, and each node in a cluster may perform in one or many roles. Each cluster has a single eventually chosen leader, and then some number of learners (which take action on the agreed upon request), Acceptors (which form quorums and act as "memory"), and proposers (which advocate for client requests and coordinate).

Unlike Raft, which represents a relatively concrete protocol, Paxos represents a family of protocols. Each variant has different tradeoffs.

A few variants of Paxos:

  • Basic Paxos: The basic protocol, allowing consensus about a single value.
  • Multi Paxos: Allow the protocol to handle a stream of messages with less overhead than Basic Paxos.
  • Cheap Paxos: Reduce number of nodes needed via dynamic reconfiguration in exchange for reduced burst fault tolerance.
  • Fast Paxos: Reduce the number of round trips in exchange for reduced fault tolerance.
  • Byzantine Paxos: Withstanding Byzantine failure scenarios.
  • Raft: Described in the next chapter.

It has been noted in the industry that Paxos is notoriously difficult to learn. Algorithms such as Raft are designed deliberately to be more easy to understand.

Due to complexities in the protocol, and the range of possibilities, it can be difficult to ascertain the state of a system when things go wrong (or are going right).

The basic Paxos algorithm itself does not define things like how to handle leader failures or membership changes.

Raft

In 2014, Diego Ongaro and John Ousterhout presented the Raft algorithm. It is explained succinctly in a paper and detailed at length in a thesis.

Raft defines a strong, single leader and number of followers in a group of peers. The group represents a replicated state machine. Only the leader may service client requests. The leader replicates actions to the followers.

Each peer has a durable write ahead log. All peers append each action as an entry in the log immediately as they recieve it. When the quorum (the majority) of peers have confirmed that that the entry exists in their log, the leader commits the log, each peer then can apply the action to their state machine.

Raft guarantees strong consistency by having only one ‘leader’ of the group which services all requests. All requests are then replicated to a quorum before being acted on, then confirmed with the requester. From the perspective of the cluster, the leader always has an up to date state machine.

The group is available when a majority of peers are able to coordinate. If the group is partitioned, only a partition containing the majority of the group can recover and resume servicing requests. If the cluster is split into three equal subgroups, for example, none of the subgroups will recover and service requests.

Raft supports leader elections. If the group leader fails, one of the followers will be elected the new leader. It’s not possible for a stale leader to be elected. If a leader candidate is aware of requests which the other peers of a particular subgroup are not, it will be elected over those peers. Since only the majority of peers can form a quorum this means that in order to be elected a peer must be up to date.

Because of how leader elections work, Raft is not Byzantine fault tolerant. Any node is able to lie and subvert the cluster by starting an election and claiming to have log entries it didn’t have.

It’s possible to support actions such as changing the peer group’s membership, or replicating log entries to non-voting followers (learners). In addition, several optimizations can be applied to Raft. Prevote can be used to introduce a pre-election by possible leaders, allowing them to gauge their ability to become a leader before potentially disrupting a cluster. Joint Consensus can support arbitrary group changes, allowing for better scaling. Batching and pipelining can help high throughput systems perform better.

Key-Value Engine

A key-value engine serves as the bottommost layer in a key-value database, unless you are going to build your own file system or operating system. A key-value engine is crucial for a database because it manages all the persistent data directly.

Most key-value engines provide some common interfaces like Get, Put and Delete. Some engines also allow you to iterate the key-values in order efficiently, and most will provide special features for added efficiency.

Choosing a key value engine is the first step to build a database. Here are some important things we need to consider:

  • The data structure. Different data structures are optimized for different workloads. Some are good for reads and some are good for writes, etc.
  • Maturity. We don't need a storage engine to be fancy but we want it to be reliable. Buggy engines ruin everything you build on top of them. We recommend using a battle-tested storage engine which has been adopted by a lot of users.
  • Performance. The performance of the storage engine limits the overall performance of the whole database. So make sure the storage engine meets your expectation and has the potential to improve along with the database.

In this chapter, we will do a comparison between two well-known data structures and guide you through the storage engine used in TiKV.

B-Tree vs Log-Structured Merge-Tree

The B-tree and the Log-Structured Merge-tree (LSM-tree) are the two most widely used data structures for data-intensive applications to organize and store data. However, each of them has its own advantages and disadvantages. This article aims to use quantitative approaches to compare these two data structures.

Metrics

In general, there are three critical metrics to measure the performance of a data structure: write amplification, read amplification, and space amplification. This section aims to describe these metrics.

For hard disk drives (HDDs), the cost of disk seek is enormous, such that the performance of random read/write is worse than that of sequential read/write. This article assumes that flash-based storage is used so we can ignore the cost of disk seeks.

Write Amplification

Write amplification is the ratio of the amount of data written to the storage device versus the amount of data written to the database.

For example, if you are writing 10 MB to the database and you observe 30 MB disk write rate, your write amplification is 3.

Flash-based storage can be written to only a finite number of times, so write amplification will decrease the flash lifetime.

There is another write amplification associated with flash memory and SSDs because flash memory must be erased before it can be rewritten.

Read Amplification

Read amplification is the number of disk reads per query.

For example, if you need to read 5 pages to answer a query, read amplification is 5.

Note that the units of write amplification and read amplification are different. Write amplification measures how much more data is written than the application thought it was writing, whereas read amplification counts the number of disk reads to perform a query.

Read amplification is defined separately for point query and range queries. For range queries the range length matters (the number of rows to be fetched).

Caching is a critical factor for read amplification. For example, with a B-tree in the cold-cache case, a point query requires \(O(log_BN)\) disk reads, whereas in the warm-cache case the internal nodes of the B-tree are cached, and so a B-tree requires at most one disk read per query.

Space Amplification

Space amplification is the ratio of the amount of data on the storage device versus the amount of data in the database.

For example, if you put 10MB in the database and this database uses 100MB on the disk, then the space amplification is 10.

Generally speaking, a data structure can optimize for at most two from read, write, and space amplification. This means one data structure is unlikely to be better than another at all three. For example a B-tree has less read amplification than an LSM-tree while an LSM-tree has less write amplification than a B-tree.

Analysis

The B-tree is a generalization of binary search tree in which a node can have more than two children. There are two kinds of node in a B-tree, internal nodes, and leaf nodes. A leaf node contains data records and has no children, whereas an internal node can have a variable number of child nodes within some pre-defined range. Internal nodes may be joined or split. An example of a B-tree appears in Figure 1.

Figure 1

Figure 1. The root node is shown at the top of the tree, and in this case happens to contain a single pivot (20), indicating that records with key k where k ≤ 20 are stored in the first child, and records with key k where k > 20 are stored in the second child. The first child contains two pivot keys (11 and 15), indicating that records with key k where k ≤ 11 is stored in the first child, those with 11 < k ≤ 15 are stored in the second child, and those with k > 15 are stored in the third child. The leftmost leaf node contains three values (3, 5, and 7).

The term B-tree may refer to a specific design or a general class of designs. In the narrow sense, a B-tree stores keys in its internal nodes but need not store those keys in the records at the leaves. The B+ tree is one of the most famous variations of B-tree. The idea behind the B+ tree is that internal nodes only contain keys, and an additional level which contains values is added at the bottom with linked leaves.

Like other search trees, an LSM-tree contains key-value pairs. It maintains data in two or more separate components (sometimes called SSTables), each of which is optimized for its respective underlying storage medium; the data in the low level component is efficiently merged with the data in the high level component in batches. An example of LSM-tree appears in Figure 2.

Figure 2

Figure 2. The LSM-tree contains \(k\) components. Data starts in \(C_0\), then gets merged into the \(C_1\). Eventually the \(C_1\) is merged into the \(C_2\), and so forth.

An LSM-tree periodically performs compaction to merge several SSTables into one new SSTable which contains only the live data from the input SSTables. Compaction helps the LSM-tree to recycle space and reduce read amplification. There are two kinds of compaction strategy: Size-tiered compaction strategy (STCS) and Level-based compaction strategy (LBCS). The idea behind STCS is to compact small SSTables into medium SSTables when the LSM-tree has enough small SSTables and compact medium SSTables into large SSTables when LSM-tree has enough medium SSTables. The idea of LBCS is to organize data into levels and each level contains one sorted run. Once a level accumulates enough data, some of the data at this level will be compacted to the higher level.

This section discusses the write amplification and read amplification of B+tree and Level-Based LSM-tree.

B+ Tree

In the B+ tree, copies of the keys are stored in the internal nodes; the keys and records are stored in leaves; in addition, a leaf node may include a pointer to the next leaf node to increase sequential access performance.

To simplify the analysis, assume that the block size of the tree is \(B\) measured in bytes, and keys, pointers, and records are constant size, so that each internal node contains \(O(B)\) children and each leaf contains \(O(B)\) data records. (The root node is a special case, and can be nearly empty in some situations.) Under all these assumptions, the depth of a B+ tree is $$ O(log_BN/B) $$ where \(N\) is the size of the database.

Write Amplification

For the worst-case insertion workloads, every insertion requires writing the leaf block containing the record, so the write amplification is \(B\).

Read Amplification

The number of disk reads per query is at most \(O(log_BN/B)\), which is the depth of the tree.

Level-Based LSM-tree

In the Level-based LSM-tree, data is organized into levels. Each level contains one sorted run. Data starts in level 0, then gets merged into the level 1 run. Eventually the level 1 run is merged into the level 2 run, and so forth. Each level is constrained in its sizes. Growth factor \(k\) is specified as the magnification of data size at each level.

$$ level_i = level_{i-1} * k $$ We can analyze the Level-based LSM-tree as follows. If the growth factor is \(k\) and the smallest level is a single file of size \(B\), then the number of levels is

$$ Θ(log_kN/B) $$ where \(N\) is the size of the database. In order to simplify the analysis, we assume that database size is stable and grows slowly over time, so that the size of database will be nearly equal to the size of last level.

Write Amplification

Data must be moved out of each level once, but data from a given level is merged repeatedly with data from the previous level. On average, after being first written into a level, each data item is remerged back into the same level about \(k/2\) times. So the total write amplification is $$ Θ(k*log_kN/B) $$

Read Amplification

To perform a short range query in the cold cache case, we must perform a binary search on each of the levels.

For the highest \(level_i\), the data size is \(O(N)\), so that it performs \(O(logN/B)\) disk reads.

For the previous \(level_{i-1}\), the data size is \(O(N/k)\), so that it performs \(O(log(N/(kB))\) disk reads.

For \(level_{i-2}\), the data size is \(O(N/k^2)\), so that it performs \(O(log(N/k^2B)\) disk reads.

For \(level_{i-n}\), the data size is \(O(N/k^n)\), so that it performs \(O(log(N/k^nB)\) disk reads.

So that the total number of disk reads is

$$ R = O(logN/B) + O(log(N/(kB)) + O(log(N/k^2B) + ... + O(log(N/k^nB) + 1 = O((log^2N/B)/logk) $$

Summary

The following table shows the summary of various kinds of amplification:

Data Structure Write Amplification Read Amplification
B+ tree \(Θ(B)\) \(O(log_BN/B)\)
Level-Based LSM-tree \(Θ(klog_kN/B)\) \(Θ((log^2N/B)/logk)\)

Table 1. A summary of the write amplification and read amplification for range queries.

Through comparing various kinds of amplification between B+ tree and Level-based LSM-tree, we can come to a conclusion that Level-based LSM-tree has a better write performance than B+ tree while its read performance is not as good as B+ tree. The main purpose for TiKV to use LSM-tree instead of B-tree as its underlying storage engine is because using cache technology to promote read performance is much easier than promote write performance.

RocksDB

RocksDB is a persistent key-value store for fast storage environment. Here are some highlight features from RocksDB:

  1. RocksDB uses a log structured database engine, written entirely in C++, for maximum performance. Keys and values are just arbitrarily-sized byte streams.
  2. RocksDB is optimized for fast, low latency storage such as flash drives and high-speed disk drives. RocksDB exploits the full potential of high read/write rates offered by flash or RAM.
  3. RocksDB is adaptable to different workloads. From database storage engines such as MyRocks to application data caching to embedded workloads, RocksDB can be used for a variety of data needs.
  4. RocksDB provides basic operations such as opening and closing a database, reading and writing to more advanced operations such as merging and compaction filters.

TiKV uses RocksDB because RocksDB is mature and high-performance. In this section, we will explore how TiKV uses RocksDB. We won't talk about basic features like Get, Put, Delete, and Iterate here because their usage is simple and clear and works well too. Instead, we'll focus some special features used in TiKV below.

Prefix Bloom Filter

A Bloom Filter is a magical data structure that uses a little resource but helps a lot. We won't explain the whole algorithm here. If you are not familiar with Bloom Filters, you can think it as a black box inside a dataset, which can tell you if a key probably exists or definitely does not without actually searching the dataset. Sometimes Bloom Filter gives you a false-positive answer although it rarely happens.

TiKV uses a Bloom Filter as well as a variant which is called Prefix Bloom Filter (PBF). Instead of telling you if a whole key exists in a dataset or not, PBF tells you if there are some other keys with the same prefix exists. Since PBF only stores the unique prefixes instead of all unique whole keys, it can save some memory too with the down side of having larger false positive rate.

TiKV supports MVCC, which means that there can be multiple versions for the same row stored in RocksDB. All versions of the same row share the same prefix (the row key) but have different timestamps as a suffix. When we want to read a row, we usually don't know about the exact version to read, but only want to read the latest version at a specific timestamp. This is where PBF shines. PBF can filter out data which is impossible to contain keys with the same prefix as the row key we provided. Then we just need to search in the data that may contain different versions of the row key and locate the specific version we want.

TableProperties

RocksDB allows us to register some table properties collectors. When RocksDB builds an SST file, it passes the sorted key-values one by one to the callback of each collector so that we can collect whatever we want. Then when the SST file is finished, the collected properties will be stored inside the SST file too.

We use this feature to optimize two functionalities.

The first one is for Split Check. Split check is a worker to check if regions are large enough to split. We have to scan all the data within a region to calculate the size of the region at the original implementation, which is resource consuming. With the TableProperties feature, we record the size of small sub-ranges in each SST file so that we can calculate the approximate size of a region from the table properties without scanning any data at all.

Another one is for MVCC Garbage Collection (GC). GC is a process to clean up garbage versions (versions older than the configured lifetime) of each row. If we have no idea whether a region contains some garbage versions or not, we have to check all regions periodically. To skip unnecessary garbage collection, we record some MVCC statistics (e.g. the number of rows and the number of versions) in each SST file. So before checking every region row by row, we check the table properties to see if it is necessary to do garbage collection on a region.

CompactRange

From time to time, some regions may contain a lot of tombstone entries because of GC or other delete operations. Tombstone entries are not good for scan performance and waste disk space as well.

So with the TableProperties feature, we can check every region periodically to see if it contains a lot of tombstones. If it does, we will compact the region range manually to drop tombstone entries and release disk space.

We also use CompactRange to recover RocksDB from some mistakes like incompatible table properties across different TiKV versions.

EventListener

EventListener allows us to listen to some special events, like flush, compaction or write stall condition change. When a specific event is triggered or finished, RocksDB will invoke our callbacks with some information about the event.

TiKV listens to the compaction event to observe the region size changes. As mentioned above, we calculate the approximate size of each region from the table properties. The approximate size will be recorded in the memory so that we don't need to calculate it again and again if nothing has changed. However, during compactions, some entries are dropped so the approximate size of some regions should be updated. That's why we listen to the compaction events and recalculate the approximate size of some regions when necessary.

IngestExternalFile

RocksDB allows us to generate an SST file outside and then ingest the file into RocksDB directly. This feature can potentially save a lot of IO because RocksDB is smart enough to ingest a file to the bottom level if possible, which can reduce write amplification because the ingested file doesn't need to be compacted again and again.

We use this feature to handle Raft snapshot. For example, when we want to add a replica to a new server. We can first generate a snapshot file from another server and then send the file to the new server. Then the new server can ingest that file into its RocksDB directly, which saves lots of work!

We also use this feature to import a huge amount of data into TiKV. We have some tools to generate sorted SST files from different data sources and then ingest those files into different TiKV servers. This is super fast compared to writing key-values to a TiKV cluster in the usual way.

DeleteFilesInRange

Previously, TiKV used the straightforward way to delete a range of data, which is scanning all the keys in the range and then delete them one by one. However, disk space would not release until the tombstones have been compacted. Even worse, disk space usage will actually increase temporarily because of newly written tombstones.

As time goes on, users store more and more data in TiKV until their disk space is insufficient. Then users will try to drop some tables or add more stores and expect the disk space usage to decrease in a short time. But TiKV didn't meet expectations with this method. We first tried to solve this by using the DeleteRange feature in RocksDB. However, DeleteRange turns out to be unstable and can not release disk space fast enough.

A faster way to release disk space is to delete some files directly, which leads us to the DeleteFilesInRange feature. But this feature is not perfect, it is quite dangerous because it breaks the snapshot consistency. If you acquire a snapshot from RocksDB, use DeleteFilesInRange to delete some files, then try to read that data you will find that some of it is missing. So we should use this feature carefully.

TiKV uses DeleteFilesInRange to destroy tombstone regions and GC dropped tables. Both cases have a prerequisite that the dropped range must not be accessed anymore.

Introduction

As TiKV is a distributed transactional key-value database, transaction is a core feature of TiKV. In this chapter we will talk about general implementations of distributed transaction and some implementation details in TiKV.

A database transaction, by definition, must be atomic, consistent, isolated and durable. Database practitioners often refer to these properties of database transactions using the acronym ACID.

Transactions provide an "all-or-nothing" proposition: each work-unit performed in a database must either complete in its entirety or have no effect whatsoever. Furthermore, the system must isolate each transaction from other transactions, results must conform to existing constraints in the database, and transactions that complete successfully must get written to durable storage.

A distributed transaction is a database transaction in which two or more network hosts are involved. Usually, hosts provide transactional resources, while the transaction manager is responsible for creating and managing a global transaction that encompasses all operations against such resources. Distributed transactions, as any other transactions, must have all four ACID properties.

A common algorithm for ensuring correct completion of a distributed transaction is the two-phase commit (2PC).

TiKV adopts Google's Percolator transaction model, a variant of 2PC.

Isolation Level

Isolation is one of the ACID (Atomicity, Consistency, Isolation, Durability) properties. It determines how transaction integrity is visible to other users and systems. For example, when a user is creating a Purchase Order and has created the header, but not the Purchase Order lines, is the header available for other systems/users (carrying out concurrent operations, such as a report on Purchase Orders) to see?

A lower isolation level increases the ability of many users to access the same data at the same time, but increases the number of concurrency effects (such as dirty reads or lost updates) users might encounter. Conversely, a higher isolation level reduces the types of concurrency effects that users may encounter, but requires more system resources and increases the chances that one transaction will block another.

Most DBMSs offer a number of transaction isolation levels, which control the degree of locking that occurs when selecting data. For many database applications, the majority of database transactions can be constructed to avoid requiring high isolation levels (e.g. SERIALIZABLE level), thus reducing the locking overhead for the system. The programmer must carefully analyze database access code to ensure that any relaxation of isolation does not cause software bugs that are difficult to find. Conversely, if higher isolation levels are used, the possibility of deadlock is increased, which also requires careful analysis and programming techniques to avoid.

Since each isolation level is stronger than those below, in that no higher isolation level allows an action forbidden by a lower one, the standard permits a DBMS to run a transaction at an isolation level stronger than that requested (e.g., a "Read committed" transaction may actually be performed at a "Repeatable read" isolation level).

The isolation levels defined by the ANSI/ISO SQL standard are listed as follows.

Serializable

This is the highest isolation level.

With a lock-based concurrency control DBMS implementation, serializability requires read and write locks (acquired on selected data) to be released at the end of the transaction. Also range-locks must be acquired when a SELECT query uses a ranged WHERE clause, especially to avoid the phantom reads phenomenon.

When using non-lock based concurrency control, no locks are acquired; however, if the system detects a write collision among several concurrent transactions, only one of them is allowed to commit. See snapshot isolation for more details on this topic.

Per the SQL-92 standard:

The execution of concurrent SQL-transactions at isolation level SERIALIZABLE is guaranteed to be serializable. A serializable execution is defined to be an execution of the operations of concurrently executing SQL-transactions that produces the same effect as some serial execution of those same SQL-transactions. A serial execution is one in which each SQL-transaction executes to completion before the next SQL-transaction begins.

Repeatable Read

In this isolation level, a lock-based concurrency control DBMS implementation keeps read and write locks (acquired on selected data) until the end of the transaction. However, range-locks are not managed, so phantom reads can occur.

Write skew is possible at this isolation level, a phenomenon where two writes are allowed to the same column(s) in a table by two different writers (who have previously read the columns they are updating), resulting in the column having data that is a mix of the two transactions.

Repeatable reads is the default isolation level for MySQL's InnoDB engine.

Read Committed

In this isolation level, a lock-based concurrency control DBMS implementation keeps write locks (acquired on selected data) until the end of the transaction, but read locks are released as soon as the SELECT operation is performed (so the non-repeatable reads phenomenon can occur in this isolation level). As in the previous level, range-locks are not managed.

Putting it in simpler words, read committed is an isolation level that guarantees that any data read is committed at the moment it is read. It simply restricts the reader from seeing any intermediate, uncommitted, 'dirty' read. It makes no promise whatsoever that if the transaction re-issues the read, it will find the same data; data is free to change after it is read.

Read Uncommitted

This is the lowest isolation level. In this level, dirty reads are allowed, so one transaction may see not-yet-committed changes made by other transactions.

Snapshot Isolation

We mentioned 4 different isolation levels above, but TiDB doesn’t adopt any of them. Instead, TiDB uses snapshot isolation as its default ioslation level. The main reason for it is that it allows better serializability, yet still avoids most of the concurrency anomalies that serializability avoids (but not always all).

TiDB is not alone: snapshot isolation also has been adopted by major database management systems such as InterBase, Firebird, Oracle, MySQL, PostgreSQL, SQL Anywhere, MongoDB and Microsoft SQL Server (2005 and later).

Snapshot isolation is a guarantee that all reads made in a transaction will see a consistent snapshot of the database, and the transaction itself will successfully commit only if no updates it has made conflict with any concurrent updates made since that snapshot.

In practice snapshot isolation is implemented within multiversion concurrency control (MVCC), where generational values of each data item (versions) are maintained: MVCC is a common way to increase concurrency and performance by generating a new version of a database object each time the object is written, and allowing transactions' read operations of several last relevant versions (of each object). The prevalence of snapshot isolation has been seen as a refutation of the ANSI SQL-92 standard’s definition of isolation levels, as it exhibits none of the "anomalies" that the SQL standard prohibited, yet is not serializable (the anomaly-free isolation level defined by ANSI).

A transaction executing under snapshot isolation appears to operate on a personal snapshot of the database, taken at the start of the transaction. When the transaction concludes, it will successfully commit only if the values updated by the transaction have not been changed externally since the snapshot was taken. Such a write-write conflict will cause the transaction to abort.

In a write skew anomaly, two transactions (T1 and T2) concurrently read an overlapping data set (e.g. values V1 and V2), concurrently make disjoint updates (e.g. T1 updates V1, T2 updates V2), and finally concurrently commit, neither having seen the update performed by the other. Were the system serializable, such an anomaly would be impossible, as either T1 or T2 would have to occur "first", and be visible to the other. In contrast, snapshot isolation permits write skew anomalies.

As a concrete example, imagine V1 and V2 are two balances held by a single person, James. The bank will allow either V1 or V2 to run a deficit, provided the total held in both is never negative (i.e. V1 + V2 ≥ 0). Both balances are currently $100. James initiates two transactions concurrently, T1 withdrawing $200 from V1, and T2 withdrawing $200 from V2.

If the database guaranteed serializable transactions, the simplest way of coding T1 is to deduct $200 from V1, and then verify that V1 + V2 ≥ 0 still holds, aborting if not. T2 similarly deducts $200 from V2 and then verifies V1 + V2 ≥ 0. Since the transactions must serialize, either T1 happens first, leaving V1 = -$100, V2 = $100, and preventing T2 from succeeding (since V1 + (V2 - $200) would be -$200), or T2 happens first and similarly prevents T1 from committing.

If the database is under snapshot isolation (MVCC), however, T1 and T2 operate on private snapshots of the database: each deducts $200 from an account, and then verifies that the new total is zero, using the other account value that held when the snapshot was taken. Since neither update conflicts, both commit successfully, leaving V1 = V2 = -$100, and V1 + V2 = -$200.

In TiDB, you can use SELECT … FOR UPDATE statement to avoid write skew anomaly. In this case, TiDB will use locks to serialize writes together with MVCC to gain some of the performance gains and still support the stronger "serializability" level of isolation.

Distributed Algorithms

Two-Phase Commit

In transaction processing, databases, and computer networking, the two-phase commit protocol (2PC) is a type of atomic commitment protocol (ACP). It is a distributed algorithm that coordinates all the processes that participate in a distributed atomic transaction, determining whether to commit or abort (rollback) the transaction. It is a specialized type of consensus protocol. The protocol achieves its goal even in many cases of temporary system failure (involving either process, network node, communication, etc. failures), and is thus widely used. However, it is not resilient to all possible failure scenarios, and in rare cases user (i.e. a system's administrator) intervention is needed to resolve failures. To aide in recovery from failure the protocol's participants log the protocol's states. Log records, which are typically slow to generate but survive failures, are used by the protocol's recovery procedures. Many protocol variants exist that primarily differ in logging strategies and recovery mechanisms. Though expected to be used infrequently, recovery procedures compose a substantial portion of the protocol, due to many possible failure scenarios to be considered and supported by the protocol.

Basic Algorithm of 2PC

prepare phase

The coordinator sends a prepare message to all cohorts and waits until it has received a reply from all cohorts.

commit phase

If the coordinator received an agreement message from all cohorts during the prepare phase, the coordinator sends a commit message to all the cohorts.

If any cohort votes No during the prepare phase (or the coordinator's timeout expires), the coordinator sends a rollback message to all the cohorts.

Disadvantages of 2PC

The greatest disadvantage of the two-phase commit protocol is that it is a blocking protocol. If the coordinator fails permanently, some cohorts will never resolve their transactions: after a cohort has sent an agreement message to the coordinator, it will block until a commit or rollback is received.

For example, consider a transaction involving a coordinator A and the cohort C1. If C1 receives a prepare message and responds to A, then A fails before sending C1 either a commit or rollback message, then C1 will block forever.

2PC Practice in TiKV

In TiKV we adopt the Percolator transaction model which is a variant of two phase commit. To address the disadvantage of coordinator failures, percolator doesn't use any node as coordinator, instead it uses one of the keys involved in each transaction as a coordinator. We call the coordinating key the primary key, and the other keys secondary keys. Since each key has multiple replicas, and data is kept consistent between these replicas by using a consensus protocol (Raft in TiKV), one node's failure doesn't affect the accessibility of data. So Percolator can tolerate node fails permanently.

Three-Phase Commit

Unlike the two-phase commit protocol (2PC), 3PC is non-blocking. Specifically, 3PC places an upper bound on the amount of time required before a transaction either commits or aborts. This property ensures that if a given transaction is attempting to commit via 3PC and holds some resource locks, it will release the locks after the timeout.

1st phase

The coordinator receives a transaction request. If there is a failure at this point, the coordinator aborts the transaction. Otherwise, the coordinator sends a canCommit? message to the cohorts and moves to the waiting state.

2nd phase

If there is a failure, timeout, or if the coordinator receives a No message in the waiting state, the coordinator aborts the transaction and sends an abort message to all cohorts. Otherwise the coordinator will receive Yes messages from all cohorts within the time window, so it sends preCommit messages to all cohorts and moves to the prepared state.

3rd phase

If the coordinator succeeds in the prepared state, it will move to the commit state. However if the coordinator times out while waiting for an acknowledgement from a cohort, it will abort the transaction. In the case where an acknowledgement is received from the majority of cohorts, the coordinator moves to the commit state as well.

A two-phase commit protocol cannot dependably recover from a failure of both the coordinator and a cohort member during the Commit phase. If only the coordinator had failed, and no cohort members had received a commit message, it could safely be inferred that no commit had happened. If, however, both the coordinator and a cohort member failed, it is possible that the failed cohort member was the first to be notified, and had actually done the commit. Even if a new coordinator is selected, it cannot confidently proceed with the operation until it has received an agreement from all cohort members, and hence must block until all cohort members respond.

The three-phase commit protocol eliminates this problem by introducing the Prepared-to-commit state. If the coordinator fails before sending preCommit messages, the cohort will unanimously agree that the operation was aborted. The coordinator will not send out a doCommit message until all cohort members have acknowledged that they are Prepared-to-commit. This eliminates the possibility that any cohort member actually completed the transaction before all cohort members were aware of the decision to do so (an ambiguity that necessitated indefinite blocking in the two-phase commit protocol).

Disadvantages of 3PC

The main disadvantage to this algorithm is that it cannot recover in the event the network is segmented in any manner. The original 3PC algorithm assumes a fail-stop model, where processes fail by crashing and crashes can be accurately detected, and does not work with network partitions or asynchronous communication.

The protocol requires at least three round trips to complete. This potentially causes a long latency in order to complete each transaction.

Paxos Commit

The Paxos Commit algorithm runs a Paxos consensus algorithm on the commit/abort decision of each participant to achieve a transaction commit protocol that uses 2F + 1 coordinators and makes progress if at least F + 1 of them are working properly. Paxos Commit has the same stable-storage write delay, and can be implemented to have the same message delay in the fault-free case, as Two-Phase Commit, but it uses more messages. The classic Two-Phase Commit algorithm is obtained as the special F = 0 case of the Paxos Commit algorithm.

In the Two-Phase Commit protocol, the coordinator decides whether to abort or commit, records that decision in stable storage, and informs the cohorts of its decision. We could make that fault-tolerant by simply using a consensus algorithm to choose the committed/aborted decision, letting the cohorts be the client that proposes the consensus value. This approach was apparently first proposed by Mohan, Strong, and Finkelstein, who used a synchronous consensus protocol. However, in the normal case, the leader must learn that each cohort has prepared before it can try to get the value committed chosen. Having the cohorts tell the leader that they have prepared requires at least one message delay.

Pessimistic & Optimistic Locking

To prevent lost updates and dirty reads, locking is employed to manage the actions of multiple concurrent users on a database. The two types of locking are pessimistic locking and optimistic locking.

Pessimistic Locking

A user who reads a record with the intention of updating it places an exclusive lock on the record to prevent other users from manipulating it. This means no one else can manipulate that record until the user releases the lock. The downside is that users can be locked out for a very long time, thereby slowing the overall system response and causing frustration.

Pessimistic locking is mainly used in environments where write contention is heavy, where the cost of protecting data through locks is less than the cost of rolling back transactions if concurrency conflicts occur. Pessimistic concurrency is best implemented when lock times will be short, as in programmatic processing of records. Pessimistic concurrency requires a persistent connection to the database and is not a scalable option when users are interacting with data, because records might be locked for relatively large periods of time. It is not appropriate for use in web application development.

Optimistic Locking

This allows multiple concurrent users access to the database whilst the system keeps a copy of the initial-read made by each user. When a user wants to update a record, the application first determines whether another user has changed the record since it was last read. The application does this by comparing the initial-read held in memory to the database record to verify any changes made to the record. Any discrepancies between the initial-read and the database record violates concurrency rules and causes the system to disregard any update request; an error message is reported and the user must start the update process again. It improves database performance by reducing the amount of locking required, thereby reducing the load on the database server. It works efficiently with tables that require limited updates since no users are locked out. However, some updates may fail. The downside is constant update failures due to high volumes of update requests from multiple concurrent users - it can be frustrating for users.

Optimistic locking is appropriate in environments where there is low write contention for data, or where read-only access to data is required.

Practice in TiKV

We use the Percolator Transaction model in TiKV, which uses an optimistic locking strategy. It reads database values, writes them tentatively, then checks whether other transactions have modified data that this transaction has used (read or written). This includes transactions that completed after this transaction's start time, and optionally, transactions that are still active at validation time. If there is no conflict, all changes take effect. If there is a conflict it is resolved, typically by aborting the transaction. In TiKV, the whole transaction is restarted with a new timestamp.

Timestamp Oracle

The timestamp oracle plays a significant role in the Percolator Transaction model, it is a server that hands out timestamps in strictly increasing order, a property required for correct operation of the snapshot isolation protocol.

Since every transaction requires contacting the timestamp oracle twice, this service must scale well. The timestamp oracle periodically allocates a range of timestamps by writing the highest allocated timestamp to stable storage; then with that allocated range of timestamps, it can satisfy future requests strictly from memory. If the timestamp oracle restarts, the timestamps will jump forward to the maximum allocated timestamp. Timestamps never go "backwards".

To save RPC overhead (at the cost of increasing transaction latency) each timestamp requester batches timestamp requests across transactions by maintaining only one pending RPC to the oracle. As the oracle becomes more loaded, the batching naturally increases to compensate. Batching increases the scalability of the oracle but does not affect the timestamp guarantees.

The transaction protocol uses strictly increasing timestamps to guarantee that Get() returns all committed writes before the transaction’s start timestamp. To see how it provides this guarantee, consider a transaction R reading at timestamp TR and a transaction W that committed at timestamp TW < TR; we will show that R sees W’s writes. Since TW < TR, we know that the timestamp oracle gave out TW before or in the same batch as TR; hence, W requested TW before R received TR. We know that R can’t do reads before receiving its start timestamp TR and that W wrote locks before requesting its commit timestamp TW . Therefore, the above property guarantees that W must have at least written all its locks before R did any reads; R’s Get() will see either the fully committed write record or the lock, in which case W will block until the lock is released. Either way, W’s write is visible to R’s Get().

In our system, the timestamp oracle has been embeded into Placement Driver (PD). PD is the management component with a "God view" and is responsible for storing metadata and conducting load balancing.

Practice in TiKV

We use batching and preallocating techniques to increase the timestamp oracle's throughput, and also we use a Raft group to tolerate node failure, but there are still some disadvantages to allocating timestamps from a single node. One disadvantage is that the timestamp oracle can't be scaled to multiple nodes. Another is that when the current Raft leader fails, there is a gap wherein the system cannot allocate a timestamp before a new leader has been elected. Finally, when the timestamp requestor is located at a remote datacenter the requestor must tolerate the high latency caused by the network round trip. There are some potential solutions for this final case, such as Google Spanner’s TrueTime mechanism and HLCs (Hybrid Logical Clocks).

Percolator

TiKV supports distributed transactions, which is inspired by Google's Percolator. In this section, we will briefly introduce Percolator and how we make use of it in TiKV.

What is Percolator?

Percolator is a system built by Google for incremental processing on a very large data set. Since this is just a brief introduction, you can view the full paper here for more details. If you are already very familiar with it, you can skip this section and go directly to Percolator in TiKV

Percolator is built based on Google's BigTable, a distributed storage system that supports single-row transactions. Percolator implements distributed transactions in ACID snapshot-isolation semantics, which is not supported by BigTable. A column c of Percolator is actually divided into the following internal columns of BigTable:

  • c:lock
  • c:write
  • c:data
  • c:notify
  • c:ack_O

Percolator also relies on a service named timestamp oracle. The timestamp oracle can produce timestamps in a strictly increasing order. All read and write operations need to apply for timestamps from the timestamp oracle, and a timestamp coming from the timestamp oracle will be used as the time when the read/write operation happens.

Percolator is a multi-version storage, and a data item's version is represented by the timestamp when the transaction was committed.

For example,

key v:data v:lock v"write
k1 14:"value2"
12:
10:"value1"
14:primary
12:
10:
14:
12:data@10
10:

The table shows different versions of data for a single cell. The state shown in the table means that for key k1, value "value1" was committed at timestamp 12. Then there is an uncommitted version whose value is "value2", and it's uncommitted because there's a lock. You will understand why it is like this after understanding how transactions work.

The remaining columns, c:notify and c:ack_O, are used for Percolator's incremental processing. After a modification, c:notify column is used to mark the modified cell to be dirty. Users can add some observers to Percolator which can do user-specified operations when they find data of their observed columns has changed. To find whether data is changed, the observers continuously scan the notify columns to find dirty cells. c:ack_O is the "acknowledgment" column of observer O, which is used to prevent a row from being incorrectly notified twice. It saves the timestamp of the observer's last execution.

Writing

Percolator's transactions are committed by a 2-phase commit (2PC) algorithm. Its two phases are Prewrite and Commit.

In Prewrite phase:

  1. Get a timestamp from the timestamp oracle, and we call the timestamp start_ts of the transaction.
  2. For each row involved in the transaction, put a lock in the lock column and write the value to the data column with the timestamp start_ts. One of these locks will be chosen as the primary lock, while others are secondary locks. Each lock contains the transaction's start_ts. Each secondary lock, in addition, contains the location of the primary lock.
    • If there's already a lock or newer version than start_ts, the current transaction will be rolled back because of write conflict.

And then, in theCommit phase:

  1. Get another timestamp, namely commit_ts.
  2. Remove the primary lock, and at the same time write a record to the write column with timestamp commit_ts, whose value records the transaction's start_ts. If the primary lock is missing, the commit fails.
  3. Repeat the process above for all secondary locks.

Once step 2 (committing the primary) is done, the whole transaction is done. It doesn't matter if the process of committing the secondary locks failed.

Let's see the example from the paper of Percolator. Assume we are writing two rows in a single transaction. At first, the data looks like this:

key bal:data bal:lock bal:write
Bob 6:
5:$10
6:
5:
6:data@5
5:
Joe 6:
5:$2
6:
5:
6:data@5
5:

This table shows Bob and Joe's balance. Now Bob wants to transfer his $7 to Joe's account. The first step is Prewrite:

  1. Get the start_ts of the transaction. In our example, it's 7.
  2. For each row involved in this transaction, put a lock in the lock column, and write the data to the data column. One of the locks will be chosen as the primary lock.

After Prewrite, our data looks like this:

key bal:data bal:lock bal:write
Bob 7:$3
6:
5:$10
7:primary
6:
5:
7:
6:data@5
5:
Joe 7:$9
6:
5:$2
7:primary@Bob.bal
6:
5:
7:
6:data@5
5:

Then Commit:

  1. Get the commit_ts, in our case, 8.
  2. Commit the primary: Remove the primary lock and write the commit record to the write column.
key bal:data bal:lock bal:write
Bob 8:
7:$3
6:
5:$10
8:
7:
6:
5:
8:data@7
7:
6:data@5
5:
Joe 7:$9
6:
5:$2
7:primary@Bob.bal
6:
5:
7:
6:data@5
5:
  1. Commit all secondary locks to complete the writing process.
key bal:data bal:lock bal:write
Bob 8:
7:$3
6:
5:$10
8:
7:
6:
5:
8:data@7
7:
6:data@5
5:
Joe 8:
7:$9
6:
5:$2
8:
7:
6:
5:
8:data@7
7:
6:data@5
5:

Reading

Reading from Percolator also requires a timestamp. The procedure to perform a read operation is as follows:

  1. Get a timestamp ts.
  2. Check if the row we are going to read is locked with a timestamp in the range [0, ts].
    • If there is a lock with the timestamp in range [0, ts], it means the row was locked by an earlier-started transaction. Then we are not sure whether that transaction will be committed before or after ts. In this case the reading will backoff and try again then.
    • If there is no lock or the lock's timestamp is greater than ts, the read can continue.
  3. Get the latest record in the row's write column whose commit_ts is in range [0, ts]. The record contains the start_ts of the transaction when it was committed.
  4. Get the row's value in the data column whose timestamp is exactly start_ts. Then the value is what we want.

For example, consider this table again:

key bal:data bal:lock bal:write
Bob 8:
7:$3
6:
5:$10
8:
7:
6:
5:
8:data@7
7:
6:data@5
5:
Joe 7:$9
6:
5:$2
7:primary@Bob.bal
6:
5:
7:
6:data@5
5:

Let's read Bob's balance.

  1. Get a timestamp. Assume it's 9.
  2. Check the lock of the row. The row of Bob is not locked, so we continue.
  3. Get the latest record in the write column committed before 9. We get a record with commit_ts equals to 8, andstart_ts 7, which means, its corresponding data is at timestamp 7 in the data column.
  4. Get the value in the data column with timestamp 7. $3 is the result to the read.

This algorithm provides us with the abilities of both lock-free read and historical read. In the above example, if we specify that we want to read at time point 7, then we will see the write record at timestamp 6, giving us the result $10 at timestamp 5.

Handling Conflicts

Conflicts are identified by checking the lock column. A row can have many versions of data, but it can have at most one lock at any time.

When we are performing a write operation, we try to lock every affected row in the Prewrite phase. If we failed to lock some of these rows, the whole transaction will be rolled back. Using an optimistic lock algorithm, sometimes Percolator's transactional write may encounter performance regressions in scenarios where conflicts occur frequently.

To roll back a row, just simply remove its lock and its corresponding value in data column.

Tolerating crashes

Percolator has the ability to survive crashes without breaking data integrity.

First, let's see what will happen after a crash. A crash may happen during Prewrite, Commit or between these two phases. We can simply divide these conditions into two types: before committing the primary, or after committing the primary.

So, when a transaction T1 (either reading or writing) finds that a row R1 has a lock which belongs to an earlier transaction T0, T1 doesn't simply rollback itself immediately. Instead, it checks the state of T0's primary lock.

  • If the primary lock has disappeared and there's a record data @ T0.start_ts in the write column, it means that T0 has been successfully committed. Then row R1's stale lock can also be committed. Usually we call this rolling forward. After this, the new transaction T1 resumes.
  • If the primary lock has disappeared with nothing left, it means the transaction has been rolled back. Then row R1's stale lock should also be rolled back. After this, T1 resumes.
  • If the primary lock exists but it's too old (we can determine this by saving the wall time to locks), it indicates that the transaction has crashed before being committed or rolled back. Roll back T1 and it will resume.
  • Otherwise, we consider transaction T0 to be still running. T1 can rollback itself, or try to wait for a while to see whether T0 will be committed before T1.start_ts.

Percolator in TiKV

TiKV is a distributed transactional key-value storage engine. Each key-value pair can be regarded as a row in Percolator.

TiKV internally uses RocksDB, a key-value storage engine library, to persist data to local disk. RocksDB's atomic write batch and TiKV's transaction scheduler make it atomic to read and write a single user key, which is a requirement of Percolator.

RocksDB provides a feature named Column Family (hereafter referred to as CF). An instance of RocksDB may have multiple CFs, and each CF is a separated key namespace and has its own LSM-Tree. However different CFs in the same RocksDB instance uses a common WAL, providing the ability to write to different CFs atomically.

We divide a RocksDB instance to three CFs: CF_DEFAULT, CF_LOCK and CF_WRITE, which corresponds to Percolator's data column, lock column and write column respectively. There's an extra CF named CF_RAFT which is used to save some metadata of Raft, but that's beside our topic. The notify and ack_O columns are not present in TiKV, because for now TiKV doesn't need the ability of incremental processing.

Then, we need to represent different versions of a key. We can simply compound a key and a timestamp as an internal key, which can be used in RocksDB. However, since a key can have at most one lock at a time, we don't need to add a timestamp to the key in CF_LOCK. Hence the content of each CF:

  • CF_DEFAULT: (key, start_ts) -> value
  • CF_LOCK: key -> lock_info
  • CF_WRITE: (key, commit_ts) -> write_info

Our approach to compound user keys and timestamps together is:

  1. Encode the user key to memcomparable
  2. Bitwise invert the timestamp (an unsigned int64) and encode it into big-endian bytes.
  3. Append the encoded timestamp to the encoded key.

For example, key "key1" and timestamp 3 will be encoded as "key1\x00\x00\x00\x00\xfb\xff\xff\xff\xff\xff\xff\xff\xfe", where the first 9 bytes is the memcomparable-encoded key and the remaining 8 bytes is the inverted timestamp in big-endian. In this way, different versions of the same key are always adjacent in RocksDB; and for each key, newer versions are always before older ones.

There are some differences between TiKV and the Percolator's paper. In TiKV, records in CF_WRITE has four different types: Put, Delete, Rollback and Lock. Only Put records need a corresponding value in CF_DEFAULT. When rolling back transactions, we don't simply remove the lock but writes a Rollback record in CF_WRITE. Different from Percolator's lock, the Lock type of write records in TiKV is produced by queries like SELECT ... FOR UPDATE in TiDB. For keys affected by this query, they are not only the objects for read, but the reading is also part of a write operation. To guarantee to be in snapshot-isolation, we make it acts like a write operation (though it doesn't write anything) to ensure the keys are locked and won't change before committing the transaction.

Introduction

In the database field, scalability is the term we use to describe the capability of a system to handle a growing amount of work. Even if a system is working reliably and fast today, it doesn't mean it will necessarily work well in the future. One common reason for degradation is the increased load which exceeds what the system can process. In modern systems, the amount of data we handle can far outgrow our original expectations, so scalability is a critical consideration for the design of a database.

A system whose performance improves after adding hardware, proportionally to the capacity added, is said to be a scalable system. TiKV is a highly scalable key-value store, especially comparing with other stand-alone key-value stores like RocksDB and LevelDB. In this chapter we will talk about the two main ways of scaling, horizontal and vertical, and how TiKV provide strong scalability based on Raft.

Horizontal or Vertical

Methods of adding more resources for a particular application fall into two broad categories: horizontal and vertical scaling.

Horizontal Scaling

Horizontal scaling, which is also known as scaling out, means adding more machines to a system and distributing the load across multiple smaller machines. As computer prices have dropped and performance continues to increase, high-performance computing applications have adopted low-cost commodity systems for tasks. System architects may configure hundreds of small computers in a cluster to obtain aggregate computing power that often exceeds that of computers based on a single traditional processor. The development of high-performance interconnects such as Gigabit Ethernet, InfiniBand further fueled this model.

Vertical Scaling

Vertical scaling, which is also known as scaling up, means adding resources to a single node in a system, typically involving the addition of CPUs or memory to a more powerful single computer. Such vertical scaling of existing systems also enables them to use virtualization technology more effectively, as it provides more resources for the hosted set of the operating system and application modules to share.

Tradeoff

There are tradeoffs between the above two models. Larger numbers of computers mean increased management complexity, as well as a more complex programming model and issues such as throughput and latency between nodes. A light workload running on scaled-out systems maybe is even slower than on a single machine due to communication overhead. But the problem with a scale-up approach is that the performance doesn't grow in linearly proportional to cost. A system that runs on a single machine is often simpler, but high-end machines can become very expensive, so most intensive workloads cannot avoid scaling out.

Data Sharding

What is the partition

For fault tolerance, TiKV replicates data to multiple nodes via the Raft consensus algorithm. For large datasets or high query throughput, managing all the data by one Raft state machine is not efficient and flexible, so we need to break the data up into partitions, also known as sharding. In TiKV, a partition is called a Region.

With partitioning, we can move partitions to balance the load among all nodes of a cluster. How to decide which partitions to store on which nodes isn't covered here, it will be discussed later in a chapter related to PD.

Partitioning

Different partitions can be placed on different nodes, and thus a large dataset can be distributed across many disks, and the query load can be distributed across many processors. But partitioning suffers from the issue of hot spots, where high load may end up on one partition but leave the other nodes idle. To avoid that, we can simply assign records to nodes randomly, but when reading a particular item there is no fast way of knowing which node is on.

There are two general ways to map keys to partitions:

Partitioning by hash of key

One way of partitioning is to assign a continuous range of hashes of keys to each partition. The partition boundaries can be evenly spaced, or they can be chosen by some kind of algorithm like consistent hashing. Due to the hash, keys can be distributed fairly among the partitions, effectively reducing hot spots, though not totally avoiding them. Unfortunately, this method fails to do efficient range queries, compared to partitioning by key range.

Partitioning by key range

The other way of partitioning is to assign a continuous range of keys to each partition. It needs to record the boundaries between the ranges to determine which partition contains a given key, and then requests can be sent to the appropriate node. One key feature of key range partition is that it is friendly to range scans. However, the downside of the sorted order is that sequential reads and writes can lead to hot spots.

Partitioning by range is the approach that TiKV uses. The main reason to choose it is scan-friendliness, meanwhile, for the split or merge of Regions, TiKV only needs to change meta-information about the range of the Region, which avoids moving actual data in a large extent.

Multi-raft

If you've researched Consensus before, please note that comparing Multi-Raft to Raft is not at all like comparing Multi-Paxos to Paxos. Here Multi-Raft only means we manage multiple Raft consensus groups on one node. From the above section, we know that there are multiple different partitions on each node, if there is only one Raft group for each node, the partitions losing its meaning. So Raft group is divided into multiple Raft groups in terms of partitions, namely, Region.

TiKV also can perform split or merge on Regions to make the partitions more flexible. When the size of a Region exceeds the limit, it will be divided into two or more Regions, and the range may change like \( [a, c) \) -> \( [a, b) \) + \( [b, c) \); when the sizes of two sibling Regions are small enough, they will be merged into a bigger Region, and the range may change like \( [a, b) \) + \( [b, c) \) -> \( [a, c) \).

multi-raft

For each Raft group, the process of the algorithm is still as before, and we only introduce a layer on top of Raft to manage these Raft consensus groups as a whole.

TiKV uses an event loop to drive all the processes in a batch manner. It polls all the Raft groups to drive the Raft state machine every 1000ms and accepts the requests from the outside clients and Raft messages sent by other nodes through the notification mechanism of the event loop.

For each event loop tick, it handles the Raft ready of each Raft group:

  1. It traverses all the ready groups and uses a RocksDB's WriteBatch to handle all appending data and persist the corresponding result at the same time.
  2. If WriteBatch succeeds, it then sends messages of every Raft group to corresponding Followers. TiKV reuses the connection between two nodes for multiple Raft groups.
  3. Then it applies any committed entries and executes them.

Introduction

In a distributed database environment, resource scheduling needs to meet the following requirements:

  • Keeping data highly available: The scheduler needs to be able to manage data redundancy to keep the cluster available when some nodes fail.
  • Balance server load: The scheduler needs to balance the load to prevent a single node from becoming a performance bottleneck for the entire system.
  • Scalability: The scheduler needs to be able to scale to thousands of nodes.
  • Fault tolerance: The scheduling process must not be stopped by the breaking down caused by a single node failure.

In the TiKV cluster, resource scheduling is done by the Placement Driver (PD). In this chapter, we will first introduce the design of two scheduling systems (Kubernetes and Mesos), followed by the design and implementation of scheduler and placement in PD.

Scheduler of Kubernetes

Overview

Kubernetes is a Docker-based open source container cluster management system initiated and maintained by the Google team. It supports not only common cloud platforms but also internal data centers.

Kubernetes built a container scheduling service which is designed to allow users to manage cloud container clusters through Kubernetes clusters without the need for complex setup tasks. The system will automatically select the appropriate working node to perform specific container cluster scheduling processing.

The scheduler needs to take into account individual and collective resource requirements, quality of service requirements, hardware/software/policy constraints, affinity and anti-affinity specifications, data locality, inter-workload interference, deadlines, and so on.

Scheduling process

The scheduling process is mainly divided into 2 steps. In the predicate step, the scheduler filters out nodes that do not satisfy required conditions. And in the priority step, the scheduler sorts the nodes that meet all of the fit predicates, and then chooses the best one.

Predicate stage

The scheduler provides some predicates algorithms by default. For instance, the HostNamePred predicate checks if the hostname matches the requested hostname; the PodsFitsResourcePred checks if a node has sufficient resources, such as CPU, memory, GPU, opaque int resources and so on, to run a pod. Relevant code can be found in kubernetes/pkg/scheduler/algorithm/predicates/.

Priority stage

In the priority step, the scheduler uses the PrioritizeNodes function to rank all nodes by calling each priority functions sequentially:

  • Each priority function is expected to set a score of 0-10 where 0 is the lowest priority score (least preferred node) and 10 is the highest.
  • Add all (weighted) scores for each node to get a total score.
  • Select the node with the highest score.

References

  1. kube-scheduler documentation
  2. Kubernetes introduction (in Chinese)
  3. How does Kubernetes' scheduler work
  4. Kubernetes Scheduling (in Chinese)

Mesos

Overview

Mesos was originally launched by UC Berkeley's AMPLab in 2009. It is lisenced under Apache and now operated by Mesosphere, Inc.

Mesos can abstract and schedule the resources of the entire data center (including CPU, memory, storage, network, etc.). This allows multiple applications to run in a cluster at the same time without needing to care about the physical distribution of resources.

Mesos has many compelling features, including:

  • Support for large scale scenarios with tens of thousands of nodes (adopted by Apple, Twitter, eBay, etc.)
  • Support for multiple application frameworks, including Marathon, Singularity, Aurora, etc.
  • High Availability (relies on ZooKeeper)
  • Support for Docker, LXC and other container techniques
  • Providing APIs for several popular languages, including Python, Java, C++, etc.
  • A simple and easy-to-use WebUI

Architecture

It is important to notice that Mesos itself is only a resource scheduling framework. It is not a complete application management platform, so Mesos can't work only on its own. However, based on Mesos, it is relatively easy to provide distributed operation capabilities for various application management frameworks or middleware platforms. Multiple frameworks can also run in a single Mesos cluster at the same time, improving overall resource utilization efficiency.

Figure 1

Components

Mesos consists of a master process that manages slave daemons running on each cluster node, and frameworks that run tasks on these slaves.

  • Mesos master

    The master sees the global information, and is responsible for resource scheduling and logical control between different frameworks. The frameworks need to be registered to master in order to be used. It uses Zookeeper to achieve HA.

  • Mesos salve

    The slave is responsible for reporting the resource status (idle resources, running status, etc.) on the slave node to master, and is responsible for isolating the local resources to perform the specific tasks assigned by master.

  • Frameworks

    Each framework consists of two components: a scheduler that registers with the master to be offered resources, and an executor process that is launched on slave nodes to run the framework’s tasks.

Resource scheduling

To support the sophisticated schedulers of today's frameworks, Mesos introduces a distributed two-level scheduling mechanism called resource offers.

Each resource offer is a list of free resources (for example, <1Core CPU, 2GB RAM>) on multiple slaves. While the master decides how many resources to offer to each framework according to an organizational policy, the frameworks’ schedulers select which of the offered resources to use. When a framework accepts offered resources, it passes Mesos a description of the tasks it wants to launch on them.

Figure 2

The figure shows an example of how resource scheduling works:

  1. Slave 1 reports to the master that it has 4 CPUs and 4 GB of memory free. The master then invokes the allocation policy module, which tells it that framework 1 should be offered all available resources.
  2. The master sends a resource offer describing what is available on slave 1 to framework 1.
  3. The framework’s scheduler replies to the master with information about two tasks to run on the slave, using <2 CPUs, 1 GB RAM> for the first task, and <1 CPUs, 2 GB RAM> for the second task.
  4. Finally, the master sends the tasks to the slave, which allocates appropriate resources to the framework’s executor, which in turn launches the two tasks (depicted with dotted-line borders in the figure). Because 1 CPU and 1 GB of RAM are still unallocated, the allocation module may now offer them to framework 2.

To maintain a thin interface and enable frameworks to evolve independently, Mesos does not require frameworks to specify their resource requirements or constraints. Instead, Mesos gives frameworks the ability to reject offers. A framework can reject resources that do not satisfy its constraints in order to wait for ones that do. Thus, the rejection mechanism enables frameworks to support arbitrarily complex resource constraints while keeping Mesos simple and scalable.

References

  1. Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center
  2. Mesos introduction (in Chinese)

Distributed SQL over TiKV

TiKV is the storage layer for TiDB, a distributed HTAP SQL database. So far, we have only explained how a distributed transactional Key-Value database is implemented. However this is still far from serving a SQL database. We will explore and cover the following things in this chapter:

  • Storage

    In this section we will see how the TiDB relational structure (i.e. SQL table records and indexes) are encoded into the Key-Value form in the latest version. We will also explore a new Key-Value format that is going to be implemented soon and some insights on even better Key-Value formats in future.

  • Distributed SQL (DistSQL)

    Storing data in a distributed manner using TiKV only utilizes distributed I/O resources, while the TiDB node that receives SQL query is still in charge of processing all rows. We can go a step further by delegating some processing tasks into TiKV nodes. This way, we can utilize distributed CPU resources! In this section, we will take a look at these supported physical plan executors so far in TiKV and see how they enable TiDB executing SQL queries in a distributed way.

  • TiKV Query Execution Engine

    When talking about executors we cannot ignore discussing the execution engine. Although executors running on TiKV are highly simplified and limited, we still need to carefully design the execution engine. It is critical to the performance of the system. This section will cover the traditional Volcano model execution engine used before TiKV 3.0, for example, how it works, pros and cons, and the architecture.

  • Vectorization

    Vectorization is a technique that performs computing over a batch of values. By introducing vectorization into the execution engine, we will achieve higher performance. This section will introduce its theory and the architecture of the vectorized execution engine introduced in TiKV 3.0.

Store

Distributed SQL

By now we already know how TiDB's relational structure is encoded into the Key-Value form with version. In this section, we will focus on the following questions:

  • What happens when TiDB receives a SQL query?
  • How does TiDB execute SQL queries in a distributed way?

What happens when TiDB receives a SQL query?

Firstly, let's have a look at the following example:

select count(*) from t where a + b  > 5;

Figure 1

As described in the above figure, when TiDB receives a SQL query from the client, it will process with the following steps:

  1. TiDB receives a new SQL from the client.
  2. TiDB prepares the processing plans for this request, meanwhile TiDB gets a timestamp from PD as the start_ts of this transaction.
  3. TiDB tries to get the information schema (metadata of the table) from TiKV.
  4. TiDB prepares Regions for each related key according to the information schema and the SQL query. Then TiDB gets information for the related Regions from PD.
  5. TiDB groups the related keys by Region.
  6. TiDB dispatches the tasks to the related TiKV concurrently.
  7. TiDB reassembles the data and returns the data to the client.

How does TiDB execute SQL queries in a distributed way?

In short, TiDB splits the task by Regions and sends them to TiKV concurrently.

For the above example, we assume the rows with the primary key of table t are distributed in three Regions:

  • Rows with the primary key in [0,100) are in Region 1.
  • Rows with primary key in [100,1000) are in region 2.
  • Rows with primary key in [1000,~) are in region 3.

Now we can do count and sum the result from the above three Regions.

Figure 2

Exectors

Now we know TiDB splits a read task by Regions, but how does TiKV know what are its tasks to handle? Here TiDB will send a Directed Acyclic Graph (DAG) to TiKV with each node as an executor.

Figure 3

Supported executors:

  • TableScan: Scans the rows with the primary key from the KV store.
  • IndexScan: It will scan the index data from the KV store.
  • Selection: performs a filter (mostly for where). The input is TableScan or IndexScan.
  • Aggregation: performs an aggregation (e.g. count(*),sum(xxx)). The input is TableScan,IndexScan, orSelection.
  • TopN: sorts the data and returns the top n matches, for example, order by xxx limit 10. The input is TableScan,IndexScan, orSelection.

executors-example

For the above example, we have the following executors on Region 1:

  • Aggregation: count(*).
  • Selection: a + b > 5
  • TableScan: range:[0,100).

Expression

We have executors as nodes in the DAG, but how do we describe columns, constants, and functions in an Aggregation or a Selection? There are three types of expressions:

  • Column: a column in the table.
  • Constant: a constant, which could be a string, int, duration, and so on.
  • Scalar function: describes a function.

Figure 4

For the above example select count(*) from t where a + b > 5, we have:

  • Column: a, b.
  • Scalar functions: +,>.
  • Constant: 5.