likes
comments
collection
share

基于Redission高级应用13-RLock原理及工具类封装及实战应用

作者站长头像
站长
· 阅读数 34

概述:

RLock 是 Redisson 提供的一个基于 Redis 的分布式可重入锁实现。在分布式系统中,多个进程可能会尝试同时访问同一个资源,RLock 提供了一种机制来确保同一时间只有一个进程可以访问该资源。

RLock的原理:

  1. 可重入性: RLock 是可重入的,即同一个线程可以多次获得同一把锁而不会发生死锁。
  2. 基于Redis的锁: RLock 使用 Redis 的特性来实现分布式锁的功能。可以使用 Redis 的 SET 命令(带有 NX 和 PX 参数)来实现锁的基本操作。
  3. 锁的自动续期: Redisson 会启动一个看门狗任务,定期检查持有锁的线程是否仍然活跃,如果活跃则自动续期,防止在业务处理过程中锁被意外释放。
  4. UUID + 线程ID: RLock 锁会关联一个 UUID 和线程ID,以此来标识锁的所有者,这样即使是在不同的进程中,只要 UUID 和线程ID 相同,也能实现可重入。
  5. 锁的获取和释放: 当一个线程请求锁时,如果锁可用(即不存在或已过期),它就会设置一个带过期时间的键。如果锁被其他线程持有,当前线程会进入等待状态直到锁被释放或超时。释放锁时,如果锁的计数器减到 0,则删除 Redis 中的键。

RLock的优点:

  1. 跨进程同步: RLock 允许不同进程之间的线程安全地同步对共享资源的访问。
  2. 高可用性: 通过 Redis 实现,RLock 可以利用 Redis 的高可用性和持久性。
  3. 可重入: 同一线程可以多次获取锁而不会被阻塞。
  4. 自动续期: 防止长时间操作导致锁过期的问题。
  5. 防止死锁: 锁有过期时间,即使出现异常,锁也会最终释放。

RLock的缺点:

  1. 性能开销: 由于依赖于网络和 Redis 操作,RLock 在获取和释放锁时存在网络延迟和性能开销。
  2. Redis依赖: RLock 的正常工作依赖于 Redis 服务的稳定性。如果 Redis 服务不可用,锁机制也会受到影响。
  3. 时钟同步: 分布式系统中的不同节点需要有较为一致的时钟,否则可能会影响锁的过期机制。
  4. 不支持公平锁: Redisson 的 RLock 是非公平锁,无法保证请求锁的线程按照请求的顺序获得锁。

核心代码讲解:

org.redisson.RedissonLock:-> tryLock

基于Redission高级应用13-RLock原理及工具类封装及实战应用

流程图:

成功
失败
时间耗尽
时间剩余
订阅成功
订阅失败
收到事件
等待超时
成功
失败
时间耗尽
时间剩余
开始
转换等待时间和租约时间为毫秒
尝试获取锁
锁获取成功
检查剩余时间
结束
获取锁失败
订阅锁释放事件
等待锁释放事件
获取锁失败
再次尝试获取锁
检查剩余时间
获取锁失败
取消订阅
  • 开始流程,并转换输入的时间参数。
  • 尝试获取锁。
  • 如果获取锁成功,则流程结束。
  • 如果获取锁失败,检查是否还有剩余的等待时间。
  • 如果时间耗尽,标记获取锁失败,并取消订阅(如果已经订阅),然后结束流程。
  • 如果还有剩余时间,订阅锁释放事件。
  • 如果订阅成功,等待锁释放事件。
  • 如果在等待过程中收到锁释放事件,再次尝试获取锁。
  • 如果在等待过程中等待超时,再次检查剩余时间。
  • 如果再次尝试获取锁失败,或者再次检查时间后时间耗尽,标记获取锁失败,并取消订阅,然后结束流程。

时序图:

tryLock 方法的执行过程:

ClientLockRedisalt[event receivedLock->>Redis:tryAcquire()][timeout]loop[until waitTime expires]alt[lock acquired][lock not acquired]tryLock(waitTime, leaseTime)tryAcquire()Lock availablereturn truetryAcquire()Lock not availablewait for lock releasewait for event or timeoutLock availablereturn truereturn falseClientLockRedis

核心逻辑是通过轮询和订阅机制来等待锁的释放,同时确保不超过指定的等待时间。通过设置租约时间,它还确保了即使在持有锁的进程异常终止的情况下,锁也会在一定时间后自动释放,防止死锁的发生。

基于RLock封装的工具类:AdvancedRLockUtils

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * @Author derek_smart
 * @Date 202/5/9 12:41
 * @Description RLock工具类
 */
public class AdvancedRLockUtils {

    private static final Logger logger = LoggerFactory.getLogger(AdvancedRLockUtils.class);
    @Autowired
    private RedissonClient redissonClient;

