likes
comments
collection
share

Redis 分布式锁 —— Redisson

作者站长头像
站长
· 阅读数 42

基于 Redis 的分布式锁优化

  • 基于 setnx 实现的分布式锁存在的问题:
    • 不可重入:同一个线程无法多次获取同一把锁(eg:方法A调用方法B,在方法A中先获取锁,然后去调用方法B,方法B也需要获取同一把锁,这种情况下如果锁不可重入,方法B显然获取不到锁,会出现死锁的情况)
    • 不可重试:获取锁只尝试一次就返回 false,没有重试机制
    • 超时释放:超时释放虽然能够避免死锁,但如果业务执行执行时间较长导致锁释放,会存在安全隐患
    • 主从一致性:主从数据同步存在延迟,比如:线程在主节点获取了锁,尚未同步给从节点时主节点宕机,此时会选一个从节点作为新的主节点,这个从节点由于没有完成同步不存在锁的标识,此时其他线程可以趁虚而入拿到锁,这就造成多个线程同时拿到锁,就会出现安全问题)

Redisson

  • Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格。通俗来将,就是在 Redis 基础上实现的分布式工具集合
  • GitHub 地址

Redisson 入门

  • 引入依赖
<!--redisson-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>
  • 配置 Redisson 客户端
package com.hmdp.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        // 配置
        Config config = new Config();
        // 地址 & 密码
        config.useSingleServer().setAddress("redis://124.223.27.146:6379").setPassword("ruochen666");
        // 创建 RedissonClient 对象
        return Redisson.create(config);
    }
}
  • 使用 Redisson 的分布式锁
package com.hmdp;

import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@SpringBootTest
public class TestRedisson {

    @Resource
    private RedissonClient redissonClient;

