springboot&redisson实现延时队列
Redisson实现延时队列
版本说明:
- spring boot 2.6.0
- redisson-spring-boot-starter 3.28.0
一、加入依赖&配置
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.28.0</version>
</dependency>
application.properties
spring.application.name=springboot-redis-delayed-queue-demo
spring.redis.database=2
spring.redis.host=localhost
spring.redis.password=123456
spring.redis.port=6379
二、延时任务添加
package cn.aohan.delayedqueue.provider;
import cn.aohan.delayedqueue.model.DelayedTaskInfo;
import cn.aohan.delayedqueue.model.TaskData;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
/**
 * Producer-side helper around Redisson delayed queues: schedules delayed tasks
 * and removes tasks that are no longer wanted.
 *
 * @author 傲寒
 * @date 2024/4/19
 */
@Component
public class DelayedQueueProvider {

    private final RedissonClient redissonClient;

    public DelayedQueueProvider(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /**
     * Schedules a delayed task.
     *
     * @param delayedName name of the queue the task is delivered to once the delay elapses
     * @param val         task payload
     * @param delayTime   delay before the task becomes available
     * @param timeUnit    unit of {@code delayTime}
     * @throws NullPointerException if {@code delayedName} or {@code timeUnit} is {@code null}
     */
    public void addDelayedTask(String delayedName, TaskData val, long delayTime, TimeUnit timeUnit) {
        // Fail fast with a clear message instead of failing somewhere inside the Redisson call.
        Objects.requireNonNull(delayedName, "delayedName");
        Objects.requireNonNull(timeUnit, "timeUnit");

        final DelayedTaskInfo task = new DelayedTaskInfo();
        task.setCreateAt(System.currentTimeMillis());
        task.setDelayTime(delayTime);
        task.setTimeUnit(timeUnit);
        task.setVal(val);
        task.setDelayedName(delayedName);

        getDelayedQueue(delayedName).offer(task, delayTime, timeUnit);
    }

    /**
     * Removes every task whose payload carries the given task id, whether it is still
     * waiting in the delayed queue or has already been moved to the blocking deque.
     *
     * @param queueName queue name
     * @param taskId    id of the task(s) to remove
     */
    public void removeTask(String queueName, String taskId) {
        final Predicate<DelayedTaskInfo> matchesTaskId = item -> {
            final TaskData val = item.getVal();
            return Objects.nonNull(val) && Objects.equals(taskId, val.taskId);
        };

        final RBlockingDeque<DelayedTaskInfo> blockingDeque = getBlockingDeque(queueName);
        // Purge the delayed queue FIRST. Tasks only ever move from the delayed queue to the
        // blocking deque, so a task that expires while we are working is still caught by the
        // second removal. The opposite order lets such a task slip past both removals.
        getDelayedQueue(blockingDeque).removeIf(matchesTaskId);
        blockingDeque.removeIf(matchesTaskId);
    }

    /**
     * Returns the blocking deque registered under the given name, using the Jackson JSON codec.
     *
     * @param queueName queue name
     * @return {@link RBlockingDeque}<{@link DelayedTaskInfo}>
     */
    public RBlockingDeque<DelayedTaskInfo> getBlockingDeque(String queueName) {
        return redissonClient.getBlockingDeque(queueName, JsonJacksonCodec.INSTANCE);
    }

    /**
     * Returns the delayed queue that feeds the blocking deque with the given name.
     *
     * @param queueName queue name
     * @return {@link RDelayedQueue}<{@link DelayedTaskInfo}>
     */
    private RDelayedQueue<DelayedTaskInfo> getDelayedQueue(String queueName) {
        return getDelayedQueue(getBlockingDeque(queueName));
    }

    /**
     * Returns the delayed queue that feeds the given blocking deque.
     *
     * @param blockingDeque destination deque
     * @return {@link RDelayedQueue}<{@link DelayedTaskInfo}>
     */
    private RDelayedQueue<DelayedTaskInfo> getDelayedQueue(RBlockingDeque<DelayedTaskInfo> blockingDeque) {
        return redissonClient.getDelayedQueue(blockingDeque);
    }
}
三、监听延时任务到期
延时队列名称常量
/**
 * Names of the delayed queues used by the application.
 *
 * <p>Constants holder only: not meant to be instantiated or extended.
 *
 * @author 傲寒
 * @date 2024/4/19
 */
public final class QueueConstant {

    /**
     * Name of the delayed task queue used for testing.
     */
    public static final String TEST_DELAYED_TASK_QUEUE = "test_delayed_task_queue";

    private QueueConstant() {
        // constants holder, no instances
    }
}
listener监听
package cn.aohan.delayedqueue.listener;
import cn.aohan.delayedqueue.constant.QueueConstant;
import cn.aohan.delayedqueue.model.DelayedTaskInfo;
import cn.aohan.delayedqueue.provider.DelayedQueueProvider;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
 * Listener that consumes expired delayed tasks.
 *
 * <p>On application start-up it launches a daemon thread that blocks on the
 * destination deque and logs every task it receives.
 *
 * <p>NOTE(review): this class only obtains the blocking deque. Presumably expired
 * tasks are moved into it only while an {@code RDelayedQueue} for the same name
 * exists somewhere; confirm one is created at start-up, otherwise tasks pending
 * from before a restart may wait until the next task is added.
 *
 * @author 傲寒
 * @date 2024/4/19
 */
@RequiredArgsConstructor
@Slf4j
@Component
public class DelayedTaskListener implements ApplicationRunner {

