likes
comments
collection
share

水煮Redisson(十八)-联合锁和红锁

作者站长头像
站长
· 阅读数 22

前言

联合锁和红锁放在一起来分析,是因为这两种锁实现在Redisson中,关联密切。

    /**
     * Returns MultiLock instance associated with specified <code>locks</code>
     * 
     * @param locks - collection of locks
     * @return MultiLock object
     */
    RLock getMultiLock(RLock... locks);
    
    /*
     * Use getLock method instead. Returned instance uses Redis Slave synchronization
     */
    @Deprecated
    RLock getRedLock(RLock... locks);

红锁

上面getRedLock方法上有@Deprecated注解,说明在Redisson实现中,不推荐使用红锁。

简介

红锁是官方提出的一个锁方案,大概如下: 假设redis集群有5个节点,一个客户端申请锁时,会向所有的节点申请锁,只有大于一半的节点数量返回成功,且耗费的总时间小于锁有效时间,才会判定为拿锁成功,否则会被认定为失败,需要将已经成功的节点锁释放掉。 具体官方文档见:redis.io/topics/dist…

实现

RedissonRedLock继承了联合锁【RedissonMultiLock】,也没有重写核心方法。

/**
 * RedLock locking algorithm implementation for multiple locks. 
 * It manages all locks as one.
 * 
 * @see <a href="http://redis.io/topics/distlock">http://redis.io/topics/distlock</a>
 *
 * @author Nikita Koksharov
 *
 */
public class RedissonRedLock extends RedissonMultiLock {

    /**
     * Creates instance with multiple {@link RLock} objects.
     * Each RLock object could be created by own Redisson instance.
     *
     * @param locks - array of locks
     */
    public RedissonRedLock(RLock... locks) {
        super(locks);
    }
    @Override
    protected int failedLocksLimit() {
        return locks.size() - minLocksAmount(locks);
    }
    protected int minLocksAmount(final List<RLock> locks) {
        return locks.size()/2 + 1;
    }
    @Override
    protected long calcLockWaitTime(long remainTime) {
        return Math.max(remainTime / locks.size(), 1);
    } 
    @Override
    public void unlock() {
        unlockInner(locks);
    }
}

联合锁

Groups multiple independent locks and manages them as one lock. 是对一个锁的集合进行操作,与红锁不同的是,集合中所有的锁,要么全部拿锁成功,要么失败,不存在部分成功。实现类:RedissonMultiLock

获取锁

不限等待时间获取 lock

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long baseWaitTime = locks.size() * 1500;
        long waitTime = -1;
        if (leaseTime == -1) {
            waitTime = baseWaitTime;
        } else {
            leaseTime = unit.toMillis(leaseTime);
            waitTime = leaseTime;
            if (waitTime <= 2000) {
                waitTime = 2000;
            } else if (waitTime <= baseWaitTime) {
                waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
            } else {
                waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
            }
        }
        
        while (true) {
            if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
                return;
            }
        }
    }

tryLock

限定等待时间获取

同步获取

同步获取的代码较长,这里仅截取关键代码。代表方法:tryLock(),调用后,等待锁获取成功或者失败之后,再执行后续逻辑。 下面是后续可能的实现逻辑,直接调用异步获取,然后阻塞等待结果。

try {
    return tryLockAsync(waitTime, leaseTime, unit).get();
} catch (ExecutionException e) {
    throw new IllegalStateException(e);
}

异步获取

异步获取的代码更长,同样只截取关键代码

    @Override
    public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RPromise<Boolean> result = new RedissonPromise<Boolean>();
		// 这里创建锁状态,这个对象内,进行锁申请
        LockState state = new LockState(waitTime, leaseTime, unit, threadId);
        state.tryAcquireLockAsync(locks.listIterator(), result);
        return result;
    }

异步获取主要是通过内部类【LockState】来实现

void tryAcquireLockAsync(ListIterator<RLock> iterator, RPromise<Boolean> result) {
    RLock lock = iterator.next();
    RPromise<Boolean> lockAcquiredFuture = new RedissonPromise<Boolean>();
    //
    lock.tryLockAsync(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS, threadId)
            .onComplete(new TransferListener<Boolean>(lockAcquiredFuture));
    
    lockAcquiredFuture.onComplete((res, e) -> {
        boolean lockAcquired = res;
        
        if (lockAcquired) {
            acquiredLocks.add(lock);
        } else {
            if (failedLocksLimit == 0) {
                unlockInnerAsync(acquiredLocks, threadId).onComplete((r, ex) -> {
                    failedLocksLimit = failedLocksLimit();
                    acquiredLocks.clear();
                    // reset iterator
                    while (iterator.hasPrevious()) {
                        iterator.previous();
                    }
                    
                    checkRemainTimeAsync(iterator, result);
                });
                return;
            } else {
                failedLocksLimit--;
            }
        }
        // 如果持有时间不限制,则直接再次调用此方法,操作下一个锁;否则判断持有时间是否已经耗尽,如果时间耗尽,则解锁,并返回拿锁失败
        checkRemainTimeAsync(iterator, result);
    });
}

执行步骤:

  1. 拿到集合中的下一个锁,尝试获取锁;
  2. 判断是否获取成功,如果成功,则将此锁添加到已成功的集合中,然后执行checkRemainTimeAsync方法,此方法的功能:如果持有时间不限制,则直接再次调用此方法,操作下一个锁;否则判断持有时间是否已经耗尽,如果时间耗尽,则解锁,并返回拿锁失败;
  3. 如果获取锁失败,则判断失败重试次数是否为零,如果不为零,则继续往后执行,操作下一个锁;
  4. 如果失败重试次数为零【failedLocksLimit】,则释放掉所有已持有的锁,成功后,调用checkRemainTimeAsync方法,再次尝试获取。

释放锁

联合锁不支持强制释放

同步无返回

联合锁释放时,阻塞执行,直到释放完成以后,才返回。

    @Override
    public void unlock() {
        List<RFuture<Void>> futures = new ArrayList<>(locks.size());
        for (RLock lock : locks) {
			// 拿到每个锁释放的Future结果
            futures.add(lock.unlockAsync());
        }
        for (RFuture<Void> future : futures) {
			// 遍历等待每个锁释放完成
            future.syncUninterruptibly();
        }
    }

异步有返回

将参数中的锁逐个释放,直接返回一个Future。

    protected RFuture<Void> unlockInnerAsync(Collection<RLock> locks, long threadId) {
        if (locks.isEmpty()) {
            return RedissonPromise.newSucceededFuture(null);
        }
        RPromise<Void> result = new RedissonPromise<Void>();
        AtomicInteger counter = new AtomicInteger(locks.size());
        for (RLock lock : locks) {
            lock.unlockAsync(threadId).onComplete((res, e) -> {
				// 如果释放锁的过程中,发生异常,则直接返回错误。
                if (e != null) {
                    result.tryFailure(e);
                    return;
                }
                // 当锁全部释放之后,返回成功
                if (counter.decrementAndGet() == 0) {
                    result.trySuccess(null);
                }
            });
        }
        return result;
    }

强制释放

联合锁不支持强制释放,重写的方法内直接返回异常

    @Override
    public boolean forceUnlock() {
        throw new UnsupportedOperationException();
    }