    @Test
    void testRedisson() throws InterruptedException {
        // 获取锁
        RLock lock = redissonClient.getLock("lock");
        // 获取锁 参数:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
        boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
        if (isLock) {
            try {
                System.out.println("获取成功");
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    }
}

Redisson 可重入锁

  • 使用 Hash 结构,在 key 的位置记录锁的名称,在 field 的位置记录线程标识,在 value 位置记录锁的重入次数(重复获取锁时只需要判断获取锁的线程是否是自己即可)
  • 释放锁时,重入次数减1,并判断是否为0,若为0,直接将锁删除即可 Redis 分布式锁 —— Redisson
  • 基于 Redisson 可重入锁原理编写 Lua 脚本
local key = KEYS[1];  -- 锁的key
local threadId = ARGV[1]  -- 线程的唯一标识
local releaseTime = ARGV[2];  -- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists', key) == 0) then
    -- 不存在,获取锁
    redis.call('hset', key, threadId, '1');
    -- 设置有效期
    redis.call('expire', key, releaseTime);
    return 1;  -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then
    -- 存在,获取锁,重入次数+1
    redis.call('hincrby', key, threadId, '1');
    -- 设置有效期
    redis.call('expire', key, releaseTime);
    return 1;  -- 返回结果
end;
return 0;  -- 获取锁的不是自己,获取锁失败
local key = KEYS[1];  -- 锁的key
local threadId = ARGV[1]  -- 线程的唯一标识
local releaseTime = ARGV[2];  -- 锁的自动释放时间
-- 判断当前锁是否被自己持有
if(redis.call('hexists', key, threadId) == 0) then
    return nil;  --  不是自己,直接返回
end;
-- 是自己的锁,重入次数-1
local count = redis.call('hincrby', key, threadId, -1);
-- 判断重入次数是否为0
if (count > 0) then
    -- 大于0说明不能直接释放锁,重置锁有效期即可
    redis.call('expire', key, releaseTime);
    return nil;
else  -- 等于0可以直接释放锁
    redis.call('del', key);
    return nil;
end;

测试

  • 我们可以编写Java代码来测试一下 Redisson 的可重入锁
package com.hmdp;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@Slf4j
@SpringBootTest
public class TestRedisson {

    @Resource
    private RedissonClient redissonClient;

    private RLock lock;

    @BeforeEach
    void setUp() {
        lock = redissonClient.getLock("order");
    }

    @Test
    void method1() {
        // 尝试获取锁
        boolean isLock = lock.tryLock();
        if (!isLock) {
            log.error("获取锁失败 .... 1");
            return;
        }
        try {
            log.info("获取锁成功 .... 1");
            method2();
            log.info("开始执行业务 .... 1");
        } finally {
            log.warn("准备释放锁 .... 1");
            lock.unlock();
        }
    }

    void method2() {
        // 尝试获取锁
        boolean isLock = lock.tryLock();
        if (!isLock) {
            log.error("获取锁失败 .... 2");
            return;
        }
        try {
            log.info("获取锁成功 .... 2");
            log.info("开始执行业务 .... 2");
        } finally {
            log.warn("准备释放锁 .... 2");
            lock.unlock();
        }
    }
}
  • 当 method1 中获取锁后,可以看到 Redis 中 value 为 1 Redis 分布式锁 —— Redisson
  • 当 method2 获取锁后,value 值变为了 2 Redis 分布式锁 —— Redisson
  • method2 中释放锁后,value 值减为 1,method1 中释放锁后,锁被直接删除
  • 跟到 RedissonLock 底层,我们发现这里的 lua 脚本逻辑跟我们上面写的一样的 Redis 分布式锁 —— Redisson Redis 分布式锁 —— Redisson

Redisson 重试机制

  • tryLock 方法的第一个参数就是最长等待时长,获取锁失败后不会立即返回,而是在等待时间内不断进行尝试。若时间结束还没有获取成功,才会返回 false
  • 通过跟踪 tryLock 源码我们可以看到获取锁失败后返回的是锁的剩余有效时间(单位为ms) Redis 分布式锁 —— Redisson
  • 我们往上看,返回的是一个 RFuture 对象 Redis 分布式锁 —— Redisson
  • 继续返回剩余有效期 Redis 分布式锁 —— Redisson
  • 再往回查看,由前面我们知道获取锁失败返回的是null,获取锁成功返回的是锁的剩余有效时间。获取锁消耗时间如果大于等待时间,直接返回 false Redis 分布式锁 —— Redisson
  • 如果等待时间还有剩余,那就继续重试,但是这里的重试不是立即重试,我们可以看到有一个 subscribe 方法,这个是订阅释放锁的消息(如果有线程释放了锁就会发一个消息过来)(释放锁时的 publish 命令就是发布消息通知,subscribe 订阅的就是它发布的通知) Redis 分布式锁 —— Redisson
  • 消息通知也是不确定的,所以这里返回的仍旧是一个 Future,然后通过 await 等待结果,future 在指定时间内完成会返回 true(time 就是锁的剩余最大等待时间) Redis 分布式锁 —— Redisson
  • 如果在剩余等待时间内收到订阅通知,那么会继续计算剩余等待时间(排除掉订阅等待的时间),如果此时无剩余时间返回 false;如果剩余等待时间依然有剩余,就可以再次尝试获取锁 Redis 分布式锁 —— Redisson
  • 如果获取锁成功则返回 true,如果又失败,继续等待订阅 ...

Redisson WatchDog 机制

业务未执行完锁超时释放

  • 我们继续来分析源码,如果我们没有指定超时释放时间默认是一个看门狗时间(30s) Redis 分布式锁 —— Redisson
  • future 完成后,如果剩余有效期等于 null(获取锁成功),会调用 scheduleExpirationRenewal(自动更新续期) 方法 Redis 分布式锁 —— Redisson
  • 跟踪进来,首先会 new 一个 ExpirationEntry,然后把它扔到了 map 里面,这个 map 的 key 是 string 类型(id + 锁名称),值是 entry,且为 static final,即 RedissonLock 类的所有实例都可以看到这个 map,一个 Lock 类会创建出很多锁的实例,每一个锁的实例都有自己的名字(entryName),在 map 中有唯一的 key 和 唯一的 entry。第一次调用时,entry 不存在,所以使用 putIfAbsent;多次调用时,entry 是存在的,putIfAbsent 就会失效,返回旧的 entry,因此就能够保证不管锁被重入几次,拿到的永远是同一个 entry。所以,这里的 map 的作用就是保证 同一个锁拿到的永远是同一个 entry Redis 分布式锁 —— Redisson
  • 然后将线程ID放入 entry,第一次调用时还会执行 renewExpiration (更新有效期)方法,我们可以跟踪到这个方法里面看一看 Redis 分布式锁 —— Redisson
  • 方法进来后,先从 map 中得到 entry,然后会执行一个定时任务(Timeout),这个任务有两个参数:一个是任务本身 task,另一个参数是延时 delay,即此任务是在 delay 时间到期后执行。我们可以看到这个延时任务会在内部锁施放时间(默认看门狗时间)的 1/3 后执行 Redis 分布式锁 —— Redisson
  • 那么我们可以跟踪到 renewExpirationAsync 方法看一下执行的任务是什么,显然这是一个更新有效期的操作 Redis 分布式锁 —— Redisson
  • 我们再返回去,可以看到 renewExpirationAsync 方法执行完后又会递归调用自身,这样一来,锁的有效期就会不断进行重置,永不过期(初始默认为30s,10s后又设置为30s ....) Redis 分布式锁 —— Redisson
  • 最后把任务放到 entry 中,因此 entry 中存放了两个数据:一个是当前线程ID,一个是定时任务。从这里我们就能看出为啥前面第一次调用时会执行 renewExpiration,而后面就不会调用此方法,因为 oldEntry 中已经有了定时任务,只需要把线程ID加进去即可 Redis 分布式锁 —— Redisson
  • 由上面分析可以看出锁的有效期被无限延续,那什么时候释放锁呢?自然是在 unlock 的时候。我们继续跟踪 unlock 方法,在释放锁的时候会执行 cancelExpirationRenewal(取消更新任务) 方法 Redis 分布式锁 —— Redisson
  • 我们跟踪到 cancelExpirationRenewal 方法中,根据锁的名称从 map 中取出当前锁的 entry,将 ID 移除掉,然后再取出 task,将任务取消掉,最后再把 entry 移除掉,锁的释放就彻底完成了 Redis 分布式锁 —— Redisson
  • 总的来讲就是一个看门狗,没过一段时间就会更新锁的有效期

Redis 分布式锁 —— Redisson

总结

  • 可重入:利用 hash 结构记录线程 id 和重入次数
  • 可重试:利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制
  • 超时续约:利用 watchDog,每隔一段时间(releaseTime / 3),重置超时时间
转载自:https://juejin.cn/post/7135307906031091749
评论
请登录