    private final DelayedQueueProvider delayedQueueProvider;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        delayedTaskHandle(QueueConstant.TEST_DELAYED_TASK_QUEUE);
    }

    /**
     * Starts a daemon thread that consumes expired tasks of the given queue.
     *
     * @param delayedQueueName name of the queue to listen on
     */
    public void delayedTaskHandle(String delayedQueueName) {
        final Thread thread = new Thread(
                () -> consume(delayedQueueName),
                "delayed-task-listener-" + delayedQueueName);
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Polls the queue until the current thread is interrupted.
     */
    private void consume(String delayedQueueName) {
        final RBlockingDeque<DelayedTaskInfo> blockingDeque = delayedQueueProvider.getBlockingDeque(delayedQueueName);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Take the next expired task; returns null when the wait times out.
                final DelayedTaskInfo delayedTaskInfo = blockingDeque.poll(2, TimeUnit.MINUTES);
                if (Objects.isNull(delayedTaskInfo)) {
                    continue;
                }
                log.info("DelayedTask task :[{}]", delayedTaskInfo);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the thread instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
                log.warn("DelayedTaskListener#delayedTaskHandle interrupted delayedQueueName:[{}]", delayedQueueName);
            } catch (Exception e) {
                log.error("DelayedTaskListener#delayedTaskHandle error delayedQueueName:[{}]", delayedQueueName, e);
                // A persistent failure would otherwise spin and flood the log.
                pauseAfterError();
            }
        }
    }

    /**
     * Sleeps one second before the next poll attempt; keeps the interrupt flag if interrupted.
     */
    private void pauseAfterError() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
四、测试延时任务添加
package cn.aohan.delayedqueue.controller;
import cn.aohan.common.dto.Result;
import cn.aohan.delayedqueue.constant.QueueConstant;
import cn.aohan.delayedqueue.model.dto.TestDelayedDTO;
import cn.aohan.delayedqueue.provider.DelayedQueueProvider;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint used to try out the delayed queue by scheduling tasks by hand.
 *
 * @author 傲寒
 * @date 2024/4/19
 */
@AllArgsConstructor
@RestController
@RequestMapping("/api/test/delayed")
public class DelayedQueueTestController {

    private final DelayedQueueProvider delayedQueueProvider;

    /**
     * Schedules the posted task on the test delayed queue.
     *
     * @param delayedTask request body carrying the payload, the delay and its time unit
     * @return {@link Result}<{@link Void}>
     */
    @PostMapping
    public Result<Void> addDelayedTask(@RequestBody TestDelayedDTO delayedTask) {
        final String targetQueue = QueueConstant.TEST_DELAYED_TASK_QUEUE;
        delayedQueueProvider.addDelayedTask(targetQueue, delayedTask.getVal(), delayedTask.getDelayTime(), delayedTask.getTimeUnit());
        return Result.success();
    }
}
五、大致流程及其原理
在一开始创建延时队列的时候会创建一个QueueTransferTask
org.redisson.RedissonDelayedQueue#RedissonDelayedQueue
channelName = prefixName("redisson_delay_queue_channel", getRawName());
- 这里会订阅该channel,并为延时队列创建任务调度(主要是使用netty中的时间轮)。
- 使用
pushTaskAsync
执行lua脚本,将到期的元素从redis中的LIST和ZSET里移除,并转移到目标阻塞队列中。
添加任务时,会根据延迟时间把元素插入到队列中合适的位置,主要是
org.redisson.RedissonDelayedQueue#offerAsync
方法中的一段lua脚本
-- Pack ARGV[2] and ARGV[3], each prefixed with its length, into a single binary string.
local value = struct.pack('Bc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);
-- Add the packed value to the sorted set KEYS[2] with ARGV[1] as its score.
redis.call('zadd', KEYS[2], ARGV[1], value);
-- Append the same packed value to the tail of the list KEYS[3].
redis.call('rpush', KEYS[3], value);
-- Read the lowest-scored member of the sorted set.
local v = redis.call('zrange', KEYS[2], 0, 0);
-- If the value just added is now first in the sorted set, publish ARGV[1] on channel KEYS[4].
if v[1] == value then
redis.call('publish', KEYS[4], ARGV[1]);
end;
总而言之,这段代码的功能是:
- 将两个字符串打包成一个二进制值。
- 将打包后的值添加到一个排序集合(zset)中,并为其指定分数(当前时间+延迟时间)。
- 将同样的值添加到一个列表的尾部。
- 如果添加的元素是排序集合中的第一个元素,则向上面订阅的channel发布一条消息。
然后客户端使用BLPOP阻塞地去获取目标LIST(阻塞队列)中的元素
redisson实现延迟队列的原理,简单来说,将数据插入到延迟队列时,会存入到延迟队列的list和zset结构中,通过任务调度的方式将延迟队列中到期的数据取出,然后放入到阻塞队列中,客户端通过BLPOP的命令阻塞的拉取阻塞队列的数据,若拉取到数据就可以进行业务逻辑的处理。
转载自:https://juejin.cn/post/7360520652518146086