(Redis篇)超细Redisson分布式Lock锁源码分析
前言
redisson是一款优秀的java版的Redis客户端,在越来越多的分布式场景下解决了许多并发安全问题,本文只刨析redisson分布式锁的源码实现。
分布式场景下普通锁的不足
在普通单体式场景下出现并发问题我们可能会选择 Synchronized 或者 ReentrantLock 进行加锁操作来保证原子性操作,当出现在分布式或者集群架构下,这种jvm进程级别的锁就无法给我们带来保证。
如图客户端在负载均衡机制下请求会分发到不同的集群节点上,park加锁操作只会阻塞单机
分布式解决方案
只需要搞一个中间缓冲地带就可以解决这种问题,那么我们就要利用中间件来解决了,例如数据库,zookeeper,redis 都可以来帮我们实现一把分布式锁。
redisson分布式锁使用
导入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
写入配置
package redisson;
import org.redisson.Redisson;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* DATE: 2022/4/13
* @author 码下客
*/
@Configuration
public class RedissonConfig {
@Bean
public Redisson redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
示例代码
package redisson;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* DATE: 2022/4/13
* @author: 码下客
*/
@Service
public class RedissonLockService {
@Autowired
private Redisson redisson;
public void setLock(){
String lockKey = "redisKey";
//获取锁对象
RLock redissonLock = redisson.getLock(lockKey);
redissonLock.lock();
try {
/**
* 有并发安全问题的业务逻辑
*/
} finally {
redissonLock.unlock();
}
}
}
redisson锁的使用还是很简单的 只需要在有并发安全问题的业务上加锁,然后防止出现异常产生死锁在finally中解锁。
redisson Lock源码分析
在分析源码之前我们先思考我们自己去实现一把redis分布式锁会有哪些问题?
一般而言我们常用的redis实现锁命令就是setnx,那么当使用setnx去打标记加锁之后如果系统突然挂了,这把锁是不是变成了死锁?加上超时时间的化,这个超时时间设置多久合适?设置太短,出现A业务还没跑完,锁自己释放了,这时候又来一个线程去执行B业务进行了加锁操作,同一时刻A业务执行完了去释放了B业务的锁,这时候B业务执行途中是没有锁的是不是又会发生线程安全问题?如果在加锁之前给每个线程搞一个uuid,然后释放锁的时候去判断uuid是否相同也无法完全避免刚刚的问题发生,因为释放锁和判断uuid不是一个原子操作。带着这些问题去redisson lock当中寻找答案。
Lock
进入到源码RedissonLock.java下
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
lock方法这是我们调用的加锁入口持续定位lockInterruptibly()->lockInterruptibly(-1, null);
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
这个方法必然是我们的核心方法,我们逐步去分析:
long threadId = Thread.currentThread().getId();
首先获取到了一个线程id,然后去调用了tryAcquire()方法,见名思意 尝试获得锁,那么下面看看tryAcquire()的业务逻辑
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
leaseTime != -1 的逻辑处理先不看,走我们的主干线,调用了一个tryLockInnerAsync()方法 异步去尝试加锁,首先看第一个参数 commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 指向
private long lockWatchdogTimeout = 30 * 1000;
含义是看门狗锁的过期时间,第二个参数 TimeUnit.MILLISECONDS 时间单位,第三个参数 threadId 线程id, 第四个参数 RedisCommands.EVAL_LONG 等等执行lua脚本要使用。接下来去详细看看这个方法。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
主要就是封装的这一大段lua脚本,我们来剖析这lua脚本之前先了解lua脚本的基本语义。如图上命令
redis.call('exists', KEYS[1]) == 0)
redis.call你可以看作是lua脚本下调用redis命令的一个方法,'exists'是具体的某个方法也就是对应的redis元素生命令,KEY[1]呢是从Collections.singletonList(getName())这个集合中取值,getName()是一开始创建锁的key值,那么我们就得出第一个命令(redis.call('exists', KEYS[1]) == 0) 的含义是判断这个key存不存在redis中。剩下的语义同理,ARGV[1]对应 internalLockLeaseTime 代表超时时间, ARGV[2]对应 getLockName(threadId) 含义是uuid拼接线程id后的字符串。那么第一个if分支的lua脚本含义就是判断key是否存在redis中,如果不存在我就设置这个key为hash并且参数为一个uuid+线程id,然后又将这个key设置了一个过期时间返回null, 接下来我们先走返回null 设置成功的代码逻辑。
再次回到tryAcquireOnceAsync()方法
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
刚刚执行的tryLockInnerAsync()返回值是一个future任务(这个不懂的可以去了解一下并发编程的知识),下面是调用了一个监听器,也就是说他监听tryLockInnerAsync()这个异步任务,执行完成就会去回调operationComplete()方法,这个方法内逻辑首先是判断这个任务执行是否成功,然后又调了一个getNow(),这个就是取这个任务的返回值,刚刚我们分析那段lua语义也得到设置成功返回是null,也就走到了scheduleExpirationRenewal()这个锁续命逻辑,给我们刚刚设置的30秒到期时间去定时续命。
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
我们直接看他的主干线逻辑,new TimerTask()这里是又搞了一个任务,他有两个参数internalLockLeaseTime / 3 和TimeUnit.MILLISECONDS, 代表10秒之后去执行这个任务(回顾--- 当前方法是监听设置锁成功后执行的 那么也就是说 我当前的new TimerTask()任务是会在设置成功后10秒执行,这个key的过期时间还剩下20秒),然后去执行了一段lua脚本,大致意思就是去判断这个key是否还存在,存在的话去设置新的过期时间 返回值是1(redis 1 0 返回到代码中对应是boolean)。下面又一个addListener监听器去监听future任务,看到当续命成功时又去递归调用 scheduleExpirationRenewal()方法,通过递归的方式来达到类似定时任务的效果。 到这里加锁成功的核心逻辑也就读完了,在回去看一下加锁失败的逻辑。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
加锁失败是执行return redis.call('pttl', KEYS[1]); 这里是返回了这把锁的剩余过期时间,继续回到 lockInterruptibly()方法
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
Long ttl = tryAcquire(leaseTime, unit, threadId); 尝试加锁这个逻辑刚刚读完我们知道 ttl是null 说明加锁失败 ttl有值则是加锁失败获取这把锁剩下的过期时间。subscribe()是一个订阅功能,给当前加锁失败的线程去订阅一个channel,有订阅肯定有通知稍后去讲通知在哪里,以及通知的作用。我们先看这个while(true),又去尝试加锁刷新这个ttl的时间, 分析ttl >= 0的逻辑,getLatch()这个返回值
public Semaphore getLatch() {
return latch;
}
是一个Semaphore 信号量 ,获取这个信号量后调用了tryAcquire()方法 传参是刚刚的ttl, 意思就是等待ttl秒获取许可, 假设现在ttl是10秒,这个方法就会阻塞在这里等待10s之后去循环这个while继续尝试加锁。 既然有阻塞就会有唤醒,什么情况下会去唤醒。在去假设一种场景,假如ttl返回还有20s,持有锁的线程现在unlock掉了,等待获取锁的线程这时候应该是立刻感知去尝试加锁而不是在继续等待20s,那么应该就是在unlock的时候会去唤醒。
@Override
public void unlock() {
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
cancelExpirationRenewal();
}
// Future<Void> future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}
unlock源码我们只看这个unlockInnerAsync()这个异步方法
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
继续分析lua语义,有一个redis.call('publish', KEYS[2], ARGV[1])的命令, 调用了redis的publish指令,在lock的时候未获取锁的线程有订阅一个频道,这个指令呢就是去给这个频道发送消息,告诉订阅频道的线程我现在释放锁了,我们再去找一下消费消息的源码。 在LockPubSub.class下有个onMessage
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(unlockMessage)) {
value.getLatch().release();
while (true) {
Runnable runnableToExecute = null;
synchronized (value) {
Runnable runnable = value.getListeners().poll();
if (runnable != null) {
if (value.getLatch().tryAcquire()) {
runnableToExecute = runnable;
} else {
value.addListener(runnable);
}
}
}
if (runnableToExecute != null) {
runnableToExecute.run();
} else {
return;
}
}
}
}
value.getLatch().release(); 是不是获取信号量并且释放许可,lock中的tryAcquire()就会得到释放再次循环尝试加锁,这样就很好的形成了闭环。
总结:
在lock中加锁成功情况下通过lua脚本来保证原子性,由于超时时间的不可控因素利用看门狗机制去递归延迟超时时间,加锁失败情况下,使用了redis的发布订阅功能先去订阅加锁线程频道,然后去循环尝试加锁,循环过程中利用信号量的特性,做到了延迟循环以及阻塞等待唤醒重新加锁。 unlock做到了给频道发送消息,使信号量释放许可达到唤醒作用。
本文略微抽象,最好下载Redisson源码在结合本文一并观看。
转载自:https://juejin.cn/post/7086405088608157726