Redis分布式锁深入分析
Redis分布式锁深入分析
有关分布式锁的制作我在之前的文章已经提过,感兴趣的可以看一下
但是这个方法仍然有问题存在,下面我们来聊一聊
注意,这篇文章本质上只是讨论分布式锁的问题,如果要看源代码分析,可以看看这位大佬的代码,我在这里引入代码只是想要说明一些解决方案:分布式锁 | Joseph's Blog (gitee.io)
最原始的Redis分布式锁
最开始大家刚学分布式锁的时候,用的是这个指令,
setnx key value
然后使用expire给他设置过期时间
看似没有问题

难道真的没问题吗?
试想一下,在高并发下,redis出现了雪崩,那么你设置了setnx,但是在设置expire之前崩了,呃呃呃~
没错,要解决这个问题,得实现原子性,原子性,我们在MySQL里面通过学习了事务来解决

那么redis,能不能类似实现事务呢?其实redis本身是有事务的,但是这种简单的语句,用Lua也行(没错,就是你打游戏开脚本哪个)
但是在这里我们不讲Lua,主要说一下思想,其实就是通过lua将两个原子语句封装在一起,再发送给redis服务器进行执行
lua-redis快速入门直接看最后
这个分布式锁实现过于简单,就不在这里说了,hhh~
Redis官方针对SETNX的改动
其实Redis官方在后面也看出了SETNX的缺点,所以他在2.6.12版本开始,加入了一个新的指令
set key value EX|PX nx|ex
EX|PX是expire和pexpire,nx是不存在则执行,ex是存在则执行
简单说下,然后RedisTemplate.setValue().setIfAbsent()方法也进行了重写
这样就保证了原子性,这个方法在我之前的文章里面也用过。
并且我参考Redisson的思想制作了分布式锁看门狗机制
当时其实是在想续期问题如何解决,解决之后就感觉自己

直到最近,群佬看博客,指出了一个问题,这个方法是否可重入呢?

说到这里,你可能对可重入有点迷惑,那么现在,我们来介绍一下可重入锁
可重入锁
什么是可重入锁?
来看看介绍吧。
可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁。
简单的例子,这里我用伪代码来解释
syn(this){
sout("加锁成功A")
syn(this){
sout("加锁成功B")
}
}
那么这里,我们会发生什么呢?
按照没接触可重入锁的情况或者没有这样试过的情况来说,执行完 sout("加锁成功A")后便会产生死锁问题
而可重入锁,就是说,在此时,你依然可以进入并执行sout("加锁成功B")
那么应用场景?
最容易想到的是递归调用,但是还有其他的业务方面可以说一说,
比如你要调用业务方法A,业务A中有操纵了要上锁业务B,同时业务A又需要全局上锁,那么这个地方就需要可重入了
基于Redis-Hash的可重入锁实现
在Redisson中,采用的是hash进行锁的存储,然后对hash设置一个过期时间
大概的数据结构是这样的

hashname为key,hashkey为thread1,value是锁的重入次数
但是这里我要提一点,这里的thread1,可不仅仅是threadId,使用分布式锁通常是在分布式、微服务i项目下,不同的服务中也有可能出现线程ID相同的问题,所以这里加一个服务名,其实生成个UUID就可以了
大概的格式就是这样:
mylock:HEX(uuid+theadId):num
但是还有个设置过期时间的问题,如何设置?
我这里跟着我之前的帖子来讲,在那里我是使用的RedisTemplate来实现分布式锁+看门狗机制
但是没有考虑可重入的问题,那么我这次就加上
我们要加过期时间,同时又要确保原子性,那么就用Lua
加锁
对于加锁的Lua如下
-- 如果Lock存在
if redis.call("exists",KEYS[1]) ~= 0 then
-- 如果不是自己的锁
if redis.call("exists",KEYS[1],ARGS[1]) == 0 then
-- 不是自己的锁,返回剩余时间
return redis.call("pttl",KEYS[1]);
end
-- 如果是自己的锁就记录次数
redis.call("hincrby",KEYS[1],ARGS[1],1);
-- 延期
redis.call("pexpire",KEYS[1],ARGS[2]);
else
redis.call("hset",KEYS[1],ARGS[1],1);
-- 设置默认延期
redis.call("pexpire",KEYS[1],ARGS[2]);
end
-- 如果Lock不存在,那么就直接加上就可以了,hhh
return nil;
这里解释一下KEY和ARG,key是hash名,args是指命令携带参数
key1:索命
args1:服务线程唯一ID
args2:过期时间
然后在代码里面的实现