    // 尝试获取锁并执行操作,如果获取锁失败则返回默认值
    public <T> CompletableFuture<T> tryLockAndExecuteAsync(String lockKey, long waitTime, long leaseTime, TimeUnit unit, Supplier<T> operation, T defaultValue) {
        RLock lock = redissonClient.getLock(lockKey);
        return CompletableFuture.supplyAsync(() -> {
            boolean lockAcquired = false;
            try {
                lockAcquired = lock.tryLock(waitTime, leaseTime, unit);
                if (lockAcquired) {
                    return operation.get();
                } else {
                    logger.warn("Failed to acquire lock for key: {}", lockKey);
                }
            } catch (InterruptedException e) {
                logger.error("Lock acquisition interrupted for key: {}", lockKey, e);
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                logger.error("Operation execution failed for key: {}", lockKey, e);
            } finally {
                if (lockAcquired) {
                    safeUnlock(lock, lockKey);
                }
            }
            return defaultValue;
        });
    }

    // 尝试获取锁并执行操作,如果获取锁失败则返回默认值
    public <T> T tryLockAndExecute(String lockKey, long waitTime, long leaseTime, TimeUnit unit, Supplier<T> operation, T defaultValue) {
        RLock lock = redissonClient.getLock(lockKey);
        boolean lockAcquired = false;
        try {
            lockAcquired = lock.tryLock(waitTime, leaseTime, unit);
            if (lockAcquired) {
                return operation.get();
            } else {
                logger.warn("Failed to acquire lock for key: {}", lockKey);
            }
        } catch (InterruptedException e) {
            logger.error("Lock acquisitionnterrupted for key: {}", lockKey, e);
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logger.error("Operation execution failed for key: {}", lockKey, e);
        } finally {
            if (lockAcquired) {
                lock.unlock();
            }
        }
        return defaultValue;
    }

    // 获取锁并执行操作,如果获取锁失败则抛出异常
    public <T> T lockAndExecute(String lockKey, long leaseTime, TimeUnit unit, Supplier<T> operation) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock(leaseTime, unit);
        try {
            return operation.get();
        } catch (Exception e) {
            logger.error("Operation execution failed for key: {}", lockKey, e);
            throw e;
        } finally {
            lock.unlock();
        }
    }

    // 使用默认租约时间获取锁并执行操作
    public <T> T lockAndExecute(String lockKey, Supplier<T> operation) {
        return lockAndExecute(lockKey, 30, TimeUnit.SECONDS, operation);
    }

    // ... 可以添加更多的方法来支持不同的锁定策略 ...

    // 释放锁的安全方法
    private void safeUnlock(RLock lock, String lockKey) {
        try {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        } catch (Exception e) {
            logger.error("Failed to release lock for key: {}", lockKey, e);
        }
    }
}
  • 添加了 logger 用于记录锁操作和操作执行过程中的重要信息和异常。
  • tryLockAndExecute 方法中,添加了对中断的处理,并在获取锁失败时返回默认值。
  • tryLockAndExecutelockAndExecute 方法中,添加了对操作执行异常的捕获和记录。
  • 添加了 safeUnlock 方法,这是一个私有辅助方法,用于安全地释放锁,确保即使在解锁过程中发生异常,也不会影响应用程序的其他部分。
  • 使用 CompletableFuture: 用于支持异步编程模式,在多线程环境中提供非阻塞的锁操作。

基于Redission高级应用13-RLock原理及工具类封装及实战应用

AdvancedRLockUtils 类的设计:

AdvancedRLockUtils
-RedissonClient redissonClient
-Logger logger
+tryLockAndExecuteAsync(String lockKey, long waitTime, long leaseTime, TimeUnit unit, Supplier<T> operation, T defaultValue) : CompletableFuture<T>
+tryLockAndExecute(String lockKey, long waitTime, long leaseTime, TimeUnit unit, Supplier<T> operation, T defaultValue) : T
+lockAndExecute(String lockKey, long leaseTime, TimeUnit unit, Supplier<T> operation) : T
+lockAndExecute(String lockKey, Supplier<T> operation) : T
-safeUnlock(RLock lock, String lockKey) : void

类图讲解:

  • AdvancedRLockUtils 是主要的类,包含了锁定策略的方法和一个私有的解锁方法。
  • -RedissonClient redissonClient 表示一个私有的 Redisson 客户端实例,用于与 Redis 交互。
  • -Logger logger 表示一个私有的日志记录器实例,用于记录日志。
  • +tryLockAndExecuteAsync 是一个公共方法,它尝试异步获取锁并执行操作,返回一个 CompletableFuture
  • +tryLockAndExecute 是一个公共方法,它尝试同步获取锁并执行操作。
  • +lockAndExecute 是一个公共方法,它阻塞地获取锁并执行操作。
  • +lockAndExecute (重载版本) 是一个公共方法,使用默认租约时间获取锁并执行操作。
  • -safeUnlock 是一个私有方法,用于安全地释放锁。

使用 AdvancedRLockUtils 类示例:

示例 1: 同步执行操作,阻塞直到获取锁

AdvancedRLockUtils lockUtils = new AdvancedRLockUtils();

