【Redisson】MultiLock 源码剖析
Redisson
分布式锁支持 MultiLock
机制:可以将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁。
一次性锁定多个资源,然后去处理逻辑,最后一次性释放所有的资源对应的锁。
实际场景中:锁库存、锁订单、锁积分。
这个可能在数据库里操作,用行锁。
举个栗子:
@Test
public void test() {
RLock lock1 = redisson.getLock("lock1");
RLock lock2 = redisson.getLock("lock2");
RLock lock3 = redisson.getLock("lock3");
RLock multiLock = redisson.getMultiLock(lock1, lock2, lock3);
multiLock.lock();
multiLock.unlock();
}
先来看获取锁:RLock multiLock = redisson.getMultiLock(lock1, lock2, lock3);
实际上:就是拿个一个数组来存放这些锁。
// RedissonMultiLock.java
// 存放到一个列表中
final List<RLock> locks = new ArrayList<>();
public RedissonMultiLock(RLock... locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
}
this.locks.addAll(Arrays.asList(locks));
}
(1)加锁
// RedissonMultiLock.java
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 1. 计算等待时间
// 等待时间 = 锁的数量 * 1500, 3个锁则 4500
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
if (leaseTime == -1) {
waitTime = baseWaitTime;
} else { // 自定义等待时间
leaseTime = unit.toMillis(leaseTime);
waitTime = leaseTime;
if (waitTime <= 2000) {
waitTime = 2000;
} else if (waitTime <= baseWaitTime) {
waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
} else {
waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
}
}
// 2. 等待全部加锁成功
while (true) {
if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
return;
}
}
}
此过程,主要分为两部分:
- 计算等待时间:跟锁数量相关; 3个锁,等待时间 = 3 * 1500 = 4500
- 尝试加全部锁:无限循环,至全部锁加成功
进入 tryLock()
,看其源码:
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)
throws InterruptedException {
// 1. 遍历给每个锁尝试加锁
// 只要有一个锁加锁失败,则把之前的解锁,之后重新加锁
// 一堆逻辑,不看也罢
{...}
// 2. 锁有时效,异步过期检查
{...}
return true;
}
获取所有的锁必须在多长时间之内就要结束,如果超时就会重新再次死循环尝试获取锁。
使用的是各个锁的tryLock()
方法,指定了说在获取每个单独的锁的时候,会有一个获取超时退出的时间。
(2)释放锁
@Override
public void unlock() {
List<RFuture<Void>> futures = new ArrayList<>(locks.size());
// 异步释放锁
// 锁的释放,针对具体锁,这里就不展开。
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
}
// 等待所有锁释放
for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
}
转载自:https://juejin.cn/post/7112015322579730462