解锁
解锁也差不多
-- 解锁的逻辑和加锁相似
-- 如果Lock存在
if redis.call("exists",KEYS[1]) ~= 0 then
-- 如果是自己的锁
if redis.call("hexists",KEYS[1],ARGS[1]) ~= 0 then
-- 如果是最后一层 直接delete
if redis.call("hget",KEYS[1],ARGS[1]) == 0 then
redis.call("del",KEYs[1]);
a=0
else
-- 如果不是,那么久锁层数减一
a=redis.call("hincrby",KEYS[1],ARGS[1],-1);
end
end
return a;
end
-- 如果Lock不存在,那么就return,hhh
return nil;

续期的话本来就是一条语句,不变就可以了
然后我和之前的代码相比,自旋锁改了一下,hhh

看门狗机制实现
之前其实已经实现过,这里就再来看看吧,这里我为了方便一点,用的Hutool来演示,但是实际用的时候还是用Netty等框架比较好,毕竟Redission也是用的Netty

目前还存在的问题+Reddisson源码分析 —— 自旋锁
没错,别以为这样就完了,细心的话会发现我上面的代码里面,写的是最暴力的自旋锁(图一个方便,hhh)
如果说一直循环下去,那么无疑是非常浪费CPU的

那么如何解决?
解决方案
细心的同学已经发现了,在我加锁失败的时候,会返回一个ttl,也就是当前key还有多久失效
那么我们是不是可以在while里面是指一个阻塞,然后等过了这么久再唤醒线程就可以了?
没错,Reddisson底层也是这样实现的,基于Redis发布订阅,但是这里我给大家简单引个路子
你可以理解为把阻塞的线程ID放进一个阻塞队列里面,而我们的服务器就去订阅这个队列,其实这个队列在Redis里面叫做Channel,感兴趣的可以去看看。
那么是如何订阅的呢?
其实在源代码中,Redisson是放了一个“消息检测器”来进行监听
下面来看看Redisson加锁的代码
阻塞加锁源码 lock()
//阻塞加锁
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return; //这里拿到锁了
}
CompletableFuture<RedissonLockEntry> future = subscribe(threadId); //对当前线程进行消息订阅
pubSub.timeout(future); //设置订阅超时
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
while (true) {
// 循环重试获取锁,直至重新获取锁成功才跳出循环
// 此种做法阻塞进程,一直处于等待锁手动释放或者超时才继续线程
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}
非阻塞加锁
//不阻塞加锁,waitTime是最大容忍时间,这个概念不做过多解释,就是等待你自选的时间
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 计算第一次尝试获取锁后剩余的时间
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId); //获取失败
return false;
}
current = System.currentTimeMillis();
//消息订阅
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS); //设置一个最多订阅时间
} catch (TimeoutException e) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + time + "ms. " +
"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException e) {
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);//阻塞,等待消息
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);//阻塞,等待消息
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
消息订阅
其上述类容中的订阅,都通过下面的方法进行回调,在解锁的时候会发布消息
package org.redisson.pubsub;
import org.redisson.RedissonLockEntry;
import java.util.concurrent.CompletableFuture;
/**
* LockPubSub类是一个用于锁的发布-订阅实现。
* 它继承自PublishSubscribe类,用于处理锁的订阅和消息发布。
* 锁的订阅者是RedissonLockEntry对象。
* 当接收到特定的消息时,会执行相应的操作。
*/
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
// 解锁消息
public static final Long UNLOCK_MESSAGE = 0L;
// 读锁解锁消息
public static final Long READ_UNLOCK_MESSAGE = 1L;
public LockPubSub(PublishSubscribeService service) {
super(service);
}
@Override
protected RedissonLockEntry createEntry(CompletableFuture<RedissonLockEntry> newPromise) {
return new RedissonLockEntry(newPromise);
}
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
// 获取等待执行的Runnable对象,并执行
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 释放锁计数器
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
// 循环执行等待执行的Runnable对象,并执行
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
// 释放锁计数器,释放所有等待的读锁
value.getLatch().release(value.getLatch().getQueueLength());
}
}
}
总结
其实这里收获最大的,应该是自旋锁,虽然说在最后使用了发布订阅来完成异步唤醒,但还是有一些缺点,比如这个锁是否公平,如果说,这里要让你实现公平锁,读者你又打算如何解决?
想到这里,我又想到了Reactor模型,其实我们可以做一个BossGroup来存放一下阻塞线程ID,其实就是一个阻塞队列
再用一个WorkerGroup来对每个ThreadID进行处理,当然这里提供的是一个思路,如果要完成的话,相当于是写一个小型中间件,也挺有意思,后面打算试一试,hhh
Redis-Lua快速学习
当编写 Lua 脚本与 Redis 进行交互时,以下是一些常用的 Lua 脚本指南和技巧:
- 命令调用:使用
redis.call函数来调用 Redis 命令。例如,redis.call('GET', 'mykey')将调用 Redis 的 GET 命令并返回键为 'mykey' 的值。 - 参数访问:可以使用
KEYS表来访问传递给 Lua 脚本的键列表,使用ARGV表来访问传递给 Lua 脚本的额外参数。例如,KEYS[1]表示第一个键,ARGV[1]表示第一个额外参数。 - 返回结果:Lua 脚本可以通过使用
return语句来返回结果。例如,return redis.call('GET', 'mykey')将返回键为 'mykey' 的值。 - 循环和条件:Lua 提供了一些基本的循环和条件语句,例如
for、while、if等,可以在 Lua 脚本中使用。 - 容错处理:在编写 Lua 脚本时,可以考虑添加容错处理,例如使用
pcall函数来捕获 Redis 命令的错误并进行处理。 - 事务支持:Redis 的 Lua 脚本支持事务,可以使用
redis.call('MULTI')开始事务,然后使用redis.call('EXEC')执行事务。在事务中,可以执行多个 Redis 命令,并将其作为一个原子操作进行提交或回滚。 - 脚本缓存:Redis 可以缓存 Lua 脚本,以提高执行效率。您可以使用
EVALSHA命令来执行缓存的脚本。在 Java RedisTemplate 中,您可以使用execute方法的execute(script, keys, args)形式来执行缓存的脚本。
这些指南和技巧可帮助您编写更复杂和灵活的 Lua 脚本与 Redis 进行交互。在编写 Lua 脚本时,请参考 Redis 官方文档以及 Lua 官方文档,以了解更多 Lua 编程语言和 Redis 命令的细节和用法。
当编写 Lua 脚本时,可以使用循环和条件语句来实现逻辑控制。以下是一些示例:
- 使用
for循环:
for i = 1, 10 do
-- 执行操作,例如打印循环变量
print(i)
end
- 使用
while循环:
local i = 1
while i <= 10 do
-- 执行操作,例如打印循环变量
print(i)
i = i + 1
end
- 使用
if-else条件:
local num = 5
if num < 0 then
print("Number is negative")
elseif num == 0 then
print("Number is zero")
else
print("Number is positive")
end
- 使用
repeat-until循环:
local i = 1
repeat
-- 执行操作,例如打印循环变量
print(i)
i = i + 1
until i > 10
这些示例展示了在 Lua 脚本中使用循环和条件语句的基本用法。您可以根据自己的需求和逻辑在 Lua 脚本中编写更复杂的循环和条件控制结构。请注意,在 Lua 中,条件语句使用 if-elseif-else 结构,而不是像其他编程语言中的 if-else 结构。此外,Lua 的索引从 1 开始,而不是从 0 开始,这与一些其他编程语言有所不同。
请确保根据您的实际需求和逻辑编写正确的循环和条件控制结构,并根据 Redis 脚本的要求将其集成到您的 Lua 脚本中。
转载自:https://juejin.cn/post/7244820297290645541