【Redisson】可重入锁剖析(二)
一、前言
Redisson
是 Redis
最知名的一个开源客户端框架,学习其:官方文档
- 如何实现
Redis
分布式锁? - 各种分布式锁的原理
代码栗子如下:
// 尝试获取 myLock 这个key的锁
RLock lock = redisson.getLock("myLock");
// 常见加锁方式一:
// - 如果没有人加这把锁,那自己就可以加这把锁
// - 如果有人加这把锁了,那自己这时就会阻塞住
lock.lock();
// 常见加锁方式二:
// 尝试加锁,加锁成功后,10秒后自动释放锁
lock.lock(10, TimeUnit.SECONDS);
// 常见加锁方式三:
// 尝试加锁指定一个时间,最多等待 100秒:加锁成功返回 true,加锁失败返回 false
// 加锁成功后,10秒后自动释放锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
// 释放锁
lock.unlock();
}
}
Tips:
看门狗(watchdog
)
为避免获取锁的
Redisson
实例客户端崩溃,导致锁无法释放。 看门狗,它会在锁持有者Redisson
实例处于活动状态时延长锁的过期时间。默认锁定看门狗超时为 30 秒
(1)可重入锁源码剖析之可重入加锁
捋一遍加锁过程:
- 加锁:就是执行
lua
脚本,往里塞一个hashmap
# "dfd3aabb-82ce-4c54-966d-5719675a3d62:1",代表某客户端,1 是 threadId
# 1,代表自增次数
{
"dfd3aabb-82ce-4c54-966d-5719675a3d62:1": 1
}
- 会启动看门狗:默认每 10秒调度一次,是否续命生存时间 30秒
1)那么,客户端一个线程持有分布式锁 key1
,另一个线程又去持有,那会如何?
答案:表象上没有体现,
Redis
存储数据的值 +1。
举个栗子:嵌套调用
RLock lock = redisson.getLock("myLock");
public void test() {
lock.lock();
Thread.sleep(30000);
test1();
lock.unlock();
}
private void test1() {
lock.lock();
System.out.println("test1 加锁了");
Thread.sleep(30000);
lock.unlock();
}
再来看下源码中的 lua
脚本:就会了然
-- 参数说明:
-- KEYS[1] = myLock,key 锁名
-- ARGV[1] = 30000,锁过期时间 30秒
-- ARGV[2] = dfd3aabb-82ce-4c54-966d-5719675a3d62:1,UUID生成的,1 是 threadId
-- 如果 key 不存在
if (redis.call('exists', KEYS[1]) == 0) then
-- 设置值,"dfd3aabb-82ce-4c54-966d-5719675a3d62:1" : 1
redis.call('hset', KEYS[1], ARGV[2], 1);
-- 设置过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 如果 myLock dfd3aabb-82ce-4c54-966d-5719675a3d62:1 存在,且值为1
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 自增1,代表重入次数
redis.call('pexpire', KEYS[1], ARGV[1]); -- 设置过期时间
return nil;
end;
return redis.call('pttl', KEYS[1]);
上面的栗子运行结果如下:
172.18.1.23:7004> hgetall myLock
1) "63d68b35-2027-480f-98b8-950ffc94467b:1"
2) "1"
172.18.1.23:7004> hgetall myLock
1) "63d68b35-2027-480f-98b8-950ffc94467b:1"
2) "2"
172.18.1.23:7004> pttl myLock
(integer) 20405
172.18.1.23:7004> pttl myLock
(integer) 27949
172.18.1.23:7004> hgetall myLock
1) "63d68b35-2027-480f-98b8-950ffc94467b:1"
2) "2"
可以看到看门狗续命后,value
的值不会增加。
2)客户端A持有分布式锁 key1
,客户端B再去持有锁,那会如何?
客户端B 当然阻塞喽。
只有等客户端A 释放了锁,客户端B 才能去加锁。
(2)可重入锁源码剖析之锁互斥
互斥场景有哪些?
- 同个线程,多次
lock.lock()
: 不会互斥,因为重入 - 不同线程,进行
lock.lock()
: 会互斥,互相阻塞 - 不同客户端,进行
lock.lock()
:会互斥,互相阻塞
回顾下,分布式锁存储在 Redis
是什么:
# Redis 中如下
172.18.1.23:7004> hgetall myLock
1) "dfd3aabb-82ce-4c54-966d-5719675a3d62:1"
2) "1"
# 实际上就类似 map:
# "dfd3aabb-82ce-4c54-966d-5719675a3d62:1",代表某客户端,1 是 threadId
# 1,代表自增次数
{
"dfd3aabb-82ce-4c54-966d-5719675a3d62:1": 1
}
所以能不能重入锁,要看客户端和线程是否一致。
对应 lua
脚本:
-- 如果 myLock dfd3aabb-82ce-4c54-966d-5719675a3d62:1 存在,且值为1
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 自增1,代表重入次数
redis.call('pexpire', KEYS[1], ARGV[1]); -- 设置过期时间
return nil;
end;
-- 加锁没成功:
-- 直接返回key的剩余存活时间
return redis.call('pttl', KEYS[1]);
下面来剖析下源码:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly)
throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 如果第一次加锁,tt1 一定是 null
// 如果一个线程多次加锁,那么就会重入锁,ttl 也一定是 null
// 如果加锁没成功,锁被其他机器占用了,得到剩余生存时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
// 锁不成功,就会走这,阻塞
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
// 这里使用 Semaphore
// 等待一段时间,再次进入循环
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
getEntry(threadId).getLatch().acquire();
} else {
getEntry(threadId).getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
}
最后总结下,互斥流程:
(3)可重入锁源码剖析之释放锁
熟知前文,这里想想就可知,释放锁会将锁持有数减 1。
开始源码剖析之旅,从这里进去:
lock.unlock();
进入源码:
// RedissonLock.java
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
// 再往里走:
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
// 这里就会执行 lua 脚本,重要逻辑处:
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
if (e != null) {
cancelExpirationRenewal(threadId);
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause
= new IllegalMonitorStateException("attempt to unlock lock,"
+ "not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
cancelExpirationRenewal(threadId);
result.trySuccess(null);
});
return result;
}
把 lua
脚本提取出来:
-- key 和 field 不匹配, 说明当前客户端线程没有持有锁, 不能主动解锁。
-- 不是自己加的锁不能解锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- value -1, 可重入数 -1
-- 说明减少一次持有
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 如果counter > 0 说明锁还在持有, 不能删除key
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]); -- 那么就续命
return 0;
-- 删除 key 并且 publish 解锁消息
else
redis.call('del', KEYS[1]); -- 删除锁
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end; " +
return nil;
KEYS[1]
: 需要加锁的key
, 这里需要是字符串类型。KEYS[2]
:redis
消息的ChannelName
, 一个分布式锁对应唯一的一个channelName
:redisson_lockchannel{” + getName() + “}
ARGV[1]
:reids
消息体, 这里只需要一个字节的标记就可以, 主要标记redis
的key
已经解锁, 再结合redis
的Subscribe
, 能唤醒其他订阅解锁消息的客户端线程申请锁。ARGV[2]
: 锁的超时时间, 防止死锁ARGV[3]
: 锁的唯一标识, 也就是刚才介绍的id(UUID.randomUUID()) + “:” + threadId
总结下,执行流程图如下:
执行 lock.unlock()
, 就可以释放分布式锁:
- 每次都对
myLock
数据结构中的那个加锁次数减1。 - 如果发现加锁次数是0了, 说明这个客户端已经不再持有锁了, 此时就会用:
del myLock
命令, 从redis
里删除这个key
。 - 另外的客户端2 就可以尝试完成加锁了。
转载自:https://juejin.cn/post/7110218613608906788