一、什么是分布式鎖
不同的進程需要以互斥的方式來訪問共享資源,這里實現互斥就是分布式鎖。
簡單來說就是:同一時間只有一個客戶端對共享資源操作。舉個實際例子,搶購茅臺,如果不加鎖就會發(fā)生超賣的事故。
二、實現分布式鎖需要注意的點
- 互斥性:在任何時刻,只有一個客戶端獲得鎖。
- 無死鎖:任何時候都能獲取鎖,即使客戶端崩潰或者或被分區(qū)。
- 正確性:解鈴還須系鈴人,客戶端 A 加的鎖只能由客戶端 A 解鎖,其他客戶端不能解鎖。
- 容錯:只要大部分 redis 節(jié)點處于運行狀態(tài),客戶端就能夠獲取和釋放鎖。
三、Redis 分布式鎖原理
Redis 加鎖主要是使用 set (
https://redis.io/commands/set ) 命令操作:
SET key value [EX seconds|PX milliseconds|KEEPTTL][NX|XX] [GET]
- EX -- 設置指定的過期時間,以秒為單位。
- PX -- 設置指定的過期時間,以毫秒為單位。
- NX -- 僅當該鍵不存在的時才會設置該鍵。
- XX -- 僅當該鍵存在時才會設置該鍵。
加鎖命令: SET lock_key lock_value PX 10000 NX
只有當 lock_key 不存在時才會設置 lock_key 和 lock_value,超時時間 10000 毫秒,設置成功返回 OK:
當 lock_key 存在時返回 nil:
Redis 釋放鎖使用命令: DEL key (
https://redis.io/commands/del )
解鎖命令: DEL lock_key 。
Redis 在 2.6.12 之后的版本才加入 [EX seconds|PX milliseconds] [NX|XX] 這些參數
在此之前使用 SETNX (
https://redis.io/commands/setnx ) SETNX 是 “ SET if N ot e X ists” 的縮寫。
SETNX 返回 1 說明設置成功, 返回 0 說明設置失敗。
SETNX 和 EXPIRE 操作之間不是原子性的,如果 SETNX 執(zhí)行成功之后, 沒有執(zhí)行 EXPIRE 命令,就可能會發(fā)生死鎖。
Redis 官網聲明 SETNX 在將來的版本中可能會被棄用,因為 SETNX 實現的功能 set 都能實現。
四、Redis 實現分布式鎖注意的點及解決方案
- 防死鎖
設置鎖和設置鎖的超時時間要保持原子性,這點很容易做到 使用 SET lock_key lock_value PX 10000 NX 命令即可, 不要使用 SETNX lock_key lock_value , EXPIRE lock_key 10 這些命令,因為他們之間不是原子性的,有發(fā)生死鎖的風險。
- 合理設置鎖超時時間
鎖的超時時間要大于程序執(zhí)行的時間,否則多個客戶端可能同時獲取鎖。充分預估使用鎖的業(yè)務代碼執(zhí)行時間,該時間不宜過長也不宜過短,過短,可能使鎖發(fā)生錯誤;過長,客戶端異常時可能會影響執(zhí)行效率。
- 釋放鎖要及時
客戶端使用完共享資源之后要及時的釋放鎖,即使在程序發(fā)生異常,JAVA 中一般都是在 finally 里釋放鎖。
- 只能釋放自己加的鎖
在釋放鎖的時要確保這個鎖是自己的,不能將其他鎖釋放掉,這樣可能導致多個客戶端同時獲取鎖??梢酝ㄟ^判斷 lock_value 的值是否相等來判斷是否是自己加的鎖,lock_value 的值可以使用 UUID 或者任意確定唯一的值。
- 釋放鎖要保證原子性
客戶端在釋放鎖時分兩個步驟,一要比較鎖的值是否相等,二要刪除鎖( DEL key ),這兩個步驟要保證原子性,否則的話可能導致將其他鎖釋放掉,畫個圖解釋下:
- 客戶端 A 設置 lock_order 鎖成功,鎖值為 123uD,超時間為 10000ms。
- 客戶端 A 業(yè)務代碼執(zhí)行完成,釋放鎖前需要獲取 lock_order 鎖的值。
- 客戶端 A 判斷鎖值是否是 123uD,執(zhí)行緩慢。
- 客戶端 A 的鎖超時時間已到,Redis 自動移除了鎖。
- 此時客戶端 B 設置鎖,lock_order 鎖不存在,所以加鎖成功。
- 客戶端 A 判斷鎖值相等,執(zhí)行 del 釋放鎖,此時客戶端 A 釋放的鎖是客戶端 B 的而不是自己的,鎖出現錯誤。
這也好解決,Redis 提供了 EVAL (
https://redis.io/commands/eval ) 命令去解析 Lua 腳本,可以發(fā)一段 Lua 腳本給 Redis 執(zhí)行:
if redis.call("get",KEYS[1]) == ARGV[1] -- 判斷鎖的值是否相等。 KEYS[1], ARGV[1],是指傳入的參數,以上面為例,KEYS[1] 指的是 lock_order,ARGV[1] 指的是 123uD,
then
return redis.call("del",KEYS[1]) -- 刪除這個 key,返回刪除 key 的個數
else
return 0 -- 鎖值不相等返回 0
end
復制代碼
這樣就可以保證原子執(zhí)行了。
五、基于Set命令實現 Redis 分布式鎖
基于 Redisson 客戶端實現 Redis 分布式鎖:
/**
* 加鎖利用 set(key, value, "PX", "NX") 函數實現
* 解鎖利用 Lua 腳本實現
* <p>
* Created by jie.li on 2021/1/4 7:50 下午
*/
@Component
public class RedisLock1 {
@Resource
private RedissonClient redissonClient;
/**
* 嘗試加鎖
*
* @param name lock name
* @param value lock value
* @return true 加鎖成功, false 加鎖失敗
*/
public boolean tryLock(String name, String value) {
RBucket<Object> bucket = redissonClient.getBucket(name);
// 執(zhí)行的是 set(key, value, "PX", "NX") 命令
return bucket.trySet(value, 10000, TimeUnit.MILLISECONDS);
}
/**
* 解鎖
*
* @param name lock name
* @param value lock value
*/
public void unLock(String name, String value) {
redissonClient.getScript().eval(RScript.Mode.READ_WRITE, DEL_LOCK_SCRIPT, RScript.ReturnType.INTEGER, Collections.singletonList(name), value);
}
// 解鎖腳本
private static final String DEL_LOCK_SCRIPT =
"if redis.call("get",KEYS[1]) == ARGV[1] then" + // 如果 KEYS[1] 對應的 Value 值等于 ARGV[1]
" return redis.call("del",KEYS[1])" + // 刪除 KEYS[1]
" else" + // 否則
" return 0" + // 返回 0
" end;";
}
復制代碼
測試代碼:
/**
* 測試手動加鎖解鎖
* <p>
* Created by jie.li on 2021/1/7 2:54 下午
*/
@Service
public class RedisLockTestService {
@Resource
private RedisLock1 redisLock1;
private int i = 50;
/**
* 測試手動實現 redis 分布式鎖
*
* @return int
*/
public int biz() {
String lockName = "redis:lock:1";
String lockValue = UUID.randomUUID().toString();
try {
boolean b = redisLock1.tryLock(lockName, lockValue);
if (b) {
if (i > 0) {
i--;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
redisLock1.unLock(lockName, lockValue);
}
return i;
}
}
復制代碼
@Test
public void testBiz() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(200);
for (int i = 0; i < 200; i++) {
new Thread(() -> {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
int i1 = redisLockTestService.biz();
System.out.println(Thread.currentThread().getName() + " -> " + i1);
}, "Thread" + i).start();
countDownLatch.countDown();
}
TimeUnit.SECONDS.sleep(5);
}
復制代碼
六、Redisson 實現分布式鎖
1. Redisson 實現鎖簡介
Redisson 實現的分布式鎖相對于我們自己實現的鎖更加完善,主要有以下兩點:
1、可重入
2、鎖重試
3、鎖自動延期(看門狗機制)
Redisson 鎖的依賴圖:
Redisson 實現了很多種類型的鎖,所有的鎖都實現了 JUC 中的 Lock 接口,并且做了擴展( RLock ), 所以使用方法和使用 ReentrantLock 差不多。這里我們只針對 RedissonLock 進行講解。
2. Redisson 源碼解析
嘗試加鎖
// waitTime 等待獲取鎖的時間
// leaseTime 鎖的有效期
// unit 使用的時間單位
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 1、嘗試加鎖,如果當前有鎖,返回鎖的剩余時間ttl,否則返回空
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
// 2、加鎖成功,返回true
if (ttl == null) {
return true;
}
// 剩余的等待時間 waitTime
time -= System.currentTimeMillis() - current;
// 剩余等待時間已過
if (time <= 0) {
// 獲取鎖失敗
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
// 3、訂閱鎖釋放事件。利用semaphore(信號量),訂閱(Redis 發(fā)布訂閱)鎖的釋放事件,
// 鎖釋放后立即通知等待的線程競爭獲取鎖。
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 4、線程阻塞
// - 返回 false: 阻塞時間已經超過了剩余等待時間(waitTime),取消訂閱事件,加鎖失敗
// - 返回 ture: 繼續(xù)嘗試加鎖
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
// 剩余等待時間已過,加鎖失敗
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 5、繼續(xù)以同樣的方式獲加鎖,如果過了最大的等待加鎖時間,則加鎖失敗,返回false
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
// 6、通過信號量(共享鎖)阻塞,等待釋放鎖消息
// 鎖剩余時間小于剩余的waitTime時間
if (ttl >= 0 && ttl < time) {
// 非阻塞的獲取結果,獲得信號量,在給定的時間內從信號量獲取一個許可。
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 7、剩余的waitTime
time -= System.currentTimeMillis() - currentTime;
// 加鎖最大等待時間已過,加鎖失敗,返回false
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 取消訂閱事件
unsubscribe(subscribeFuture, threadId);
}
}
復制代碼
tryLock 方法參數說明:
嘗試加鎖方法 tryLock ,兩個重要的入參 waitTime、leaseTime:
- waitTime: 嘗試加鎖的最大時間,如果在這個時間內一直沒有加鎖成功,則返回 false。
- leaseTime: 鎖的有效期,如果客戶端(進程)在這個時間內沒有釋放鎖,則 Redis 主動釋放,當然 Redisson 看門狗的機制會將這個時間延期,后面會說到。
流程總結:
- 嘗試加鎖 tryAcquire ,如果加鎖成功則返回 null, 如果鎖被占用,則返回鎖的剩余時間 ttl。
- 如果加鎖成功返回 true,否在判斷 waitTime 是否過期,過期則加鎖失敗返回 false。
- 基于信號量,通過 Redis 的發(fā)布訂閱,訂閱鎖的釋放事件,一旦鎖釋放會立即通知等待的線程去競爭鎖。
- 線程阻塞剩余 waitTime 時間,來等待鎖釋放的通知,如果阻塞時間超過了剩余 waitTime 時間,則取消任務,取消任務成功再取消訂閱信息,加鎖失敗返回 false;否則在剩余 waitTime 時間內等到了鎖釋放通知,則進入循環(huán)加鎖階段。
- 循環(huán)中繼續(xù)以同樣的方式加鎖,如果在剩余 waitTime 內加鎖成功返回 true,否在加鎖失敗返回 false。
- 如果在剩余 waitTime 時間內,鎖還是被其他的客戶端(進程)持有,阻塞指定時間(持有鎖的剩余過期時間和剩余 waitTime 時間)等待鎖的釋放消息。
- 具體實現:利用信號量(semaphore)阻塞當前線程獲取許可,如果有可用許可則繼續(xù)嘗試加鎖,如果沒有可用許可則阻塞給定的時間,直至其他線程釋放鎖,調用 release() 方法增加許可,或者其它某些線程中斷當前線程,或者已超出指定的等待時間。
- 如果剩余 waitTime 過期,加鎖失敗返回 false。
加鎖
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + // 1、如果 Redis 中不存在這個 key
"redis.call('hset', KEYS[1], ARGV[2], 1); " + // 2、設置 key 和 field, 并將 value 的值設置為 1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 3、設置 key 的過期時間
"return nil; " + // 4、返回 null
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 5、如果 Redis 中存在對應的 key 和 field
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 6、則將對應的 key 和 field 對應的 value 自增 1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 7、設置 key 的過期時間
"return nil; " + // 8、返回 null
"end; " +
"return redis.call('pttl', KEYS[1]);", // 9、返回剩余生存時間, 單位毫秒
// 以下這三個參數分別對應 Lua 腳本中的 KEYS[1], ARGV[1], ARGV[2]
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
復制代碼
Redisson 中實際加鎖的代碼,流程總結:
- 如果 Redis 中不存在 key。
- 則使用 hset 這個命令設置 key 和 field,并將 hash 的 value 設置為 1,這里使用 Redis 中的 hash 數據結構, value 的值用于支持可重入鎖,記錄加鎖次數。
- 設置 key 的過期時間。
- 加鎖成功返回 null。
- 如果 Redis 中存在對應的 key 和 field。
- 將對應的 key 和 field 對應的 value 值自增 1,記錄重入鎖的次數。
- 設置 key 的過期時間。
- 加鎖成功返回 null。
- 加鎖失敗,返回 key 的剩余生存時間(單位毫秒)。
鎖自動續(xù)期(Watch Dog 機制)
在不指定鎖超時時間(leaseTime)的情況下,Redisson 分布式鎖會自動給鎖續(xù)期,也就是所謂的看門狗機制。
鎖自動續(xù)期代碼解析:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 如果指定了鎖的有效期,則直接返回加鎖結果,不會走后面的 Watch Dog 機制
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 實際加鎖
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 加鎖執(zhí)行完成后
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 加鎖執(zhí)行有異常,直接返回
if (e != null) {
return;
}
// lock acquired
// 獲取到鎖
if (ttlRemaining == null) {
// 自動續(xù)期(watch dog)
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
復制代碼
private void scheduleExpirationRenewal(long threadId) {
// ExpirationEntry 維護鎖的線程重入計數器和續(xù)期任務
ExpirationEntry entry = new ExpirationEntry();
// 將 entry 放入 ConcurrentHashMap
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 鎖重入,當前線程計數器+1
oldEntry.addThreadId(threadId);
} else {
// 第一次,當前線程計數器+1
entry.addThreadId(threadId);
// 第一次觸發(fā)鎖續(xù)期
renewExpiration();
}
}
復制代碼
private void renewExpiration() {
// 在 ConcurrentHashMap 中拿到 ExpirationEntry 對象
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 新建一個定時任務,自動續(xù)期的主要實現
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 獲取第一個線程Id
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 異步續(xù)期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
// 續(xù)期異常,打印錯誤日志,并且清除Map,不再執(zhí)行續(xù)期。
log.error("Can't update lock " + getName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 續(xù)期成功后,遞歸調用,繼續(xù)調用達到持續(xù)續(xù)期目的
if (res) {
// reschedule itself
renewExpiration();
}
});
}
// 延遲執(zhí)行時間為 internalLockLeaseTime / 3,internalLockLeaseTime 默認時間是 30s,也可以自定義指定。
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
復制代碼
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 如果存在指定的 key 和 filed
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 續(xù)期
"return 1; " + // 返回續(xù)期成功
"end; " +
"return 0;", // 返回續(xù)期失敗
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
復制代碼
// 看門狗超時時間默為 30s, 自定義的話可以修改 lockWatchdogTimeout 配置
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
private long lockWatchdogTimeout = 30 * 1000;
復制代碼
鎖自動續(xù)期總結:
- 在沒有指定鎖超時時間(leaseTime)的情況下,加鎖成功后就會執(zhí)行自動續(xù)期。
- 如果當前線程持有的是重入鎖,則對鎖重入次數+1,如果是首次加鎖,除了鎖次數+1 還需要執(zhí)行鎖續(xù)期。這里需要清楚是只有首次加鎖才會續(xù)期,重入鎖不會執(zhí)行續(xù)期操作。將鎖對應的線程 Id 及重入次數放入對象 ExpirationEntry 中, ExpirationEntry 對像使用 LinkedHashMap 維護了鎖的線程 Id 和重入計數器。然后將 ExpirationEntry 對象放 EXPIRATION_RENEWAL_MAP (ConcurrentHashMap), EXPIRATION_RENEWAL_MAP 中存放著所有需要續(xù)期的鎖。
- 新建一個延遲任務,10s(默認)之后執(zhí)行,在 EXPIRATION_RENEWAL_MAP 中取出 ExpirationEntry 對象,拿到第一個線程 Id,然后執(zhí)行 Lua 腳本,檢查線程 Id 對應的 key 和 filed 是否存在(鎖),如果存在則重置鎖的超時時間為 30s(默認),如果不存在則說明已經解鎖了不需要續(xù)期。
- 續(xù)期成功后,繼續(xù)遞歸調用步驟 3,保證持續(xù)鎖續(xù)期,續(xù)期失敗則說明鎖已經不存在了,停止續(xù)期。
當服務宕機時,看門狗的線程也就不存在了,此時也就不會對鎖進行自動續(xù)期,到了 30s 鎖就會自動過期,其他線程就可以獲取鎖了,不會造成死鎖。
解鎖
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 如果鎖不存在
"return nil;" + // 解鎖失敗
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 否則將鎖的重入計數器-1
"if (counter > 0) then " + // 如果重入計數器>0
"redis.call('pexpire', KEYS[1], ARGV[2]); " + // 將鎖續(xù)期 30s
"return 0; " + // 返回成功
"else " +
"redis.call('del', KEYS[1]); " + // 否則刪除鎖
"redis.call('publish', KEYS[2], ARGV[1]); " + // 發(fā)布解鎖消息
"return 1; " + // 返回解鎖成功
"end; " +
"return nil;", // 解鎖失敗
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
復制代碼
流程總結:
- 判斷鎖是否存在,如果鎖不存在直接返回 null。
- 重入計數器減一(因為支持重入鎖的緣故,這里不能直接將鎖刪除)。
- 如果重入計數器還是大于零,說明線程還是持有鎖的,將鎖續(xù)期 30s,返回成功。
- 否則刪除鎖,并且發(fā)送刪除鎖的消息(channelName:redisson_lock__channel:{鎖 key 值}),以通知阻塞隊列中的線程嘗試加鎖。
- 返回解鎖成功。
總結
- RedissonLock 實現了鎖等待(waitTime),鎖重入,鎖自動續(xù)期等復雜功能。
- RedissonLock 實現的分布式鎖使用的是 Hash 數據結構,其中 Hash key 是我們指定鎖的 key 值, filed 是 UUID:threaId,value 是重入鎖次數。其中 UUID 是 Redisson 客戶端連接管理器實例初始化生成的 UUID。使用 Hash 數據結構,是實現鎖重入的關鍵。
- RedissonLock 加鎖,解鎖,看門狗都是用了 Lua 腳本,保證命令執(zhí)行的原子性。
- RedissonLock 實現鎖等待時間(waitTime)不是使用的 while(true) 手段,而是使用的 Redis 發(fā)布訂閱,semaphore(信號量)實現的,解決了無效鎖申請造成的系統(tǒng)資源浪費問題。
- 具體實現是使用 semaphore 進行帶期限的阻塞線程,當鎖釋放時會發(fā)布鎖釋放的消息,收到解鎖消息后調用 release() 方法,此時被 semaphore 阻塞的等待隊列中的一個線程就可以嘗試獲取鎖了,如果在指定期限內未獲得鎖,則獲取鎖失敗。
- 只有未設置鎖超時時間(leaseTime),才能使用 Redisson 看門狗機制。
七、Redis 高可用架構下的分布式鎖問題
上面講 Redisson 實現的分布式鎖,在單機模式下已經趨近完美了。
但是單點的話故障的話,那就芭比 Q 了,所以我們第一點想到的是部署高可用集群。
目前 Redis 高可用架構主要有主從模式,哨兵模式,集群模式,在這三種模式下使用 Redis 分布式鎖存在一個弊端,可能會導致多個客戶端同時加鎖成功。
客戶端 A 加鎖成功,由于 Reids 主從同步數據是異步執(zhí)行的,LockA 鎖還沒來的及同步到 Slave,此時 Master 節(jié)點宕機了。
Slave 節(jié)點提升為 Master,客戶端 B 來加鎖,發(fā)現沒有其他客戶端占用鎖,LockB 加鎖成功。
這時就導致了兩個客戶端同時獲取了鎖。
所以,如果使用 Redis 分布式鎖,應盡量避免主從、哨兵或集群模式。
八、紅鎖(Redlock)
1. Redlock 概念
RedLock 是 Redis 作者提出的一個算法。
Redlock 官網介紹
在該算法的分布式版本中,我們假設有 N 個 Redis masters。這些節(jié)點是完全獨立的,所以我們不使用復制或任何其他隱式協(xié)調系統(tǒng)。我們已經描述了如何在單個實例中安全地獲取和釋放鎖。我們想當然地認為,算法將使用這種方法在單個實例中獲取和釋放鎖。在我們的示例中,我們設置了 N=5,這是一個合理的值,因此我們需要在不同的計算機或虛擬機上運行 5 個 Redis 主機,以確保它們以基本獨立的方式失敗。
為了獲取鎖,客戶端執(zhí)行以下操作:
- 以毫秒為單位獲取當前時間。
- 使用相同的 key 和隨機值在所有 Redis 實例中順序獲取鎖。當在每個實例中獲取鎖時,客戶端使用一個超時,該超時與鎖自動釋放的總時間相比很小,以便獲取它。例如,如果自動釋放時間為 10 秒,則超時時間可能在 5-50 毫秒范圍內。這可以防止客戶端在嘗試與已關閉的 Redis 節(jié)點通話時長時間處于阻塞狀態(tài):如果某個實例不可用,我們應該盡快嘗試與下一個實例通話。
- 客戶端通過從當前時間中減去在步驟 1 中獲得的時間戳來計算獲取鎖所用的時間。當且僅當客戶端能夠在大多數實例(至少 3 個)中獲取鎖,并且獲取鎖所用的總時間小于鎖有效時間,則認為已獲取鎖。
- 如果獲得了鎖,其有效時間將被視為初始有效時間減去經過的時間,如步驟 3 中計算的。
- 如果客戶端由于某種原因(無法鎖定 N/2+1 實例或有效期為負)未能獲取鎖,它將嘗試解鎖所有實例(即使是它認為無法鎖定的實例)。
Redis 作者對紅鎖的介紹非常詳細,點擊這里查看。
簡單總結下:
假設有五個 Redis 實例,這些實例之間是完全獨立的,并且部署在不同的計算機上,客戶端嘗試在這幾個實例中獲取鎖。
如果客戶端能夠在大多數實例(N/2+1,至少三個)中獲取鎖,并且獲取鎖所有的總時間小于鎖有效時間,則認為獲取鎖成功。
如果加鎖成功,鎖的有效期=初始有效時間-獲取鎖的總時間,假如鎖有效期為 10s,獲取鎖共花了 2s,那么鎖的有效期還剩 8s。
無論客戶端獲取鎖成功還是失敗,都需要解鎖所有 Redis 實例,以免發(fā)生死鎖。
使用多個完全獨立的 Redis 實例,解決了 Redis 主從異步復制造成的鎖丟失問題,同時保障了高可用。
至少 N/2+1 個實例加鎖成功,保證鎖的互斥性,防止多個客戶端同時獲取到鎖。
2. Redlock 存在問題
表面上看 RedLock 解決 Redis 分布式鎖的痛點,但是真的就萬無一失了嗎?
有人就提出了質疑,Martin Kleppmann: How to do distributed locking
Martin Kleppmann 在效率和正確性方面質疑了紅鎖,他認為如果是為了效率使用分布式鎖,沒有必要承擔 Redlock 的成本和復雜性,最好還是使用一個 Reids 實例或者主從模式。正確性方面,他認為 Redlock 也絕對保證不了鎖的正確性,文章在網絡延遲,過程暫停(GC),時鐘漂移方面給出了論證。
Redis 作者(Salvatore)也反駁了該質疑:Is Redlock safe?
建議大家讀下上面兩篇文章。
我個人認為使用 Redlock 要慎重,首先,它的效率比較差,在一些 RT 要求比較高的接口中增加了耗時風險;其次,無法保證絕對的正確性,可能會出現多個客戶端同時獲取鎖的風險(Martin Kleppmann 在他的文章里有舉證);再次,成本和復雜性較高。
3. Redisson紅鎖使用
使用示例:
// 在不同Redis實例上獲取 RLock
RLock rLock1 = redisson1.getLock(key);
RLock rLock2 = redisson2.getLock(key);
RLock rLock3 = redisson3.getLock(key);
// 初始化紅鎖
RedissonRedLock redissonRedLock = new RedissonRedLock(rLock1, rLock2, rLock3);
// 加鎖
redissonRedLock.lock();
// 業(yè)務邏輯
// 解鎖
redissonRedLock.unlock();
復制代碼
Redisson 在新版本中已經棄用了 RedissonRedLock,不建議使用。