// 定义一个操作,例如更新数据库记录
Supplier<Boolean> updateOperation = () -> {
    // 执行数据库更新操作
    // ...
    return true; // 返回操作成功
};

// 使用锁来确保操作的原子性
String lockKey = "myLockKey"; // 锁的唯一标识符
boolean result = lockUtils.lockAndExecute(lockKey, updateOperation);

// 检查操作结果
if (result) {
    System.out.println("Operation executed successfully with lock.");
} else {
    System.out.println("Operation failed.");
}

示例 2: 尝试获取锁,并在失败时返回默认值

Supplier<Boolean> criticalOperation = () -> {
    // 执行关键操作
    // ...
    return true; // 返回操作成功
};

String lockKey = "myLockKey";
long waitTime = 10; // 等待锁的最大时间(例如10秒)
long leaseTime = 60; // 锁的租约时间(例如60秒)
TimeUnit unit = TimeUnit.SECONDS; // 时间单位
Boolean defaultValue = false; // 如果无法获取锁,返回默认值

// 尝试获取锁并执行操作
Boolean result = lockUtils.tryLockAndExecute(lockKey, waitTime, leaseTime, unit, criticalOperation, defaultValue);

// 检查操作结果
if (result) {
    System.out.println("Operation executed successfully with lock.");
} else {
    System.out.println("Failed to acquire lock, operation skipped or failed.");
}

示例 3: 异步执行操作,非阻塞获取锁

Supplier<String> asyncOperation = () -> {
    // 执行某些异步操作
    // ...
    return "Success"; // 返回操作结果
};

String lockKey = "myLockKey";
long waitTime = 5; // 等待锁的最大时间(例如5秒)
long leaseTime = 30; // 锁的租约时间(例如30秒)
TimeUnit unit = TimeUnit.SECONDS; // 时间单位
String defaultValue = "Default Value"; // 如果无法获取锁,返回默认值

// 异步获取锁并执行操作
CompletableFuture<String> futureResult = lockUtils.tryLockAndExecuteAsync(lockKey, waitTime, leaseTime, unit, asyncOperation, defaultValue);

// 处理异步结果
futureResult.thenAccept(result -> {
    if ("Success".equals(result)) {
        System.out.println("Async operation executed successfully with lock.");
    } else {
        System.out.println("Failed to acquire lock, async operation skipped or failed.");
    }
});

在上述示例中,演示了如何使用 AdvancedRLockUtils 类来同步或异步执行操作,并根据是否能获取到锁来处理结果。这些示例可以根据你的实际业务逻辑进行调整和扩展。请确保正确处理任何可能的异常,并在不需要锁时释放它以避免死锁。

使用AdvancedRLockUtils概述:

AdvancedRLockUtils 是一个基于 Redisson 的工具类,它提供了一组简化的方法来处理分布式锁的获取和释放。这个工具类的设计旨在使得在分布式环境中实现同步操作变得更加容易和安全。以下是对 AdvancedRLockUtils 使用的总结和建议

总结

1. 封装AdvancedRLockUtils 封装了 Redisson 锁的获取和释放逻辑,使得客户端代码更加简洁和易于理解。 2. 灵活性: 它提供了同步和异步操作的支持,以及尝试获取锁的方法,这些方法允许指定等待时间和租约时间。 3. 安全性: 工具类中的 safeUnlock 方法确保在发生异常时锁能够被正确释放,减少了死锁的风险。 4. 日志记录: 通过记录日志,可以帮助开发者在出现问题时进行调试和跟踪。

建议

1. 异常处理: 确保所有的操作都有适当的异常处理逻辑,以便在操作失败时能够做出相应的响应。 2. 超时与租约: 在使用锁时,合理设置等待时间和租约时间,以避免长时间占用锁或者在操作未完成时锁被释放。 3. 锁的粒度: 使用锁时要注意粒度问题,避免过大的粒度导致系统性能下降,也要避免过小的粒度导致复杂性增加。 4. 资源清理: 在操作完成后,确保所有的资源都被正确清理,包括锁的释放。 5. 锁策略: 根据实际场景选择合适的锁策略,比如读写锁、公平锁等。 6. 测试: 在生产环境部署前,进行充分的测试,确保锁的逻辑在高并发和各种异常情况下都能正确工作。 7. 监控: 在生产环境中监控锁的使用情况,以便及时发现潜在的死锁或性能问题。

总的来说,AdvancedRLockUtils 是一个有用的工具类,它简化了分布式锁的使用。然而,使用分布式锁时,应该继续保持谨慎,确保锁的使用不会成为系统性能瓶颈或引入新的复杂性。

最终总结:

在使用 RLock 时,应该注意确保业务逻辑处理的时间不会超过锁的过期时间,或者确保看门狗任务能够正常工作以自动续期。此外,应当处理好异常情况,确保在任何情况下锁都能被正确释放,避免资源泄露和死锁的发生。

转载自:https://juejin.cn/post/7366889205336637440
评论
请登录