likes
comments
collection
share

RedissonLock加锁与解锁的实现原理

作者站长头像
站长
· 阅读数 18

RedissonLock

什么是 RedissonLock

RedissonLock 是一个基于 Redis 的分布式锁实现。它提供了一个简单而强大的接口,帮助开发人员在分布式环境下实现锁定机制。通过 RedissonLock,开发人员可以轻松地控制并发访问,确保数据的一致性和可靠性。

RedissonLock 的设计灵活且易于使用,可以根据需求设置锁的超时时间和自动续期机制。同时,RedissonLock 也支持公平锁和非公平锁,满足不同场景下的需求。

总的来说,RedissonLock 是一个可靠的分布式锁实现,为开发人员提供了方便快捷的解决方案,帮助他们构建稳定可靠的分布式系统。

如何使用

导入依赖

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson</artifactId>
  <version>3.27.2</version>
</dependency>

示例

// 配置你的 Redis 服务器地址
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");

// 创建 Redisson 客户端实例
RedissonClient redisson = Redisson.create(config);

// 定义锁的 key
String lockKey = "anyLockKey";

// 获取锁对象
RLock lock = redisson.getLock(lockKey);

try {
    // 尝试加锁,最多等待 100 秒,上锁以后 10 秒自动解锁
    boolean isLock = lock.tryLock(100, 10, TimeUnit.SECONDS);
    if (isLock) {
        try {
            // 业务处理...
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    // 关闭 Redisson 客户端连接
    redisson.shutdown();
}

上面的示例使用了 tryLock 方法来尝试获取锁,这个方法可以避免死锁。该方法接收三个参数:等待获取锁的最大时间、锁自动释放的时间和时间单位。如果成功获取锁,则执行业务逻辑,然后在 finally 块中释放锁。

需要注意的是,上锁以后设置了自动解锁时间,这是为了防止部分场景下未正确释放锁导致其他线程或进程无法获取锁的问题。自动解锁时间应该根据业务执行时间的预估合理设置。

源码解读

加锁
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        try {
            subscribeFuture.get(time, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
                    "Unable to acquire subscription lock after " + time + "ms. " +
                            "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
                subscribeFuture.whenComplete((res, ex) -> {
                    if (ex == null) {
                        unsubscribe(res, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        } catch (ExecutionException e) {
            LOGGER.error(e.getMessage(), e);
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }
  1. tryAcquire 方法:
    • 这个方法尝试去获取锁,如果锁已经被某个线程获取了,那么会返回一个ttl(Time To Live,即该锁的剩余存活时间);如果获取锁成功,在Redis中设置锁的过期时间,并且会返回 null。
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
	return get(tryAcquireAsync0(waitTime, leaseTime, unit, threadId));
}

private RFuture<Long> tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
	return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

/*
* 该方法的主要流程可解释如下:
* 根据leaseTime是否大于0来选择使用的内部方法进行尝试获取锁。
* 如果leaseTime大于0,则使用指定的leaseTime调用tryLockInnerAsync方法。
* 若leaseTime不大于0,则调用tryLockInnerAsync方法,并使用internalLockLeaseTime作为默认的续约时间。
* 调用handleNoSync方法来处理ttlRemainingFuture,即锁的剩余时间。
* 使用CompletionStage的thenApply方法来处理获取锁的结果。
* 如果获取到锁(ttlRemaining为空),则根据leaseTime设置internalLockLeaseTime或调用scheduleExpirationRenewal方法进行锁的续约计划。
* 最后返回一个CompletableFutureWrapper,它包装了CompletionStage的结果
*/
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
	RFuture<Long> ttlRemainingFuture;
	if (leaseTime > 0) {
		ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
	} else {
		ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
											   TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
	}
	CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
	ttlRemainingFuture = new CompletableFutureWrapper<>(s);

	CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
		// lock acquired
		if (ttlRemaining == null) {
			if (leaseTime > 0) {
				internalLockLeaseTime = unit.toMillis(leaseTime);
			} else {
				scheduleExpirationRenewal(threadId);
			}
		}
		return ttlRemaining;
	});
	return new CompletableFutureWrapper<>(f);
}

/*
* 检查键KEYS[1]是否存在或者hash中是否存在与ARGV[2](线程ID转换为锁名称)相关的字段。
* 如果条件满足,将对应的hash值增加1(重入次数),并刷新这个键的过期时间(pexpire)为租约时间。
* 如果条件不满足(即已有其他线程持有锁),Lua脚本返回这个键的剩余时间(pttl)。
* 总结:这段代码是一个分布式锁的实现细节,它使用Lua脚本在Redis中原子性地获取锁,
* 并且支持锁重入的概念。如果一个线程再次获取到同一个锁,那么它的锁的持有次数会增加,
* 同时刷新其过期时间,确保其他线程无法获取此锁,
* 直到持有线程主动释放锁或达到过期时间。如果无法获取锁,则返回剩余的过期时间。
**/
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
	return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, command,
								"if ((redis.call('exists', KEYS[1]) == 0) " +
								"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
								"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
								"redis.call('pexpire', KEYS[1], ARGV[1]); " +
								"return nil; " +
								"end; " +
								"return redis.call('pttl', KEYS[1]);",
								Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
  1. subscribe 方法:
    • 如果初始的加锁尝试失败(即 tryAcquire 返回了一个非 null 的值),表示锁目前被占用,线程则需要等待。在这种情况下,该线程会调用 subscribe 方法进行订阅操作,准备接收锁释放的通知。
  1. pubSub.timeout(future):
    • 这行代码可能设置了未来任务 future 与发布订阅机制相关的超时时间。
  1. interruptibly 参数:
    • 参数 interruptibly 指出线程在等待锁时是否可以响应中断。线程被中断会抛出一个 InterruptedException。
  1. entry.getLatch().tryAcquire 和 entry.getLatch().acquire 方法:
    • 这些方法使用了计数锁存器(CountDownLatch),让线程等待锁的释放。如果锁未在指定时间内被释放,线程将继续等待或者尝试再次获取锁。
    • 如果线程被设置为可中断,那么在等待期间线程可以被中断,由 InterruptedException 来处理中断逻辑。
  1. unsubscribe 方法:
    • 这是清理或结束订阅的环节,当不再需要等待锁释放的通知时或者当锁被获取后需要取消订阅。

整个方法通过一个while循环来不断尝试获取锁,并设置适当的等待机制,直到锁被成功获取。如果线程获取锁后,会退出while循环,并执行相应的业务逻辑。如果在获取锁的过程中线程被中断且 interruptibly 设置为 true,则会响应中断并抛出异常。

解锁
private RFuture<Void> unlockAsync0(long threadId) {
CompletionStage<Boolean> future = this.unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((res, e) -> {
	this.cancelExpirationRenewal(threadId, res);
	if (e != null) {
		if (e instanceof CompletionException) {
			throw (CompletionException)e;
		} else {
			throw new CompletionException(e);
		}
	} else if (res == null) {
		IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
		throw new CompletionException(cause);
	} else {
		return null;
	}
});
return new CompletableFutureWrapper(f);
}
  1. unlockInnerAsync(threadId):调用此方法以开始解锁过程,该方法返回一个CompletionStage类型的 future 对象,这表明操作完成后能获得一个布尔结果。
protected final RFuture<Boolean> unlockInnerAsync(long threadId) {
String id = this.getServiceManager().generateId();
MasterSlaveServersConfig config = this.getServiceManager().getConfig();
int timeout = (config.getTimeout() + config.getRetryInterval()) * config.getRetryAttempts();
timeout = Math.max(timeout, 1);
RFuture<Boolean> r = this.unlockInnerAsync(threadId, id, timeout);
CompletionStage<Boolean> ff = r.thenApply((v) -> {
	CommandAsyncExecutor ce = this.commandExecutor;
	if (ce instanceof CommandBatchService) {
		ce = new CommandBatchService(this.commandExecutor);
	}

	((CommandAsyncExecutor)ce).writeAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, new Object[]{this.getUnlockLatchName(id)});
	if (ce instanceof CommandBatchService) {
		((CommandBatchService)ce).executeAsync();
	}

	return v;
});
return new CompletableFutureWrapper(ff);
}

/*
* 尝试获取KEYS[3]对应的值(即getUnlockLatchName(requestId)的结果),
* 如果存在则返回这个值并转换成数字。
* 检查哈希表KEYS[1](锁的名称)中是否存在由ARGV[3]指定的字段(线程ID转换为锁名称),
* 如果不存在表示当前线程并没有持有锁,返回nil。
* 若字段存在,通过HINCRBY将该字段值减1,这个操作相当于尝试释放一次重入锁。
* 判断计数器的值:
* 若大于0:说明当前线程仍然持有锁(因为可能是重入),所以执行PEXPIRE为锁刷新过期时间,
* 并用SET命令将KEYS[3]设为0(表示仍然锁定),最后返回0。
* 若等于0或小于0:说明锁已全部释放,执行DEL命令删除KEYS[1]
* 并使用ARGV[4]对KEYS[2]执行发布操作(可能是发布解锁消息到对应的频道),
* 然后用SET命令将KEYS[3]设为1(表示已解锁),最后返回1。
* 这个方法接收以下参数:
* long threadId:尝试解锁的线程ID。
* String requestId:此解锁请求的唯一标识。
* int timeout:解锁操作的超时时间。
* evalWriteSyncedAsync 函数负责异步执行这段LUA脚本,期间涉及到的参数包括:
* getRawName():可能会返回锁的key名称。
* getChannelName():可能返回解锁时要发布消息的频道名称。
* getUnlockLatchName(requestId):返回解锁周期的名称,用于控制解锁的状态。
* LockPubSub.UNLOCK_MESSAGE:在解锁时发送的消息文本。
* 8internalLockLeaseTime:内部锁租约时间。
* getLockName(threadId):将线程ID转换为锁名称。
* getSubscribeService().getPublishCommand():获取发布命令的方法。
*/
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
	return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
								"local val = redis.call('get', KEYS[3]); " +
								"if val ~= false then " +
								"return tonumber(val);" +
								"end; " +

								"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
								"return nil;" +
								"end; " +
								"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
								"if (counter > 0) then " +
								"redis.call('pexpire', KEYS[1], ARGV[2]); " +
								"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
								"return 0; " +
								"else " +
								"redis.call('del', KEYS[1]); " +
								"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
								"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
								"return 1; " +
								"end; ",
								Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
								LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
								getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}
  1. future.handle((res, e) -> {...}):处理 future 的结果,它会返回一个新的 CompletionStage 对象。有两种情况:
    • 操作成功(res 非空,e 为空):解锁成功,什么都不做并正常结束。
    • 操作失败(e 非空):处理异常。如果异常是 CompletionException 的实例,则直接抛出;否则,用原始异常作为原因创建并抛出一个新的 CompletionException。
  1. 同时,如果得到的结果是 null,这意味着有问题,因为预期是获得一个布尔值。在这种情况下,抛出一个 IllegalMonitorStateException 异常,含义是当前线程没有持有该锁但试图解锁。
  2. new CompletableFutureWrapper(f):将 CompletionStage 对象包装成 RFuture 对象,这可能是为了与库的其他部分接口保持一致。这个 RFuture 类型的返回值表示解锁过程的异步结果。
转载自:https://juejin.cn/post/7352075719152156710
评论
请登录