redisson分布式锁实现原理
一、基于redisson实现分布式锁使用
Redisson是一个使用Java编写的开源库,它提供了对Redis数据库的访问和操作的封装,并在此基础上提供了各种分布式功能,包括分布式锁。 Redisson的分布式锁是基于Redis的原子性操作来实现的,它提供了简单且易于使用的API,可以在分布式环境中实现高效的分布式锁管理。
1.引入依赖
引入redis和redisson相关依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
2.编写配置
编写声明RedissonClient配置,server类型可以选ClusterServers,MasterSlaveServers,ReplicatedServers,SentinelServers和SingleServer,此处使用的server类型选单体redis:
@Configuration
public class RedissonConfiguration {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password:}")
private String password;
private int database = 0;
@Bean
public RedissonClient redisson() {
String address = "redis://" + host + ":" + port;
Config config = new Config();
config.useSingleServer()
.setAddress(address)
.setPassword(password)
.setDatabase(database);
return Redisson.create(config);
}
}
3.使用分布式锁
注入RedissonClient然后获取锁,加锁后进行独占业务操作,最后释放锁。
@Service
@Slf4j
public class TestRLock {
@Resource
private RedissonClient redissonClient;
public void doSomething(String orderId) {
RLock lock = redissonClient.getLock("place_order:" + orderId);
try {
if(lock.tryLock(5,10, TimeUnit.SECONDS)) {
this.doBuzzExclusive(orderId);
}
} catch (Exception e) {
log.error("occur error;orderId={}",orderId,e);
} finally {
//锁被持有,并且被当前线程持有
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
/**
* 模拟独占处理
* @param orderId
*/
private void doBuzzExclusive(String orderId) {
// TODO: do business exclusive
}
}
这样就实现分布式锁对竞态资源的操作控制。
二、redisson分布式锁原理
1.建立连接
在Redisson中,Netty被用作底层的网络通信框架。它提供了高性能、异步非阻塞的网络通信能力,使得Redisson可以与Redis服务器进行快速、可靠的通信。 在使用Redisson创建RedissonClient实例时,它会自动初始化并启动Netty客户端,用于与Redis服务器建立连接。 从前边的分布式锁使用过程可以看出,RLock是由RedissonClient创建,那么与redis的连接交互也是由RedissonClient来实现,我们从创建RedissonClient过程看一下redisson如何与redis建立连接的。
public static RedissonClient create(Config config) {
return new Redisson(config);
}
然后调用Redisson构造函数创建:
protected Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(this);
}
commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor);
}
这里会复制一份配置出来,然后创建连接管理器、命令执行器、定期定出调度、以及异步写服务。
此处主要关注命令执行器和连接管理器,此处用的是同步命令执行器,当然也有其他实现比如CommandBatchService批量执行器。
然后再看下创建连接管理器:
public static ConnectionManager createConnectionManager(Config configCopy) {
UUID id = UUID.randomUUID();
if (configCopy.getMasterSlaveServersConfig() != null) {
validate(configCopy.getMasterSlaveServersConfig());
return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
} else if (configCopy.getSingleServerConfig() != null) {
validate(configCopy.getSingleServerConfig());
return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
} else if (configCopy.getSentinelServersConfig() != null) {
validate(configCopy.getSentinelServersConfig());
return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id);
} else if (configCopy.getClusterServersConfig() != null) {
validate(configCopy.getClusterServersConfig());
return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
} else if (configCopy.getReplicatedServersConfig() != null) {
validate(configCopy.getReplicatedServersConfig());
return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id);
} else if (configCopy.getConnectionManager() != null) {
return configCopy.getConnectionManager();
}else {
throw new IllegalArgumentException("server(s) address(es) not defined!");
}
}
前边使用的是SingleServer,看一下SingleConnectionManager创建流程:
public SingleConnectionManager(SingleServerConfig cfg, Config config, UUID id) {
super(create(cfg), config, id);
}
SingleConnectionManager继承了MasterSlaveConnectionManager,会调用父类构造器:
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
this(config, id);
this.config = cfg;
if (cfg.getSlaveAddresses().isEmpty()
&& (cfg.getReadMode() == ReadMode.SLAVE || cfg.getReadMode() == ReadMode.MASTER_SLAVE)) {
throw new IllegalArgumentException("Slaves aren't defined. readMode can't be SLAVE or MASTER_SLAVE");
}
initTimer(cfg);
initSingleEntry();
}
initTimer会创建空闲连接监听管理以及发布订阅管理器,然后调用initSingleEntry初始化单机客户端。
protected void initSingleEntry() {
try {
if (config.checkSkipSlavesInit()) {
masterSlaveEntry = new SingleEntry(this, config);
} else {
masterSlaveEntry = new MasterSlaveEntry(this, config);
}
CompletableFuture<RedisClient> masterFuture = masterSlaveEntry.setupMasterEntry(new RedisURI(config.getMasterAddress()));
masterFuture.join();
//省略...
startDNSMonitoring(masterFuture.getNow(null));
} catch (Exception e) {
//省略...
}
}
创建SingleEntry,然后调用setupMasterEntry方法设置主节点连接,并且会调用startDNSMonitoring方法开启线程监听ip是否发生变成,如果变成会重新连接。继续看setupMasterEntry方法:
public CompletableFuture<RedisClient> setupMasterEntry(RedisURI address, String sslHostname) {
RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
return setupMasterEntry(client);
}
此处会先创建redis客户端,然后调用setupMasterEntry方法设置主节点连接;先看一下创建RedisClient:
private RedisClient(RedisClientConfig config) {
RedisClientConfig copy = new RedisClientConfig(config);
//省略...
channels = new DefaultChannelGroup(copy.getGroup().next());
bootstrap = createBootstrap(copy, Type.PLAIN);
pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);
//省略...
}
这里基本上都是netty启动器的相关设置和前置准备,可以看一下创建netty客户端启动器的操作:
private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
Bootstrap bootstrap = new Bootstrap()
.resolver(config.getResolverGroup())
.channel(config.getSocketChannelClass())
.group(config.getGroup());
bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
config.getNettyHook().afterBoostrapInitialization(bootstrap);
return bootstrap;
}
然后继续看setupMasterEntry做了什么事情.
private CompletableFuture<RedisClient> setupMasterEntry(RedisClient client) {
CompletableFuture<InetSocketAddress> addrFuture = client.resolveAddr();
return addrFuture.thenCompose(res -> {
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
//省略...
CompletableFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
//省略...
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}).whenComplete((r, e) -> {
if (e != null) {
client.shutdownAsync();
}
}).thenApply(r -> client);
}
创建连接条目,并添加到连接池中。继续看添加到连接池操作:
public CompletableFuture<Void> add(ClientConnectionsEntry entry) {
CompletableFuture<Void> promise = initConnections(entry, true);
return promise.thenAccept(r -> {
entries.add(entry);
});
}
这里做了初始化连接操作,然后添加到连接池的队列中,接着看一下initConnections初始化连接:
private CompletableFuture<Void> initConnections(ClientConnectionsEntry entry, boolean checkFreezed) {
//省略...
int startAmount = Math.min(5, minimumIdleSize);
AtomicInteger requests = new AtomicInteger(startAmount);
for (int i = 0; i < startAmount; i++) {
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
}
return initPromise;
}
省略掉中间一些调用链,最终会调用到RedisClient的connectAsync方法:
public RFuture<RedisConnection> connectAsync() {
CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
CompletableFuture<RedisConnection> r = new CompletableFuture<>();
ChannelFuture channelFuture = bootstrap.connect(res);
channelFuture.addListener(new ChannelFutureListener() {
//省略...
});
return r;
});
return new CompletableFutureWrapper<>(f);
}
此处就用使用前面创建的Bootstrap进行连接操作,当然这里是初始化连接到连接池,如果并发比较大,连接池中初始连接数不够用,会在发起请求的时候创建新的连接。
2.加锁
加锁会先调用RedissonClient�创建锁对象。
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name);
}
然后创建RedissonLock:
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
调用父类构造函数,指定执行器、锁释放时间以及发布订阅组件。
然后继续看加锁逻辑,这里加锁我们使用tryLock并指定了等待时间、释放时间:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 1.尝试获取锁,如果ttl返回null,代表加锁成功
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
// 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
/**
* 2.订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
* 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争.
*
* 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.
* 当 this.await 返回 true,进入循环尝试获取锁.
*/
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
// 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
/**
* 3.收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁
* 获取锁成功,则立马返回 true,
* 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环
*/
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
//无论是否获得锁,都要取消订阅解锁消息
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
}
上述代码的核心逻辑是:
- 尝试获取锁,如果获取成功则返回调用
- 如果超过了等待时间,则返回获取失败
- 订阅锁释放事件,并通过await方法阻塞等待锁释放,基于信号量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争获取锁
- 收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁,获取锁成功,则返回true,若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回false结束循环
- 最后无论是否获得锁,都要取消订阅解锁消息,不再参与锁获取和竞争
尝试获取锁会调用tryAcquire方法,看一下实现:
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
继续调用tryAcquireAsync方法,并同步获取其返回:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
如果传入锁释放时间且大于零,使用用户传入的释放时间,否则使用默认的释放时间30秒,然后调用tryLockInnerAsync获取锁并返回中心化节点数据的ttl时间。 如果用户传入了leaseTime就不会开启看门狗机制实现自动续期,如果没有传入则开启看门口续期机制。 继续看tryLockInnerAsync方法实现:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
为了保证操作的原子性,这里使用了lua脚本来操作redis,执行脚本时key是加锁的名称,ARGV分别是释放时间和线程信息。从脚本内容可以看出,锁在redis中的数据结构是hash,外层key存储的是锁的名称,内部field和value存储的是加锁客户端线程信息。脚本含义是:
- 如果hash不存在,则直接放入加锁客户端信息并设置失效时间返回
- 如果hash中存在加锁客户端的信息,则value加1实现重入逻辑,并设置过期时间返回
- 否则竞争加锁失败,返回锁对应hash的过期时间
然后调用evalWriteAsync�执行lua脚本:
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());
int availableSlaves = entry.getAvailableSlaves();
CommandBatchService executorService = createCommandBatchService(availableSlaves);
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
if (commandExecutor instanceof CommandBatchService) {
return result;
}
RFuture<BatchResult<?>> future = executorService.executeAsync();
//省略...
return new CompletableFutureWrapper<>(f);
}
调用evalWriteAsync执行命令,调用executeAsync等待批处理任务完成,并获取任务的结果。继续看evalWriteAsync实现:
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType,
String script, List<Object> keys, boolean noRetry, Object... params) {
if (isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) {
//省略...
RedisExecutor<T, R> executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd,
args.toArray(), promise, false,
connectionManager, objectBuilder, referenceType, noRetry);
executor.execute();
//省略...
return new CompletableFutureWrapper<>(mainPromise);
}
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
return async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), false, noRetry);
}
是否开启了脚本缓存走不同的逻辑,但是都是构造RedisExecutor�并执行execute�:
public void execute() {
//省略...
CompletableFuture<RedisConnection> connectionFuture = getConnection().toCompletableFuture();
//省略...
connectionFuture.whenComplete((connection, e) -> {
//省略...
sendCommand(attemptPromise, connection);
//省略...
});
attemptPromise.whenComplete((r, e) -> {
releaseConnection(attemptPromise, connectionFuture);
checkAttemptPromise(attemptPromise, connectionFuture);
});
}
最终会走到RedisConnection的send方法:
public <T, R> ChannelFuture send(CommandData<T, R> data) {
return channel.writeAndFlush(data);
}
这里就是调用netty客户端使用channel发送请求到redis服务端了。执行完成,调用方就根据tryLock返回结果感知到是否加锁成功,然后执行下一步动作。
整个加锁执行链路时序如下:
当然redisson分布式锁还有其他实现,RedissonMultiLock和RedissonSpinLock:
- RedissonMultiLock:是一种同时获取多个锁的分布式锁。可以将多个RedissonLock对象传递给RedissonMultiLock构造函数,然后通过调用lock()方法一次性获取所有的锁释放锁时,需要调用unlock()方法对每个锁进行解锁
- RedissonSpinLock:是一种自旋锁,采用的是非阻塞式锁的方式。当一个线程获取到该锁后,如果其他线程尝试获取同一个锁,它们将反复自旋(忙等待)直到持有该锁的线程释放锁。自旋锁适用于锁竞争的情况下,短暂的锁竞争下,自旋锁的性能优于阻塞式锁
对于这两种锁实现不再展开分析,逻辑和原理主链路基本相似。
3.看门狗续期
获取锁的流程中,tryAcquireAsync�中有一段代码:
CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
// lock acquired
if (acquired) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return acquired;
});
ttlRemaining不为空代表加锁成功,如果用户指定了锁释放时间就返回调用,否则就开启续期能力,也就是看门狗机制。 题外话,在面试中我经常问别人,redisson分布式锁如何解决续期问题的,有相当一部分人上来就说通过看门狗机制,我反问所有场景和用法都会开启看门狗机制吗?有人很武断的说会,也有人蚌埠住了。 言归正传,通过scheduleExpirationRenewal来具体看一下看门狗机制是什么样子的。
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
创建ExpirationEntry对象,存放线程续期信息,如果已经存在则已经存在与当前对象相同名称的续约信息,将当前线程ID加入到oldEntry中,表示需要更新该续约信息;否则调用renewExpiration方法操作续期,如何线程被中断则取消续期。主要看一下renewExpiration实现:
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
//省略...
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
该方法做的事情就是每internalLockLeaseTime的1/3时间执行续期动作,internalLockLeaseTime默认是30秒,可以修改,并且延迟操作是通过netty的时间轮实现,每一次续期操作都会触发下一次延迟。 接着看一下renewExpirationAsync�的实现:
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
同样是通过lua脚本操作redis,检查加锁的客户端线程是否存在,如果存在则通过pexpire命令重新设置过期时间,从而达到续期作用,并返回1(代表续期成功),否则返回0(续期失败)。
看门狗续期操作及流程大致如下:
4.释放锁
释放锁会调用RedissonLock的unlock方法操作,看一下unlock:
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
然后调用unlockAsync方法:
public RFuture<Void> unlockAsync(long threadId) {
RFuture<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
cancelExpirationRenewal(threadId);
//省略...
return null;
});
return new CompletableFutureWrapper<>(f);
}
根据当前线程id释放锁,并且取消看门狗续期能力,主要看unlockInnerAsync方法释放锁。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
也是通过lua脚本来操作redis实现释放锁,上述脚本主要做了以下操作:
- 如果当前线程持有锁资源,那么减少hash中field的value值
- 如果当前线程持有的hash中field的value值大于0,那么重新设置过期时间,从而支持重入能力
- 如果当前线程持有的hash中field的value值大于0,那么需要释放锁,通过publish命令发布释放事件通知,告诉其他竞争者去抢占锁资源
这样就释放了锁资源,并且会通知其他订阅了事件的加锁参与者去尝试加锁。
三、分布式锁考虑的问题
1.续期问题
锁续期是分布式锁一定要考虑的问题,锁时间过短会导致锁释放了业务还在执行,但是锁又被其他客户端获取,从而导致数据不一致问题;锁时间过长又会导致其他客户端长时间等待,造成性能和体验问题。续期主要考虑以下两点:
- 自动续期:所持有过程中,会处理比较复杂的业务,需要一种机制在业务可能在释放之前处理不完的情况下,让业务无感知实现自动续期,而不影响业务的执行。
- 最大续期次数:互联网业务相对比较复杂多变,在服务依赖的资源或者服务出现短暂抖动或者不可用的情况下,可能短时间的续期解决不了问题,而无限制的续期又会影响的整个服务的性能或者拖垮服务,需要设置相对合理的策略,来限制最大续期次数和时间,从而来保证服务更高性能的表现。
2.可用性
可用性更多的依赖中心化资源的稳定性,redisson分布式锁是基于redis实现的,那么如果redis是单机模式,redisson做再大的努力也是徒劳。对于主从模式,redisson加锁肯定是操作的主节点,主从同步默认是异步的,在主节点加锁成功后,突然宕机,加锁数据尚未同步到从节点,此时从节点晋升为主节点,那么新的主节点不具有redisson加锁数据,新的请求来了之后会重新加锁,从而会出现问题。 对于集群模式下使用Redisson进行分布式锁时,至少要有半数以上的Redis节点在获取锁时才会视为成功,这个机制可以保证在网络分区或部分Redis节点故障的情况下,分布式锁仍然能够正常工作,避免因为单点故障导致整个系统的不可用性。
3.可重入性
Redisson分布式锁是支持可重入的,也就是说同一个线程可以多次获取同一个锁而不会造成死锁。当一个线程已经获取了一个分布式锁,并且没有释放锁之前,它可以再次请求获取相同名称的锁。在这种情况下,Redisson会维护一个计数器来记录锁的重入次数。每次成功获取锁时,计数器会加一;在释放锁时,计数器会相应地减一。只有当线程释放锁的次数与获取锁的次数相匹配(计数器为0),锁才会完全释放,其他线程才能获得该锁。这样可以保证同一个线程在持有锁的情况下,可以多次获取锁而不会被阻塞或产生死锁。 可重入性是Redisson分布式锁的一个重要特性,它使得在复杂的业务逻辑中能够灵活地使用锁,避免了线程自身因为重入而产生的问题。需要注意的是,重入次数计数器是基于线程级别的,不同线程之间的计数器是独立的,因此不能用于跨线程的重入。
4.死锁检测与恢复
Redisson分布式锁提供了死锁检测与恢复的机制,以帮助应对潜在的死锁情况。
首先,Redisson会为每个获取到的分布式锁设置一个过期时间(expire)。这个过期时间是最大持有锁的时间,确保即使持有锁的线程发生异常或没有正确释放锁,锁也能在一段时间后自动释放,避免长时间的死锁。
其次,Redisson引入异步续期(async renewal)机制。在获取锁成功后,Redisson会使用后台线程定期自动续期(renewal)锁的过期时间,以防止持有锁的线程因为某些原因没有及时续期导致锁的过期。这样可以减少因为网络延迟、GC暂停等问题而造成的误解锁。 此外,Redisson还提供了针对死锁的自动解锁(auto-unlock)功能。当一个线程持有锁的时间超过指定的阈值后,Redisson会自动解锁该锁,并触发一个解锁事件。通过监听解锁事件,可以实现对死锁的检测和恢复操作,例如记录日志、重试获取锁等。 需要注意的是,无法完全消除死锁的发生,因为死锁是由于复杂的并发环境和业务逻辑导致的。但是通过上述的机制,Redisson能够在大部分情况下检测到死锁,并提供自动解锁的功能,以减少死锁对系统的影响。要充分利用Redisson的死锁检测与恢复机制,建议合理设置过期时间、异步续期和自动解锁的阈值,并结合监控和日志来及时发现和解决潜在的死锁问题。
四、参考
github.com/redisson/re… zhuanlan.zhihu.com/p/135864820 zhuanlan.zhihu.com/p/230433777 www.cnblogs.com/Leo_wl/p/16…
转载自:https://juejin.cn/post/7259355632930684989