redisTemplate与Redisson实现分布式锁
redisTemplate分布式锁
基本原理
我们可以同时去一个地方“占坑”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。“占坑”可以去redis,可以去数据库,可以去任何大家都能访问的地方。等待可以自旋的方式。
阶段一
public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {
//阶段一
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
//获取到锁,执行业务
if (lock) {
Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();
//删除锁,如果在此之前报错或宕机会造成死锁
stringRedisTemplate.delete("lock");
return categoriesDb;
}else {
//没获取到锁,等待100ms重试
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonDbWithRedisLock();
}
}
public Map<String, List<Catalog2Vo>> getCategoryMap() {
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
String catalogJson = ops.get("catalogJson");
if (StringUtils.isEmpty(catalogJson)) {
System.out.println("缓存不命中,准备查询数据库。。。");
Map<String, List<Catalog2Vo>> categoriesDb= getCategoriesDb();
String toJSONString = JSON.toJSONString(categoriesDb);
ops.set("catalogJson", toJSONString);
return categoriesDb;
}
System.out.println("缓存命中。。。。");
Map<String, List<Catalog2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Catalog2Vo>>>() {});
return listMap;
}
问题: setnx占好了位,业务代码异常或者程序在页面过程中宕机。没有执行删除锁逻辑,这就造成了死锁解决: 设置锁的自动过期,即使没有删除,会自动删除
阶段二
public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
if (lock) {
//设置过期时间
stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS);
Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();
stringRedisTemplate.delete("lock");
return categoriesDb;
}else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonDbWithRedisLock();
}
}
问题: setnx设置好,正要去设置过期时间,宕机。又死锁了。解决: 设置过期时间和占位必须是原子的。redis支持使用setnx ex命令
阶段三
public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {
//加锁的同时设置过期时间,二者是原子性操作
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",5, TimeUnit.SECONDS);
if (lock) {
Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();
//模拟超长的业务执行时间
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
stringRedisTemplate.delete("lock");
return categoriesDb;
}else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonDbWithRedisLock();
}
}
问题: 删除锁直接删除???如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了。解决: 占锁的时候,值指定为uuid,每个人匹配是自己的锁才删除。
阶段四
public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {
String uuid = UUID.randomUUID().toString();
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
//为当前锁设置唯一的uuid,只有当uuid相同时才会进行删除锁的操作
Boolean lock = ops.setIfAbsent("lock", uuid,5, TimeUnit.SECONDS);
if (lock) {
Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();
String lockValue = ops.get("lock");
if (lockValue.equals(uuid)) {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
stringRedisTemplate.delete("lock");
}
return categoriesDb;
}else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonDbWithRedisLock();
}
}
问题: 如果正好判断是当前值,正要删除锁的时候,锁已经过期,别人已经设置到了新的值。那么我们删除的是别人的锁解决: 删除锁必须保证原子性。使用redis+Lua脚本完成
阶段五-最终形态
public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {
String uuid = UUID.randomUUID().toString();
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
Boolean lock = ops.setIfAbsent("lock", uuid,5, TimeUnit.SECONDS);
if (lock) {
Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();
String lockValue = ops.get("lock");
String script = "if redis.call("get",KEYS[1]) == ARGV[1] then\n" +
" return redis.call("del",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), lockValue);
return categoriesDb;
}else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonDbWithRedisLock();
}
}
保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性。更难的事情,锁的自动续期
2. Redisson 实现分布式锁
- 单点故障: Redisson的分布式锁依赖于Redis集群,如果Redis集群出现故障或不可用,可能导致分布式锁的可靠性和可用性受到影响。因此,在使用Redisson分布式锁时,需要特别关注Redis集群的稳定性和高可用性。
- 锁竞争: 当多个线程同时请求获取分布式锁时,可能出现锁竞争的情况。如果锁竞争较为激烈,可能会导致性能下降和请求超时等问题。此外,由于Redisson分布式锁是基于Redis进行实现的,如果Redis节点的处理能力无法满足高并发的锁请求,可能会导致锁请求被延迟或阻塞。
- 死锁风险: 分布式环境下,由于网络通信、节点故障等因素,可能导致锁无法正常释放,从而引发死锁问题。需要合理设计和使用锁的超时时间、自动释放机制等来降低死锁风险。
- 锁粒度管理: 在分布式环境下,锁的粒度管理是一个重要的问题。过于细粒度的锁可能导致并发性能下降,而过于粗粒度的锁可能会影响系统的可伸缩性和并发性能。需要根据具体的业务场景和并发访问模式合理选择锁的粒度。
- 数据一致性: 使用分布式锁保证多个操作的原子性是很常见的应用场景之一。然而,分布式锁通常只能提供粗粒度的互斥访问,不能保证数据的完全一致性。在一些特定的应用场景中,可能需要额外的措施来确保数据的最终一致性。
以下,我们通过一段 Redisson 的分布式锁接口的源码来分析:
public interface RLock extends Lock, RLockAsync {
String getName();
void lockInterruptibly(long var1, TimeUnit var3) throws InterruptedException;
boolean tryLock(long var1, long var3, TimeUnit var5) throws InterruptedException;
void lock(long var1, TimeUnit var3);
boolean forceUnlock();
boolean isLocked();
boolean isHeldByThread(long var1);
boolean isHeldByCurrentThread();
int getHoldCount();
long remainTimeToLive();
}
RLock接口主要继承了Lock接口,它是Redisson提供的用于分布式锁的核心接口,它定义了获取锁和释放锁等方法 ,并扩展了很多方法。
如:
- void lock(long leaseTime, TimeUnit unit)
- 功能:获取锁,并设置锁的自动释放时间。
- 参数:
-
- leaseTime:锁的自动释放时间。
- unit:时间单位。
- boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException
- 功能:尝试在指定的等待时间内获取锁,并设置锁的自动释放时间。
- 参数:
-
- waitTime:等待获取锁的最大时间量。
- leaseTime:锁的自动释放时间。
- unit:时间单位。
- 返回值:如果在等待时间内成功获取锁,则返回true;否则返回false。
- 异常:如果在等待获取锁的过程中被中断,则抛出InterruptedException。
- RFuture lockAsync(long leaseTime, TimeUnit unit)
- 功能:异步方式获取锁,并设置锁的自动释放时间。
- 参数:
-
- leaseTime:锁的自动释放时间。
- unit:时间单位。
- 返回值:一个RFuture对象,表示异步操作的结果。
除此之外,RLock 接口还提供了许多的的方法对于 lock()方法实现拓展,使得使得在获取锁时可以设置自动释放时间或进行异步操作。这样可以更加灵活地实现控制锁的行为,从而适应不同场景下的需求。
除了上述拓展,RLock接口还提供了其他方法来支持可重入锁、公平锁、红锁、读写锁等特性,以便满足更为复杂的分布式锁需求。
3. SpringBoot 整合 Redisson
3.1. Maven 依赖导入
<!-- Spring Data Redis依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redisson依赖 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.23.5</version>
</dependency>
3.2. 配置文件参数配置(需要根据你的情况进行修改)
spring:
redis:
cluster:
nodes:
- 192.168.88.101:6379
- 192.168.88.101:6389
- 192.168.88.101:6399
password: 123456
3.3. 创建 Redisson 客户端
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Value("${spring.redis.cluster.nodes}")
private String clusterNodes;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useClusterServers()
.addNodeAddress(clusterNodes.split(","))
.setPassword(password);
return Redisson.create(config);
}
}
使用分布式锁
在需要使用分布式锁的地方注入RedissonClient实例,并使用getLock方法创建一个分布式锁对象(RLock)。
java
复制代码
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SomeService {
@Autowired
private RedissonClient redissonClient;
public void doSomething() {
String lockKey = "myLock"; // 锁的key
RLock lock = redissonClient.getLock(lockKey);
try {
lock.lock(); // 获取锁
// 在这里执行需要加锁保护的代码
} finally {
lock.unlock(); // 释放锁
}
}
}
RLock.lock()
使用 Rlock.lock() 方法时 ,如果当前没有其他线程或进程持有该锁,那么调用线程将立即获得锁定,并继续执行后续的代码。如果其他线程或进程已经持有了该锁,那么调用线程将被阻塞,直到该锁被释放为止。
此外,Rlock.lock() 方法还具有以下特点:
-
可重入性: 同一个线程可以多次调用 Rlock.lock() 方法而不会造成死锁,只需确保每次 lock() 调用都有相应的 unlock() 调用与之匹配。
-
超时机制: 可以通过 lock() 方法中的参数设置等待锁的超时时间,避免因为无法获得锁而一直等待。
-
自动续期: 当线程持有锁的时间超过设置的锁的过期时间时,Redisson 会自动延长锁的有效期,避免因为业务执行时间过长而导致锁过期。
-
防止死锁: Redisson 通过唯一标识锁的 ID 来区分不同的锁,防止发生死锁。
转载自:https://juejin.cn/post/7345008762103578651