Transaction Handling Process

This article will introduce how transaction requests are handled in TiKV.

The URLs in this article refer to the code that performs each operation.

In a system which consists of TiDB and TiKV, the architecture looks like this:

(figure: architecture of a system consisting of TiDB and TiKV)

Though the client is not part of TiKV, reading some of its code is important for understanding how a request is handled.

There are many client implementations, and their processes for sending a request are similar; we'll take client-rust as an example here.

Basically, TiKV's transaction system is based on Google's Percolator; you are recommended to read some material about it before you start reading this.

Begin

You'll need a client object to start a transaction.

The code which creates a transaction is here; you can see the client includes a PdRpcClient, which is responsible for communicating with the PD component.

And then you can use Client::begin to start a transaction.


#![allow(unused)]
fn main() {
pub async fn begin(&self) -> Result<Transaction> {
	let timestamp = self.current_timestamp().await?;
	Ok(self.new_transaction(timestamp))
}
}

Firstly, we'll need to get a timestamp from PD, and then we'll create a new Transaction object using the current timestamp.
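
Putting this together, a minimal usage sketch might look like the following. The constructor and config names here are assumptions based on client-rust at the time of writing; check the docs for the exact API of your version.

// A minimal sketch, assuming a client-rust-like API; constructor and
// method names may differ between versions.
use tikv_client::{Config, transaction::Client};

async fn run() -> tikv_client::Result<()> {
    // The client holds a PdRpcClient which talks to the PD endpoints.
    let client = Client::new(Config::new(vec!["127.0.0.1:2379"])).await?;
    // begin() fetches a start timestamp from PD and builds a Transaction.
    let mut txn = client.begin().await?;
    let value = txn.get("key".to_owned()).await?;
    println!("{:?}", value);
    txn.commit().await?;
    Ok(())
}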

Single point read

We can use Transaction::get to get a single value for a certain key.

This part of the code is here.

We'll try to read the locally buffered key first. If the key does not exist in the local buffer, a GetRequest will be sent to TiKV.
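
Conceptually, the buffer-first logic looks like this (a simplified sketch, not the actual client-rust code; send_get_request is a hypothetical helper):

// Simplified sketch of the buffer-first read in Transaction::get.
async fn get(&self, key: Key) -> Result<Option<Value>> {
    // A key written earlier in this transaction is served from the buffer.
    if let Some(value) = self.buffer.get(&key) {
        return Ok(Some(value.clone()));
    }
    // Otherwise fall back to a GetRequest sent to TiKV.
    self.send_get_request(key).await
}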

You may already know that TiKV divides all the data into regions, each replica of a certain region lives on its own TiKV node, and PD manages the metadata about where the replicas for a certain key are.

The code above doesn't seem to cover the steps which decide which TiKV node we should send the request to. But that's not the case. The code which does these jobs is hidden under execute: you'll find the code which tries to get the TiKV node here, and it is called by retry_response_stream here:


#![allow(unused)]
fn main() {
fn store_for_key(
        self: Arc<Self>,
        key: &Key,
    ) -> BoxFuture<'static, Result<Store<Self::KvClient>>> {
        self.region_for_key(key)
            .and_then(move |region| self.map_region_to_store(region))
            .boxed()
    }
}

Firstly, it will use the gRPC call GetRegion in region_for_key to find out which region the key is in.

The remote function GetRegion is defined here in PD.

And then we'll use the gRPC call GetStore in map_region_to_store to find out the leader of the region.

The remote function GetStore is defined here in PD.

Finally we'll get a KvRpcClient instance, which represents the connection to a TiKV replica.

Then let's get back to retry_response_stream. The next function call we should pay attention to is store.dispatch, which makes the gRPC call KvGet.

And finally we reach the code in TiKV's repo. In TiKV, the requests are handled by the Server struct, and KvGet will be handled by future_get here.

Firstly we'll read the value for a key by using Storage::get.

The get function is a little bit long; if we ignore the metrics-related STATIC parts for now, we get:


#![allow(unused)]
fn main() {
pub fn get(&self, mut ctx: Context, key: Key,
    start_ts: TimeStamp) -> impl Future<Item = Option<Value>, Error = Error> {
    const CMD: CommandKind = CommandKind::get;
    let priority = ctx.get_priority();
    let priority_tag = get_priority_tag(priority);

    let res = self.read_pool.spawn_handle(
        async move {
            // The bypass_locks set will be checked at most once. `TsSet::vec` is more efficient
            // here.
            let bypass_locks = TsSet::vec_from_u64s(ctx.take_resolved_locks());
            let snapshot = Self::with_tls_engine(|engine| Self::snapshot(engine, &ctx)).await?;
            // `statistics` is defined in the omitted metrics code; declare it
            // here so the snippet is self-contained.
            let mut statistics = Statistics::default();
            let snap_store = SnapshotStore::new(snapshot, start_ts,
                        ctx.get_isolation_level(),
                        !ctx.get_not_fill_cache(),
                        bypass_locks,
                        false);
            let result = snap_store.get(&key, &mut statistics)
                    // map storage::txn::Error -> storage::Error
                    .map_err(Error::from);
            result
        },
        priority,
        thread_rng().next_u64(),
    );
    res.map_err(|_| Error::from(ErrorInner::SchedTooBusy))
        .flatten()
}
}

This function will get a snapshot, construct a SnapshotStore using the snapshot, and then call get on this SnapshotStore to fetch the data we need.

The bypass_locks part is a tricky optimization related to large transactions; see this PR.

Then let's view the code of SnapshotStore::get. You'll see that it in fact constructs a PointGetter, and then calls the get method on the PointGetter:


#![allow(unused)]
fn main() {
pub fn get(&mut self, user_key: &Key) -> Result<Option<Value>> {
    if !self.multi {
        // Protect from calling `get()` multiple times when `multi == false`.
        if self.drained {
            return Ok(None);
        } else {
            self.drained = true;
        }
    }

    match self.isolation_level {
        IsolationLevel::Si => {
            // Check for locks that signal concurrent writes in Si.
            self.load_and_check_lock(user_key)?;
        }
        IsolationLevel::Rc => {}
    }

    self.load_data(user_key)
}
}

As we can see, if the required isolation_level is Si, we need to check whether there are any locks which may conflict with the current get. If we find one, we'll return a KeyIsLocked error:


#![allow(unused)]
fn main() {
fn load_and_check_lock(&mut self, user_key: &Key) -> Result<()> {
    self.statistics.lock.get += 1;
    let lock_value = self.snapshot.get_cf(CF_LOCK, user_key)?;

    if let Some(ref lock_value) = lock_value {
        self.statistics.lock.processed += 1;
        let lock = Lock::parse(lock_value)?;
        if self.met_newer_ts_data == NewerTsCheckState::NotMetYet {
            self.met_newer_ts_data = NewerTsCheckState::Met;
        }
        lock.check_ts_conflict(user_key, self.ts, &self.bypass_locks)
            .map_err(Into::into)
    } else {
        Ok(())
    }
}
}

And then we'll use PointGetter's load_data method to load the value.

Now we have the value in GetResponse. But if the key is locked, the client still needs to resolve the locked keys. This is also handled in retry_response_stream.

Resolve locks

First, the client takes the locks we met from the response, and then uses resolve_locks to try to resolve them:

We find all the locks which are expired, and resolve them one by one.

Then, for each lock, we'll get the lock_version's corresponding commit_version (which might be buffered), and use it to send a cleanup request.

It seems that using CleanupRequest directly is deprecated after 4.0, so we'll simply ignore it here.

And then comes the key point: resolve_lock_with_retry. This function will construct a ResolveLockRequest and send it to TiKV to execute.
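
Putting the client-side steps together, the resolving flow is roughly the following sketch; has_expired and get_commit_version are hypothetical helpers standing in for the expiry check and the cleanup-plus-cache logic.

// Rough sketch of the client-side lock resolving loop.
async fn resolve_locks(&self, locks: Vec<kvrpcpb::LockInfo>) -> Result<()> {
    for lock in locks.into_iter().filter(|l| has_expired(l)) {
        // Learn the fate of the lock's transaction: a commit_version of 0
        // means it was rolled back, otherwise it was committed.
        let commit_version = self.get_commit_version(lock.lock_version).await?;
        // Send a ResolveLockRequest so TiKV commits or rolls back the lock.
        self.resolve_lock_with_retry(&lock.key, lock.lock_version, commit_version)
            .await?;
    }
    Ok(())
}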

Let's turn to TiKV's source code. Depending on whether the key in the request is empty, ResolveLockRequest will be converted into ResolveLockReadPhase + ResolveLock or ResolveLockLite. The difference between the two is that ResolveLockLite will only handle the locks the request asks to resolve, while ResolveLock will resolve locks in a whole region.

The handling of ResolveLock has two parts: the read phase is here, which is responsible for reading out the locks and constructing the write-phase command, and the write phase is here, which is responsible for the actual release work.

These two parts of the code use MvccTxn and MvccReader; we'll explain them in another article.

The comments here give a good introduction to what ResolveLock does.

After all expired locks are resolved, a new GetRequest is sent, and the get process is repeated until it succeeds.

And then, the result value is returned. (Finally!)

Let's summarize the process with a dataflow diagram.

(figure: single point get dataflow diagram)

Scan

On the client side, scan is almost the same as single point get, except that it sends a KvScan grpc call instead of KvGet.

And on the TiKV side, things are a little different. Firstly, the request will be handled by future_scan, and then Storage::scan, and finally we'll find that the function which really does the job is a Scanner; we'll cover this part in another document.

Write

In fact, a write just writes to the local buffer. All data modifications will be sent to TiKV on prewrite.
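
For example, with client-rust nothing goes over the network for these mutations until the commit call triggers prewrite (a sketch; exact method signatures may differ between versions):

// Sketch: mutations stay in the transaction's local buffer.
let mut txn = client.begin().await?;
txn.put("k1".to_owned(), "v1".to_owned()).await?; // buffered locally
txn.delete("k2".to_owned()).await?;               // buffered locally
txn.commit().await?; // prewrite + commit send the buffered mutations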

Commit

Now comes the most interesting part: commit. Just as I mentioned, commit in TiKV is based on Percolator, but there are several things that are different:

  • Percolator depends on BigTable's single-row transactions, so we must implement something similar ourselves in TiKV.

  • We need to support pessimistic transactions

    Pessimistic transactions enable TiDB to have better MySQL compatibility, and save rollbacks under high load.

    But they introduce some other problems, such as:

    • deadlock

      In optimistic transaction handling, deadlock won't happen because in the prewrite stage, a transaction would abort if another transaction holds a lock it needs, and in the read stage, the locks from a dead transaction are resolved.

So let's see how TiKV deal with these things.

Client

From the client side, the commit process is easy. You can see that we use a TwoPhaseCommitter to do the commit job, and what it does is just as the Percolator paper says: prewrite, commit_primary, and finally commit_secondary.
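
In outline, the committer's job looks like the following sketch (simplified from the flow the Percolator paper describes, not the exact client-rust code):

// Simplified sketch of the two-phase commit flow in TwoPhaseCommitter.
async fn commit(mut self) -> Result<()> {
    // Phase 1: prewrite all mutations, locking every key; one key is
    // chosen as the primary, the rest point at it.
    self.prewrite().await?;
    // Phase 2: fetch a commit timestamp from PD and commit the primary.
    // Once the primary is committed, the transaction is logically
    // committed even if we crash before touching the secondaries.
    let commit_ts = self.current_timestamp().await?;
    self.commit_primary(commit_ts).await?;
    // Secondaries can be committed afterwards; readers that meet their
    // locks will resolve them via the primary.
    self.commit_secondary(commit_ts).await?;
    Ok(())
}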

AcquirePessimisticLock

This is used in the pessimistic transaction handling. It locks certain keys to prevent them from being changed by other transactions.

This one does not exist in client-rust for now, so you'll have to read TiDB's code here.

Basically, it sends a PessimisticLockRequest to TiKV, and TiKV handles it here: it just runs MvccTxn::acquire_pessimistic_lock for each key to lock, which puts a lock on the key. The lock is just like the lock used in prewrite in an optimistic transaction; the only difference is that its type is LockType::Pessimistic.

And then it returns whether the locking succeeded. If not, it also returns the lock to wait for.
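
The per-key logic is roughly the following sketch (simplified; error variants and helper names like Lock::new_pessimistic are illustrative, not the exact TiKV definitions):

// Simplified sketch of MvccTxn::acquire_pessimistic_lock for one key.
fn acquire_pessimistic_lock(&mut self, key: Key, for_update_ts: TimeStamp) -> Result<()> {
    if let Some(lock) = self.reader.load_lock(&key)? {
        if lock.ts != self.start_ts {
            // Locked by another transaction: the client must wait for
            // it or resolve it.
            return Err(Error::KeyIsLocked);
        }
        // We already hold the lock; this is a retried request.
        return Ok(());
    }
    // A write committed after for_update_ts means someone changed the
    // key since we read it: a write conflict.
    if let Some((commit_ts, _write)) = self.reader.seek_write(&key, TimeStamp::max())? {
        if commit_ts > for_update_ts {
            return Err(Error::WriteConflict);
        }
    }
    // Put a lock with LockType::Pessimistic into CF_LOCK; unlike a
    // prewrite lock, it carries no value.
    self.put_lock(key, Lock::new_pessimistic(self.start_ts, for_update_ts));
    Ok(())
}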

Prewrite

On TiKV side, the prewrite process happens here in process_write_impl.

The first few lines of code (the if rows > FORWARD_MIN_MUTATIONS_NUM part) are not covered by the TiKV Source Code Reading blogs. I guess it means:

if there's no "write" record in [mutations.minKey, mutations.maxKey] {
    skip_constraint_check = true;
    scan_mode = Some(ScanMode::Forward)
}

As far as I understand, it just provides an optimized way of checking the "write" column; see tikv#5846 for details.

And no matter whether this branch is taken, we'll construct an MvccTxn, and then use it to do the prewrite job for each mutation the client sent to the TiKV server.

The MvccTxn::prewrite function just does what Percolator describes: check the write records in [start_ts, ∞) for a newer write (this can be bypassed if skip_constraint_check is set; we can safely ignore this check in situations like importing data), then check whether the current key is locked at any timestamp, and finally use prewrite_key_value to lock the key and write the value in.
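
In pseudocode, prewriting a single mutation does roughly this (a sketch; the real function also deals with TTLs, short values, and pessimistic variants):

// Simplified sketch of MvccTxn::prewrite for a single mutation.
fn prewrite(&mut self, mutation: Mutation, skip_constraint_check: bool) -> Result<()> {
    let key = mutation.key().clone();
    if !skip_constraint_check {
        // A write record with commit_ts >= start_ts means a newer
        // committed write exists: a write conflict.
        if let Some((commit_ts, _write)) = self.reader.seek_write(&key, TimeStamp::max())? {
            if commit_ts >= self.start_ts {
                return Err(Error::WriteConflict);
            }
        }
    }
    // A lock left by any other transaction (at any ts) blocks us.
    if let Some(lock) = self.reader.load_lock(&key)? {
        if lock.ts != self.start_ts {
            return Err(Error::KeyIsLocked);
        }
        // Our own lock: this prewrite is a retry, nothing more to do.
        return Ok(());
    }
    // Lock the key and write the value (into the lock itself if it is
    // short, into CF_DEFAULT otherwise).
    self.prewrite_key_value(key, mutation);
    Ok(())
}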

Latches

Just as I mentioned, there's no such thing as a "single row transaction" in TiKV, so we need another way to prevent a key's locking state from being changed by another transaction during prewrite.

TiKV uses Latches to achieve this. You can consider it a map from a key('s hash) to a mutex: you can lock a key in the Latches to prevent it from being used by other transactions.

The latches are used in try_to_wake_up, which is called before each command is executed; it will lock all the latches the command uses.
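
A minimal sketch of the idea, assuming we may block a thread on a mutex (the real Latches instead keeps per-slot queues of waiting command ids and wakes commands up through the scheduler):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Mutex;

// A latches-like structure: a fixed pool of slots indexed by key hash.
struct Latches {
    slots: Vec<Mutex<()>>,
}

impl Latches {
    fn new(size: usize) -> Self {
        Latches { slots: (0..size).map(|_| Mutex::new(())).collect() }
    }

    // Two keys hashing to the same slot share a latch, which is safe,
    // just occasionally over-conservative.
    fn slot_for<K: Hash>(&self, key: &K) -> &Mutex<()> {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        &self.slots[(hasher.finish() as usize) % self.slots.len()]
    }
}

// Usage: hold the guard while changing the key's lock state.
// let latches = Latches::new(1024);
// let _guard = latches.slot_for(&b"key1".to_vec()).lock().unwrap();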

(figure: prewrite dataflow diagram)

PrewritePessimistic

PrewritePessimistic's handling is very similar to Prewrite's, except that it:

  • doesn't need to read the write records for conflict checking, because the potential conflicts were already checked while acquiring the lock
  • downgrades the pessimistic lock to an optimistic lock during prewrite, so the following commit process is the same as in optimistic transaction handling
  • needs to prevent deadlocks

Deadlock handling

There won't be a deadlock in the optimistic transaction handling process, because we know all the keys to lock during the prewrite process, so we can lock them in order.

But during the pessimistic transaction handling process, the situation is very different: when to lock a key, and which keys to lock, are totally decided by the user. So, for example:

transaction A:
	lock key a;
	do some process;
	lock key b;
	do some other process;
commit

and

transaction B:
	lock key b;
	do some process;
	lock key a;
	do some other process;
commit

If you are unlucky, transaction A will hold the lock on a and try to get the lock on b, while transaction B holds the lock on b and tries to get the lock on a; neither of them can get the lock and continue with its job, so a deadlock occurs.

TiKV uses deadlock detection to prevent this kind of situation.

The deadlock detector is made up of two parts: the LockManager and the Detector.

Basically, these two maintain a directed graph of transactions and the locks they are waiting for; if adding an edge would break the "acyclic" property, a potential deadlock is detected. A separate doc will be added to describe the LockManager.
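
A sketch of the core idea, keyed by transaction start_ts (the real Detector runs on a single leader node and uses more compact data structures):

use std::collections::{HashMap, HashSet};

// Minimal wait-for graph: an edge from waiter -> holder means the waiter
// transaction is blocked on a lock the holder transaction owns.
#[derive(Default)]
struct DetectGraph {
    wait_for: HashMap<u64, HashSet<u64>>, // txn start_ts -> txns it waits for
}

impl DetectGraph {
    // Returns true (deadlock) if adding waiter -> holder closes a cycle,
    // i.e. holder can already (transitively) reach waiter.
    fn detect(&mut self, waiter: u64, holder: u64) -> bool {
        if self.reachable(holder, waiter) {
            return true; // adding this edge would create a cycle
        }
        self.wait_for.entry(waiter).or_default().insert(holder);
        false
    }

    fn reachable(&self, from: u64, to: u64) -> bool {
        let mut stack = vec![from];
        let mut seen = HashSet::new();
        while let Some(txn) = stack.pop() {
            if txn == to {
                return true;
            }
            if seen.insert(txn) {
                if let Some(next) = self.wait_for.get(&txn) {
                    stack.extend(next.iter().copied());
                }
            }
        }
        false
    }
}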

(Do) Commit

After prewrite is done, the client will do the commit work: first commit the primary key, then the secondary ones. Both kinds of commit are represented by the Commit command and handled here.

In the commit process we just use MvccTxn::commit to commit each key; what it does is much like what Percolator describes.

We also collect the released locks and use them to wake up the waiting pessimistic transactions.
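
Conceptually, committing one key turns its lock into a write record (a simplified sketch; the real signatures of Write::new and the error variants differ slightly):

// Simplified sketch of MvccTxn::commit for one key.
fn commit(&mut self, key: Key, commit_ts: TimeStamp) -> Result<Option<ReleasedLock>> {
    let lock = match self.reader.load_lock(&key)? {
        // Only a lock belonging to this transaction can be committed.
        Some(lock) if lock.ts == self.start_ts => lock,
        // Otherwise the transaction was already committed or rolled
        // back; the write records tell us which.
        _ => return Err(Error::TxnLockNotFound),
    };
    // The write record at commit_ts points back at start_ts, so readers
    // can locate the value written during prewrite.
    let write = Write::new(
        WriteType::from_lock_type(lock.lock_type),
        self.start_ts,
        lock.short_value,
    );
    self.put_write(key.clone(), commit_ts, write.as_ref().to_bytes());
    // Remove the lock and report it as released, so waiting pessimistic
    // transactions can be woken up.
    Ok(self.unlock_key(key, false))
}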

Rollback

(Optimistic) Rollback

On the client side, rollback just constructs a BatchRollbackRequest with the keys changed in this transaction and a start_version which identifies the transaction to be rolled back, and sends it to the server.

On the server side, it just calls MvccTxn::rollback here, and MvccTxn::rollback is a direct proxy to MvccTxn::cleanup.

Let's view the code in MvccTxn::cleanup:

The first branch in the match is taken when there's a lock on the key.

!current_ts.is_zero() is always false in the rollback situation (rollback calls cleanup with a zero current_ts), so we'll ignore that branch here.

Then we'll call MvccTxn::rollback_lock:

First remove the value written if necessary:


#![allow(unused)]
fn main() {
if lock.short_value.is_none() && lock.lock_type == LockType::Put {
	self.delete_value(key.clone(), lock.ts);
}
}

And then put the rollback write record:


#![allow(unused)]
fn main() {
let protected: bool = is_pessimistic_txn && key.is_encoded_from(&lock.primary);
let write = Write::new_rollback(self.start_ts, protected);
self.put_write(key.clone(), self.start_ts, write.as_ref().to_bytes());
}

And then collapse the previous rollback if necessary:


#![allow(unused)]
fn main() {
if self.collapse_rollback {
	self.collapse_prev_rollback(key.clone())?;
}
}

Finally unlock the key:


#![allow(unused)]
fn main() {
Ok(self.unlock_key(key, is_pessimistic_txn))
}

On the other hand, in MvccTxn::cleanup, when there's no lock on the key, we first use check_txn_status_missing_lock to decide the status of the transaction: if the transaction has already been committed, we return an error; otherwise it's OK.
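
In pseudocode, that branch looks roughly like this (a sketch; get_txn_commit_record and put_rollback_record are hypothetical simplifications of the real helpers):

// Simplified sketch of the missing-lock case in cleanup.
fn check_txn_status_missing_lock(&mut self, key: Key) -> Result<TxnStatus> {
    match self.reader.get_txn_commit_record(&key, self.start_ts)? {
        // A commit record exists: the transaction has been committed
        // and must not be rolled back.
        Some(commit_ts) => Err(Error::Committed { commit_ts }),
        // No commit record: leave a rollback record so a late prewrite
        // from this transaction cannot succeed, and report rolled back.
        None => {
            self.put_rollback_record(key);
            Ok(TxnStatus::RolledBack)
        }
    }
}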

Pessimistic Rollback

The only difference between the handling of PessimisticRollback and Rollback is that PessimisticRollback uses MvccTxn::pessimistic_rollback here.

And the only job of MvccTxn::pessimistic_rollback is to remove the lock the transaction put on the key.
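
In other words, something like this sketch (simplified; the real function also checks the lock's for_update_ts):

// Simplified sketch of MvccTxn::pessimistic_rollback for one key.
fn pessimistic_rollback(&mut self, key: Key) -> Result<Option<ReleasedLock>> {
    if let Some(lock) = self.reader.load_lock(&key)? {
        // Only remove our own pessimistic lock; leave anything else as is.
        if lock.lock_type == LockType::Pessimistic && lock.ts == self.start_ts {
            return Ok(self.unlock_key(key, true));
        }
    }
    Ok(None)
}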

Summary

This article gives a brief introduction to how transactions are handled in TiKV, and contains links which show where the code corresponding to each action is.

This is just a high-level and brief introduction; we did not dive very deep into several parts of the code base, e.g. the MVCC part and the scheduler. But I hope this article can give you a basic view of TiKV's transaction handling system and help you get familiar with some of our code base.