Redis 分布式锁的再研究
在分布式系统中,针对共享资源的互斥访问 (mutually exclusive access) 一直是很多业务系统需要解决的问题,而分布式锁常常作为一种通用的解决方案被提出来。互斥能力一般是由第三方中间件来提供,比如:Redis 、ZooKeeper 和 Etcd 等;当然 MySQL 也是可以的,我们可以新建一个专门的锁表 (tbl_lock),数据插入成功意味着抢占到了锁,而数据删除成功则意味着释放了锁,在数据没有删除的情况下,另一客户端试图抢占该锁 (即插入一条记录) 的话则会报主键重复的错误。
本文主要分为两个章节,第一章节重点分享 Redis 分布式锁在落地过程中值得大家关注的一些知识点,第二章节将解读 Redisson 的部分源码。
1 Redis 分布式锁的进阶之旅
1.1 key 的设计
key 要能够全局地标识共享资源的唯一性,一般多选择 bizKey 来充当。
1.2 锁的粒度
锁的粒度尽量精细一些。比如在一个长流程的业务逻辑中,只有扣减库存才涉及对共享资源的互斥访问,那就应该只针对扣减库存逻辑进行锁的抢占与释放。
1.3 锁的释放
锁一旦抢占成功,待业务逻辑执行完毕,必须要显式地释放掉,建议将释放锁的逻辑放在finally
代码块中;此外,锁只能由持有该锁的对象来释放,绝不允许出现“张三释放了李四持有的锁”这一现象。
大家觉得下面这段伪代码在锁的释放上有啥问题?
String rst = jedis.set(bizKey, randomIntValue, SetParams.setParams().nx().px(30));
if (!OK.equals(rst)) {
return;
}
try {
doBizAction();
} finally {
jedis.del(bizKey);
}
想象一下,如果 Client 1 抢占到了锁之后发生了 Full GC,整个 JVM 进程卡在那里不动了,业务逻辑当然是不会执行的,当 Full GC 执行完毕后锁已经过期而被 Redis 自动释放掉了;然后 Client 2 将会抢占到该锁,此时 Client 1 执行完业务逻辑之后会把 Client 2 持有的锁给释放掉。如下图所示。
针对这一问题,我们可以在抢占锁的时候将 requestId 作为 value,改进版伪代码如下所示。
String rst = jedis.set(bizKey, requestId, SetParams.setParams().nx().px(30));
if (!OK.equals(rst)) {
return;
}
try {
doBizAction();
} finally {
if(requestId.equals(jedis.get(bizKey))) {
jedis.del(bizKey);
}
}
很遗憾,这么改还是有问题,主要体现在 get()、equals() 和 del() 这三个操作不满足原子性。如果在执行 get() 操作之后发生了 Full GC,当 JVM 进程恢复后 Client 1 所持有的锁已经过期而被释放,此时 Client 2 成功抢占到了该锁,可此时 Client 1 所在线程会继续执行 equals() 和 del() 操作,也就是说 Client 1 还是释放掉了 Client 2 持有的锁。如下图所示。
显然,此时我们需要使用 lua 脚本来确保 get()、equals() 和 del() 这三个操作满足原子性即可。lua 脚本如下所示。
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
1.4 死锁
锁一定要有过期时间,万一持有锁的对象无法释放掉锁,那么该锁后续也就无法再次被持有了。
1.5 锁过期时间的设定
如果锁过期时间设定过短,那么在业务还未执行完毕的情况下,锁可能被别人抢占了;而过期时间设定过长,又会严重影响业务的吞吐量。比较好的方案是开启一个线程来不断续期,Redisson 就是这么干的,下一章节会详细介绍。
1.6 抢占锁失败之后的处理方式
抢占锁失败之后,一般有两种处理方式。1) 直接抛出异常从而让用户重试,存在用户体验不佳的问题;2) 通过自旋机制不断地重新抢占锁,该方案在高并发场景下并不可取,因为会导致 CPU 资源的浪费。笔者这里蹭个热点,来看看 ChatGPT 是如何回答该问题的。
1.7 异步复制
为了构建高可用的 Redis 服务,往往很少选择单节点或者单纯的 master-slave 部署架构,一般会选择哨兵 (Sentinel) 或 集群 (Cluster) 部署架构。在哨兵和集群架构中,一个 master 节点总会有一个或多个 slave 节点,毕竟为了高可用,数据冗余是必须的,而在 master 节点通过异步复制 (asynchronous replication) 机制将数据传递到若干 slave 节点过程中,由于没有 ZooKeeper 那种强一致性共识协议,这可能造成数据不一致的现象,也就是说分布式锁的互斥性在 Redis 中是无法做到百分之百可靠的!
如上图所示。Client 1 成功抢占到了锁;紧接着 master 节点挂点了,从而导致数据无法传递到 slave 节点;然后 salve 节点晋升为新的 master 节点;最终,Client 2 将会抢占到该锁。
为了解决这一问题,Redis 的设计者 antirez 提出了大名鼎鼎的 RedLock 方案。RedLock 方案的前提是需要 N 个 Redis master 节点,这些 master 节点之间是完全相互独立的,不存在任何异步复制操作!RedLock 的核心思想:依次向 N 个 master 节点发起抢占锁的请求,如果在至少N/2+1
个 master 节点中成功地抢占了锁,那么就认为最终锁抢占成功。
Martin Kleppmann 大神在其 How to do distributed locking 一文中,评价 RedLock 是一种不伦不类、完全建立在三种假设基础上的分布式锁方案。这三种假设如下。
- 进程暂停 (processe pausing) ,假设进程暂停时间远远小于锁的过期时间。
- 网络时延 (network delaying) ,假设网络时延远远小于锁的过期时间。
- 时钟漂移 (clock drift) ,假设锁在所有 Redis 节点中的存活时间与其过期时间是相匹配的。
Redis 是由 C 语言开发而来的,自然不存在进程暂停之说,其实进程暂停的对象指的是 Redis 客户端,比如一个 Spring Boot 应用。Martin Kleppmann 给出了一个由 Redis 客户端进程暂停造成数据不一致的典型场景,如下图所示。
针对上述问题,Martin Kleppmann 给出了一个名为 fencing token 的解决方案,如下图所示。
2 Redisson 部分源码解读
Redisson 应该是当前市面上最强大的一款 Redis 分布式锁解决方案了,使用起来比较省心,很多问题都替你考虑到了。
Config config = new Config();
config.setLockWatchdogTimeout(30000)
.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
RLock lock = redissonClient.getLock("my-lock");
boolean rst = lock.tryLock();
try {
if (rst) {
doBizAction();
}
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
redissonClient.shutdown();
RedissonSpinLock
和RedissonLock
是 Redisson 中两个极为常用的非公平、可重入锁。RedissonSpinLock 与 RedissonLock 最大的区别在于它没有将“发布-订阅”机制整合到锁的抢占与释放流程中,这应该是有意为之。因为在大规模 Redis Cluster 中,“发布-订阅”机制会产生广播风暴。具体地,Redis 的“发布-订阅”机制是按照 channel (通道) 来进行发布与订阅的,然后在 Redis Cluster 模式下,channel 不会参与基于 hash 值的 slot 分发,也就是说发布的消息将以广播的形式在集群中传播开来。那么问题是显而易见的,假设一个 Redis Cluster 中有 100 个分片主节点;用户在节点 1 发布消息,该节点就会把消息广播给其他 99 个节点;若在这 99 个节点中,只有零星几个节点订阅了该 channel,这势必会造成网络、CPU 等资源的浪费。幸运的是,Redis 7.0 终于支持Sharded Pub/Sub
特性了。
RedissonSpinLock 与 RedissonLock 均使用 Redis Hash 数据结构来承载锁的抢占与释放动作,lockName 用于标识一个 Hash 实例,Hash 中的 key 由UUID.toString():Thread.currentThread().getId()
拼接而成,value 值一般为 1 (如果发生了锁的重入,该值将会递增) 。
2.1 RedissonSpinLock
2.1.1 抢占锁
public class RedissonSpinLock extends RedissonBaseLock {
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// waitTime 对应抢占锁的超时时间
// 如果在该时间内依然未成功抢占锁,就直接返回 false
long time = unit.toMillis(waitTime);
// 标记抢占锁开始时间
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 试图抢占锁
// 抢占成功,返回 null;抢占失败,则返回锁剩余存活时间 (ttl)
Long ttl = tryAcquire(leaseTime, unit, threadId);
// ttl 值为 null,说明锁获取到了,直接返回 true
if (ttl == null) {
return true;
}
// 每一次抢占锁,都要标记开始与结束时间,方便判断抢占锁的耗时是否大于 waitTime
time -= System.currentTimeMillis() - current;
// time <= 0 意味着抢占锁的耗时大于 waitTime,直接返回 false,结束锁抢占
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 在 Spring Retry 中也有 BackOffPolicy,它是对两次重试、两次抢占锁之间的时间间隔的抽象
// ConstantBackOff 对应固定的间隔时间,而 ExponentialBackOffPolicy 对应指数级间隔时间
LockOptions.BackOffPolicy backOffPolicy = backOff.create();
// 通过 while 循环来自旋 (spin)
while (true) {
// 标记下一次抢占锁开始时间
current = System.currentTimeMillis();
Thread.sleep(backOffPolicy.getNextSleepPeriod());
// 再次抢占锁
ttl = tryAcquire(leaseTime, unit, threadId);
// ttl 值为 null,说明锁获取到了,直接返回 true
if (ttl == null) {
return true;
}
// 执行至此,说明抢占锁失败了。
// 计算本次抢占锁的耗时
time -= System.currentTimeMillis() - current;
// time <= 0 意味着抢占锁的耗时大于 waitTime,直接返回 false,结束锁抢占
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// leaseTime 即锁过期时间(存活时间)
// 如果 leaseTime > 0,就不涉及对 scheduleExpirationRenewal() 方法的调用
// 直白点说,就是不再进行自动续期了,这一点要切记。
if (leaseTime > 0) {
// tryLockInnerAsync 内部是通过 lua 脚本来实现锁的抢占与重入
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// tryLockInnerAsync 内部是通过 lua 脚本来实现锁的抢占与重入
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 重度使用 CompletableFuture,不废话了
ttlRemainingFuture.thenAccept(ttlRemaining -> {
// ttlRemaining == null,说明抢占锁成功了
if (ttlRemaining == null) {
// 开启锁自动续期功能
// 啥时候才能开启锁自动续期功能呢?
// 1)leaseTime <= 0,一般是 -1 (即开发人员没有显示地设置锁的过期时间)
// 2) 抢占锁成功了
scheduleExpirationRenewal(threadId);
}
});
// 返回 null 或者锁的存活时间(通过 ttl key 指令拿到的)
return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId));
}
}
抢占锁的 lua 脚本的注释如下。
-- 锁不存在,抢占
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
-- 成功抢占锁,返回 null
return nil;
end;
-- 锁已存在且由自己持有,重入
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
-- 重入锁依然返回 null
return nil;
end;
-- 锁已存在但由他人持有,获取锁剩余存活时间
return redis.call('pttl', KEYS[1]);
2.1.2 释放锁
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
// unlockInnerAsync() 逻辑在 RedissonSpinLock 内
// 其主要逻辑就是使用 lua 脚本来释放锁
RFuture<Boolean> future = unlockInnerAsync(threadId);
// future 代表释放锁的异步结果
CompletionStage<Void> f = future.handle((opStatus, e) -> {
// 不管释放锁是否成功,先取消锁的自动续期
cancelExpirationRenewal(threadId);
// e 不为 null,说明异常了
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
}
public class RedissonSpinLock extends RedissonBaseLock {
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"return 1; " +
"end; " +
"return nil;",
Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId));
}
}
释放锁的 lua 脚本的注释如下。
-- 锁不存在,啥也不做,直接返回 null
if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then
return nil;
end ;
-- 执行至此,说明锁已经被抢占了
local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1);
-- 如果 counter 此时依然大于 0,这说明发生锁重入了
if (counter > 0) then
-- 不能直接删除,而是续期
-- 从这一点,也能看出来 抢占锁 与 释放锁 应该是成对儿的一套动作
redis.call('pexpire', KEYS[1], ARGV[1]);
-- 返回 false
return 0;
else
-- 真正释放锁
redis.call('del', KEYS[1]);
-- 返回 true
return 1;
end;
return nil;
2.2 RedissonLock
相较于 RedissonSpinLock,RedissonLock 则要复杂很多。在正餐开始前,要先介绍一下 Redisson 类库中的AsyncSemaphore
,以帮助大家理解后续内容。还记得Semaphore
吗?它位于 JDK 类库 j.u.c 包下,主要用于实现 JVM 进程级的多线程限流,比如:限制某一业务只允许最多 10 个线程并发访问;同样,AsyncSemaphore 也是干这个事的,其 Async 前缀意味着其acquire()
方法是非阻塞的并可以返回一个CompletableFuture<Void>
实例。
public class AsyncSemaphoreApp {
public static void main(String[] args) throws InterruptedException {
AsyncSemaphore asyncSemaphore = new AsyncSemaphore(10);
CyclicBarrier cyclicBarrier = new CyclicBarrier(20);
CountDownLatch countDownLatch = new CountDownLatch(20);
ThreadPoolExecutor threadPoolExecutor = ···;
for (int i = 0; i < 20; i++) {
threadPoolExecutor.submit(
() -> {
try {
cyclicBarrier.await();
asyncSemaphore.acquire().thenAccept(future -> {
// 只有 10 个许可,那么最多允许 10 个线程并发执行 doSth() 逻辑
doSth();
asyncSemaphore.release();
});
} catch (InterruptedException | BrokenBarrierException e) {
// Ignore
} finally {
countDownLatch.countDown();
}
}
);
}
countDownLatch.await();
threadPoolExecutor.shutdown();
}
}
2.2.1 抢占锁
在 RedissonLock 中,关于抢占锁的重载方法有好几个,这里选取逻辑最复杂的tryLock(waitTime, leaseTime, timeUnit)
方法来分析。该方法整体外围逻辑如下所示。
public class RedissonLock extends RedissonBaseLock {
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// waitTime 对应抢占锁的超时时间
// 如果在该时间内依然未成功抢占锁,就直接返回 false
long time = unit.toMillis(waitTime);
// 标记抢占锁开始时间
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 试图抢占锁 | lua 脚本
// 抢占成功,返回 null;抢占失败,则返回锁剩余存活时间 (ttl)
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// ttl 值为 null,说明锁获取到了,直接返回 true
if (ttl == null) {
return true;
}
// 每一次抢占锁,都要标记开始与结束时间,方便判断抢占锁的耗时是否大于 waitTime
time -= System.currentTimeMillis() - current;
// time <= 0 意味着抢占锁的耗时大于或等于 waitTime,直接返回 false,结束锁抢占
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 再次标记抢占锁开始时间
current = System.currentTimeMillis();
// 订阅,channel 名称格式为:redisson_lock__channel:{lockName},lockName 对应 Redis Key
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
// CompletableFuture#get() 可以主动完成异步计算
// 通俗地说,这里是主动完成订阅任务
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// 如果订阅超时,则立即退出,抢占锁动作结束,返回 false
// 逻辑挺严谨的
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + time + "ms. " +
"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException e) {
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
// time <= 0 意味着抢占锁的耗时大于或等于 waitTime,直接返回 false,结束锁抢占
// 准确地说,尽管订阅成功,但此时 time <= 0,很抱歉,抢占锁还是失败,立即返回 false
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// while 循环,有点自旋的味儿了,但跟 RedissonSpinLock 还是不一样的
// 因为这里使用了发布订阅机制
while (true) {
// 再次标记抢占锁开始时间
long currentTime = System.currentTimeMillis();
// 再次试图抢占锁 | lua 脚本
// 抢占成功,返回 null;抢占失败,则返回锁剩余存活时间 (ttl)
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// time <= 0 意味着抢占锁的耗时大于或等于 waitTime,直接返回 false,结束锁的抢占
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 锁依然没有抢占到,此时需要等待持有锁的对象来释放了
// 一旦持有锁的对象释放掉了锁,会向 redisson_lock__channel:{lockName} 发布锁释放的消息
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 如果锁的剩余存活时间大于 0 且小于当前抢占锁消耗的时间,则阻塞 ttl 毫秒
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 如果锁的剩余存活时间大于当前抢占锁消耗的时间,则阻塞 time 毫秒
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// time <= 0 意味着抢占锁的耗时大于或等于 waitTime,直接返回 false,结束锁的抢占
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 在进入当前 try 代码块之前就已经进行了订阅操作
// 因此,无论锁抢占成功与否,只要是在当前 try 代码块 return 出去了
// 就一定需要取消订阅操作
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
}
}
在上述 tryLock(waitTime, leaseTime, timeUnit) 方法中,必须要对一些隐藏的细节进行剖析。
- A. tryAcquire(waitTime, leaseTime, timeUnit, threadId)
tryAcquire(waitTime, leaseTime, timeUnit, threadId) 方法内部又是委派 tryAcquireAsync(waitTime, leaseTime, timeUnit, threadId) 方法来干活,如下。
public class RedissonLock extends RedissonBaseLock {
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// tryLockInnerAsync() 抢占锁
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
// 如果 leaseTime > 0,这说明开发人员显式传入了 leaseTime
// leaseTime 默认等于 -1
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// leaseTime = -1,则重置 leaseTime,使其等于 internalLockLeaseTime(默认 30s)
// 其实 internalLockLeaseTime 指的就是 lockWatchdogTimeout(看门狗超时时间)
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// ttlRemainingFuture.get() 可能返回 null,也可能返回锁的剩余存活时间
// 如果返回了 null,这说明成功抢占到了锁
// lock acquired
if (ttlRemaining == null) {
// FBI Warning!重点来了!!!
if (leaseTime > 0) {
// 如果 leaseTime > 0,这说明开发人员显式传入了 leaseTime
// 那么此时就需要将基于 leaseTime 来重置 internalLockLeaseTime
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 如果 leaseTime = -1,这就需要主动开启锁的自动续期机制了,切记!!!、
// 的确应该如此,既然开发人员显式设定了 leaseTime,那就是不想要自动为锁续期啊
// 相当于强制声明锁的存活时间为 leaseTime
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
}
首先,tryLockInnerAsync() 方法是抢占锁的核心,主要是一段 lua 脚本。
-- 如果锁未被抢占或锁已由自己持有
-- 那么视为成功抢占了锁
if ((redis.call('exists', KEYS[1]) == 0) or
(redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
-- 返回 null
return nil;
end;
-- 抢占锁失败,直接返回锁的剩余存活时间
return redis.call('pttl', KEYS[1]);
然后,scheduleExpirationRenewal() 方法用于激活锁自动续期功能。
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
protected void scheduleExpirationRenewal(long threadId) {
// ExpirationEntry 对应锁自动续期任务
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
// 真正的续期逻辑就在这里
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// renewExpirationAsync() 内主要是一段 lua 脚本,核心是 pexpire 指令
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
// 续期异常,移除续期任务
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 续期成功,递归调用 renewExpiration()
if (res) {
// reschedule itself
renewExpiration();
} else {
// 续期失败,则取消续期
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
scheduleExpirationRenewal() 方法主要交待了两件事,分别是“如何续期”以及“续期间隔时间”。“如何续期”的内容在 renewExpirationAsync() 方法内,其核心内容是下面这段 lua 脚本:
-- 如果锁已经由自己持有,那才可以续期
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 续期就是通过 pexpire 指令实现的
-- pexpire 的时间单位是毫秒,而 expire 的时间单位是秒
redis.call('pexpire', KEYS[1], ARGV[1]);
-- 已续期,返回 true
return 1;
end;
-- 未续期,返回 false
return 0;
而关于“续期间隔时间”,则取决于internalLockLeaseTime/3
,internalLockLeaseTime
指的就是lockWatchdogTimeout
,默认值为 30 秒。
- B. subscribe(threadId)
subscribe(threadId) 方法最终是委派给PublishSubscribe
处理的额,在 PublishSubscribe 中的“发布-订阅”逻辑中,使用到了 AsyncSemaphore,但笔者一直没理解为什么要针对“发布-订阅”动作限流。于是就去问了作者,但他讲的都是废话,似乎有啥商业机密一样 (Redisson 还真的有商业版本)。
笔者猜测可能是频繁的“发布-订阅”操作会增加 Redis 服务器的负担,尤其是在 Redis Cluster 模式下。先不纠结这个限流的问题了,继续往下走。
一起看看订阅的核心逻辑吧。
abstract class PublishSubscribe<E extends PubSubEntry<E>> {
public CompletableFuture<E> subscribe(String entryName, String channelName) {
// 获取 AsyncSemaphore 实例,其有且只有只有一个许可(信号量)
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
CompletableFuture<E> newPromise = new CompletableFuture<>();
// semaphore.acquire() 返回一个 CompletableFuture 实例
semaphore.acquire().thenAccept(c -> {
if (newPromise.isDone()) {
semaphore.release();
return;
}
E entry = entries.get(entryName);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
return;
}
newPromise.complete(r);
});
return;
}
// RedissonLockEntry 对应订阅任务,构建一个 RedissonLockEntry 实例
E value = createEntry(newPromise);
value.acquire();
// 将该 RedissonLockEntry 实例放入 ConcurrentMap<String, E> 类型的成员变量 entries 中去
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
return;
}
newPromise.complete(r);
});
return;
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
// 发起订阅操作
CompletableFuture<PubSubConnectionEntry> s = service.subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, listener);
newPromise.whenComplete((r, e) -> {
if (e != null) {
s.completeExceptionally(e);
}
});
s.whenComplete((r, e) -> {
// 执行至此,订阅已经完成了
// 如果 e 不为 null,说明订阅失败了
if (e != null) {
// 从 ConcurrentMap<String, E> 类型的成员变量 entries 中移除该 RedissonLockEntry 实例
entries.remove(entryName);
// RedissonLockEntry 实例持有一个 CompletableFuture 实例,这里主动完成订阅任务,但以失败告终
value.getPromise().completeExceptionally(e);
return;
}
// 执行至此,说明订阅任务完成了而且是成功的
// RedissonLockEntry 实例持有一个 CompletableFuture 实例,这里主动完成订阅任务
value.getPromise().complete(value);
});
});
// RedissonLockEntry 实例构建的时候会将 newPromise 传进去
// 这里直接返回 newPromise,后续可以直接通过该 newPromise 获取 RedissonLockEntry 实例
return newPromise;
}
}
在上述订阅逻辑中,监听器是用来干嘛的呢?当然是接收到释放锁的消息后进行回调。在监听器中会调用到 LockPubSub 中的 onMessage() 方法,其主要内容如下。
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// RedissonLockEntry 中有一 Semaphore 类型、名为 latch 的成员变量
// 该 Semaphore 有且只有一个许可(信号量)
// 调用 Semaphore 的 release() 是为了唤醒另一阻塞的线程
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
// Ignore details
}
}
}
- C. commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS)
subscribeFuture
是 subscribe(threadId) 返回、代表异步订阅任务的执行结果。commandExecutor.getNow(subscribeFuture) 其实就是执行 CompletableFuture 的 get() 方法,用于获取 RedissonLockEntry 实例。剩下的就是通过 Semaphore 的 tryAcquire() 方法来阻塞一段时间了。如果在阻塞时间内,刚好有对象释放掉了锁,那么 LockPubSub 将会通过释放许可来唤醒阻塞的线程。
2.2.2 释放锁
释放锁包含两个逻辑,分别是通过 lua 脚本来释放锁和删除自动续期任务。其中释放锁的 lua 脚本如下。
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
参考文档
[1] RedLock: redis.io/docs/manual…
[2] How to do distributed locking: martin.kleppmann.com/2016/02/08/…
转载自:https://juejin.cn/post/7208112485763825722