Rocksdb事務隔離性指的是多線程并發事務使用時候,事務與事務之間的隔離性,通過加鎖機制來實現,本文重點剖析Read Committed隔離級別下,Rocksdb的加鎖機制。
1. Rocksdb事務相關類族
Rocksdb的事務相關的類圖如下圖所示。主要有兩個類族,Transaction和DB,默認采用PessimisticTransaction,而PessimisticTransaction內部的加鎖機制是通過TransactionLockMgr來實現的。
TransactionLockMgr內部維護了LockMap。TransactionLockMgr根據每個記錄的Key計算hash值,再對num_stripes取模,在LockMap中的向量std::vector<LockMapStripe>定位LockMapStripe,這樣減少實體鎖的競爭激烈程度,相當于鎖分解。
LockMap的數據成員如下
size_t num_stripes : LockMapStripe個數,默認16個
std::vector<LockMapStripe> : LockMapStripe數組
LockMapStripe的數據成員如下
std::shared_ptr<TransactionDBMutex> stripe_mutex : 實體鎖
std::shared_ptr<TransactionDBCondVar> stripe_cv : 實體條件變量
std::unordered_map<std::string, LockInfo> keys : 具有相同Key hash值的每條記錄的加鎖信息,std::string為記錄的Key值。
LockInfo的數據成員如下
bool exclusive : 排它鎖,還是共享鎖
uint64_t expiration_time : 鎖的過期時間
autovector<TransactionID> txn_ids : 持有這把鎖的事務ID列表
2. Rocksdb事務流程分析
上述流程,是應用創建TransactionDB,然后Put一條記錄,再Commit的協作流程圖,在Put階段調用TransactionLockMgr的TryLock方法,Commit階段調用TransactionLockMgr的UnLock方法。
TransactionLockMgr::TryLock內部的主要邏輯在AcquireLocked函數中,TransactionLockMgr::UnLock內部的主要邏輯在UnLockKey函數中,下面具體分析這兩個函數。綠色部分字體為個人注解。
AcquireLocked
// Tries to record the lock described by `txn_lock_info` on `key` inside
// `stripe`.
//
// Outcomes, as implemented below:
//   * key not present in the stripe: the entry is inserted, unless a positive
//     max_num_locks_ has been reached, in which case Busy(kLockLimit) is
//     returned.
//   * key present, both the existing entry and the request are shared: the
//     requesting transaction is appended to the holder list.
//   * key present, either side exclusive: granted only if the requester is
//     already the sole holder or IsLockExpired() reports the existing entry
//     as expired; otherwise TimedOut(kLockTimeout) is returned and `*txn_ids`
//     receives a copy of the current holder list.
//
// `expire_time` is forwarded to IsLockExpired() unchanged.
Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
                                         LockMapStripe* stripe,
                                         const std::string& key, Env* env,
                                         LockInfo&& txn_lock_info,
                                         uint64_t* expire_time,
                                         autovector<TransactionID>* txn_ids) {
  const auto held = stripe->keys.find(key);

  // Case 1: no transaction has an entry for this key yet.
  if (held == stripe->keys.end()) {
    const bool at_limit =
        max_num_locks_ > 0 &&
        lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_;
    if (at_limit) {
      return Status::Busy(Status::SubCode::kLockLimit);
    }

    stripe->keys.emplace(key, std::move(txn_lock_info));
    // The counter is only maintained when a lock limit is configured.
    if (max_num_locks_) {
      lock_map->lock_cnt++;
    }
    return Status();
  }

  LockInfo& current = held->second;
  const TransactionID requester = txn_lock_info.txn_ids[0];

  // Case 2: shared request against a shared entry -- join the holder list.
  if (!current.exclusive && !txn_lock_info.exclusive) {
    current.txn_ids.push_back(requester);
    // The stored expiration only ever moves forward for a shared entry.
    current.expiration_time =
        std::max(current.expiration_time, txn_lock_info.expiration_time);
    return Status();
  }

  // Case 3: at least one side is exclusive.
  const bool sole_holder_is_requester =
      current.txn_ids.size() == 1 && current.txn_ids[0] == requester;
  if (sole_holder_is_requester) {
    // The requester already owns the entry alone; adopt the new mode/expiry.
    current.exclusive = txn_lock_info.exclusive;
    current.expiration_time = txn_lock_info.expiration_time;
    return Status();
  }

  if (!IsLockExpired(requester, current, env, expire_time)) {
    // Still validly held by others: report who holds it.
    *txn_ids = current.txn_ids;
    return Status::TimedOut(Status::SubCode::kLockTimeout);
  }

  // The existing entry has expired: take it over in place.
  current.txn_ids = txn_lock_info.txn_ids;
  current.exclusive = txn_lock_info.exclusive;
  current.expiration_time = txn_lock_info.expiration_time;
  return Status();
}
UnLockKey邏輯相對簡單一些,主要是刪除加鎖的記錄,并且喚醒被阻塞的事務。
// Removes `txn` from the holders of `key` in `stripe`.
//
// If `txn` is the only holder, the key's entry is erased from the stripe;
// otherwise `txn` is removed from the holder list (swap-with-last + pop) and
// the entry stays. If the key has an entry but `txn` is not among its
// holders, nothing is changed.
//
// lock_map->lock_cnt is decremented only when the key's entry is erased.
// AcquireLocked increments it only when a new entry is emplaced (not when a
// further shared holder is appended), so decrementing on every holder removal
// would under-count and trip the assert below once a key has had more than
// one shared holder.
//
// NOTE(review): presumably the caller holds the stripe's mutex -- confirm
// against the call site.
void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn,
                                   const std::string& key,
                                   LockMapStripe* stripe, LockMap* lock_map,
                                   Env* env) {
  // `env` is only read inside assert(); avoid an unused-parameter warning
  // when asserts are compiled out.
  (void)env;

  TransactionID txn_id = txn->GetID();
  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    auto& txns = stripe_iter->second.txn_ids;
    auto txn_it = std::find(txns.begin(), txns.end(), txn_id);
    // Found the key we locked. Unlock it.
    if (txn_it != txns.end()) {
      if (txns.size() == 1) {
        // Last holder: drop the whole entry for this key.
        stripe->keys.erase(stripe_iter);

        if (max_num_locks_ > 0) {
          // Maintain lock count if there is a limit on the number of locks.
          assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
          lock_map->lock_cnt--;
        }
      } else {
        // Other holders remain: remove only this transaction. Order of the
        // holder list is not preserved.
        auto last_it = txns.end() - 1;
        if (txn_it != last_it) {
          *txn_it = *last_it;
        }
        txns.pop_back();
      }
    }
  } else {
    // This key has no lock entry at all. This should only happen if the
    // unlocking transaction has expired.
    assert(txn->GetExpirationTime() > 0 &&
           txn->GetExpirationTime() < env->NowMicros());
  }
}






