likes
comments
collection
share

【redis】一文看懂redission看门狗机制源码实现

作者站长头像
站长
· 阅读数 3

Hi,今天白梦又来卷了,趁着有时间,看了Redission锁原理以及看门狗的部分源码,主要是RedissonLockRedissonBaseLock这两个类。

看新不看旧,首先引入Redission最新版本。

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.26.1</version>
</dependency>

Redission锁原理

Redission锁原理通过lua脚本+哈希来完成,而获取锁的方法主要是在tryAcquireAsync方法上。 主要分为两步: 1、调用tryLockInnerAsync方法,执行的是一个lua脚本,如果获取锁失败,返回的结果是这个key的剩余有效期,如果获取锁成功,则返回null。 2、如果获取锁成功ttlRemaining == null,并且leaseTime是默认值-1时,则执行this.scheduleExpirationRenewal(threadId);来启动看门狗机制。

private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    //如果没有设置leaseTime,则传默认值internalLockLeaseTime(30 * 1000),有则设置leaseTime
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
    ttlRemainingFuture = new CompletableFutureWrapper<>(s);

    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        //锁获取
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                //开启看门狗机制
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

@Override
public boolean tryLock() {
    return get(tryLockAsync());
}

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, command,
            "if ((redis.call('exists', KEYS[1]) == 0) " + //判断是否有线程加锁
                        "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " + //判断是否是当前线程加锁
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //重入次数+1
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " + //设置过期时间
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);", //返回锁的还有多久过期
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

看门狗机制

scheduleExpirationRenewal()方法十分简单:

  1. 一个锁就对应自己的一个ExpirationEntry类,EXPIRATION_RENEWAL_MAP存放所有的锁信息。
  2. EXPIRATION_RENEWAL_MAP里面获取锁,如果存在,则锁重入,如果不存在,则将新锁放入EXPIRATION_RENEWAL_MAP,并开启看门狗机制。
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        //重入锁
        oldEntry.addThreadId(threadId);
    } else {
       //加入锁
        entry.addThreadId(threadId);
        try {
            //看门狗续约
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

renewExpiration 方法是看门狗主要实现,主要逻辑如下:

  1. 首先从 EXPIRATION_RENEWAL_MAP 中获取与当前锁关联的 ExpirationEntry 对象。
  2. 如果未找到该对象,则直接返回。
  3. 使用 getServiceManager().newTimeout 创建了一个定时任务,在定时任务中执行如下逻辑:
    • 首先再次检查 EXPIRATION_RENEWAL_MAP,确保锁仍然存在。
    • 获取锁的第一个线程的 ID。
    • 调用 renewExpirationAsync 方法异步更新锁的过期时间。
    • 根据异步更新的结果,如果成功,则再次调用 renewExpiration 方法,即重新设置下一次的过期时间更新任务;如果失败,则取消过期时间的更新任务。

renewExpirationAsync 方法中:

  1. 使用 Redis 的 EVAL 命令执行 Lua 脚本,检查锁是否仍然存在,如果存在,则更新锁的过期时间为 internalLockLeaseTime,返回1表示成功;如果不存在,则返回0表示失败。
  2. 返回一个 CompletionStage,用于异步处理更新过期时间的结果。

这样,通过定时任务定期检查锁的过期时间,并异步更新过期时间,实现了 Redis 锁的自动续期功能,即当锁的持有者处理时间较长时,能够保证锁不会过期而被释放。

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    //创建一个定时任务
    Timeout task = getServiceManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());、
            //检查锁
            if (ent == null) {
                return;
            }
            //获取锁的第一个线程的 ID
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            //异步更新锁的过期时间
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    //重新设置下一次的过期时间更新任务
                    renewExpiration();
                } else {
                   //取消过期时间的更新任务
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}


protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

锁释放

unlockAsync0是锁释放的主要实现,步骤如下:

  1. 首先调用 unlockInnerAsync 方法异步释放锁:
    • getServiceManager().generateId():生成一个用于标识本次释放操作的唯一标识符 id
    • 获取 Redisson 配置信息 MasterSlaveServersConfig
    • 计算释放锁的超时时间 timeout,该超时时间考虑了 Redisson 配置中的超时时间、重试间隔以及重试次数,确保在释放锁的过程中不会因为超时而导致操作失败。
    • 调用 unlockInnerAsync(threadId, id, timeout) 方法,异步释放锁,并返回一个 RFuture<Boolean> 对象,表示释放锁的异步操作。RedissonLock重写了RedissonBaseLock的unlockInnerAsync,代码如下图,通过lua脚本释放锁。 【redis】一文看懂redission看门狗机制源码实现
    • 使用 thenApply 方法对释放锁的结果进行处理,其中包括删除释放锁时所用的标识符 id。这一步是为了确保释放锁操作的完整性和幂等性。
    • 将处理后的结果包装成一个 CompletableFutureWrapper,返回给调用者。
  2. 使用 CompletionStagehandle 方法处理异步释放锁的结果:
    • 取消对应锁的过期时间续期任务。
    • 如果释放锁过程中出现了异常,将异常抛出。
    • 如果释放锁的操作状态为 null,抛出 IllegalMonitorStateException 异常,表示尝试释放一个未被当前线程持有的锁。
    • 如果一切正常,则返回 null。
private RFuture<Void> unlockAsync0(long threadId) {
    CompletionStage<Boolean> future = unlockInnerAsync(threadId);
    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        //取消对应锁的过期时间续期任务
        cancelExpirationRenewal(threadId);

        if (e != null) {
            if (e instanceof CompletionException) {
                throw (CompletionException) e;
            }
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }

        return null;
    });

    return new CompletableFutureWrapper<>(f);
}


protected final RFuture<Boolean> unlockInnerAsync(long threadId) {
    String id = getServiceManager().generateId();
    MasterSlaveServersConfig config = getServiceManager().getConfig();
    //计算释放锁的超时时间 ,通过Redisson 配置中的超时时间、重试间隔以及重试次数计算
    int timeout = (config.getTimeout() + config.getRetryInterval()) * config.getRetryAttempts();
    timeout = Math.max(timeout, 1);
    RFuture<Boolean> r = unlockInnerAsync(threadId, id, timeout);
    CompletionStage<Boolean> ff = r.thenApply(v -> {
        CommandAsyncExecutor ce = commandExecutor;
        if (ce instanceof CommandBatchService) {
            ce = new CommandBatchService(commandExecutor);
        }
        //异步删除释放锁时所用的标识符 `id`
        ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));
        if (ce instanceof CommandBatchService) {
            ((CommandBatchService) ce).executeAsync();
        }
        return v;
    });
    return new CompletableFutureWrapper<>(ff);
}

cancelExpirationRenewal 方法中:

  1. EXPIRATION_RENEWAL_MAP 中获取与锁关联的 ExpirationEntry 对象。
  2. 如果未找到该对象,直接返回。
  3. 如果传入了 threadId,则移除该线程 ID。
  4. 如果 threadId 为 null,或者锁不再被任何线程持有,则取消定时任务,并从 EXPIRATION_RENEWAL_MAP 中移除该锁的相关信息。
protected void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    
    if (threadId != null) {
        task.removeThreadId(threadId);
    }

    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
            timeout.cancel();
        }
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}

总的来说,当释放锁时,首先取消锁的过期时间续期任务,然后异步释放锁,并处理释放锁的结果,保证了释放锁的操作安全可靠。

结束

今天就介绍了Redission锁原理、锁释放以及看门狗机制的部分源码。不说了,一键三连!!!