Java 面试宝典:怎么使用 Redis 实现一个延时队列?
大家好,我是大明哥,一个专注「死磕 Java」系列创作的硬核程序员。
本文已收录到我的技术网站:www.skjava.com。有全网最优质的系列文章、Java 全栈技术文档以及大厂完整面经
回答
Redis 本身是不支持延时队列的,但是我们可以利用 Redis 一些特定的数据结构和特性来实现延时队列。
基于 Redis 目前有三种方式可以实现延时队列:
-
利用 Redis 过期消息实现延时队列
- Redis 允许我们为每一个 key 设置过期时间,在 key 过期时,Redis 可以配置为发送一个过期事件。在应用程序通过监听这个过期事件,就可以实现延迟队列了。
-
使用 Sorted Set 实现延时队列
-
Redisson 实现延迟队列
详解
利用 Redis 过期消息实现延时队列
该方法是基于 Redis key 的过期通知特性。当一个 key 过期时,如果配置了Redis 的 key 过期通知功能,Redis 会发布一个消息到特定的 Channel,应用程序可以订阅这个 Channel 来接收过期事件,进而触发相应的业务处理逻辑。
实现步骤如下:
- 配置键空间通知
在 Redis 配置文件 redis.conf
中增加一条配置 notify-keyspace-events
。
- 应用程序订阅过期事件
应用程序使用 Redis 的发布/订阅功能订阅特定的过期事件通道:keyevent@0:expired
。当 key 过期时,应用程序会接收到过期 key 的名称,我们就可以依据这个来执行对应的业务逻辑。
首先新建 RedisKeyExpirationEventMessageListener
继承 KeyExpirationEventMessageListener
实现它的 onMessage()
:
@Slf4j
@Service
public class RedisKeyExpirationEventMessageListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
log.info("监听到过期key:{}",expiredKey);
}
}
然后配置 Redis 监听器:
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
我们设置几个配置过期时间的 key:
当这些 key 过期后,RedisKeyExpirationEventMessageListener
会收到相关通知:
至于 KeyExpirationEventMessageListener
的实现原理,其实是很简单的,直接看源码就行:
从源码中可以看出,它是监听 keyevent@*:expired
这个 channel,__keyevent@*__:expired
中的 *
代表监听所有的数据库。
这个方案是不是很简单?区区两步就实现了,但是 Redis 官方不推荐使用该方案,为什么?官方给出了原因:
key 的过期事件发布时机并不是当这个 key 的过期时间到了之后就发布,而是这个 key 在Redis中被清理之后,也就是真正被删除之后才会发布。但是 key 过期后并不是立刻删除的,它分为两种惰性删除和定期删除,所以这个有可能会存在延迟,不够精确,尤其是在高负载的情况下。
同时,事件也有可能会丢失,主要体现在若应用程序在接收事件通知之前就断开了连接,那么这些事件就会丢失,从而导致对应的任务无法执行。
同时,这个监听会导致所有的 key 过期后都会通知过来,如果我们要处理某一类型的 key 只能通过使用前缀来标识,非常麻烦。
使用 Sorted Set 实现延时队列
zset
(sorted Set)是 Redis 提供的一种数据结构,它能够根据 score 对元素进行自动排序。如果要将其用到延时队列的场景中,我们可以将任务的到期时间作为 score ,将任务标识作为 member,利用 zset
的排序功能来实现任务的延时执行。
在这个场景中我们需要实现一个定时任务,定期轮询从 zset 中获取元素,若 “当前时间 > score” 则可以执行任务,对于每个准备执行的任务,使用 ZREM
将其从 zset 中移除,然后执行任务的具体逻辑。
为了防止在多个实例/线程中任务被重复执行,可以在执行任务前再次检查并移除任务,或者使用 Lua 脚本来原子化这个操作(获取并删除操作)。
该方案的效率较高,Redis 提供的 zset
提供了高效的支持,同时 Redis 的持久化保证了任务的可靠性,避免了数据的丢失。
但是该方案实施不是那么容易,而且需要应用程序不断轮询 Redis 来检查是否有任务到期,这可能会导致一定的延迟,并且如果轮询间隔设置得太短,可能会对 Redis 服务器造成不必要的负载。同时,还需要考虑多实例任务任务执行,这就需要考虑幂等,分布式锁问题了。
Redisson 实现延迟队列
首先 zset 是一个不错的方案,但是我们自己实现起来稍微复杂了点,需要考虑的问题较多,但是如果有别人帮我们实现好了呢?Redisson 恰好提供了这个功能。
Redisson 提供的延迟队列(Delayed Queue)是它基于 Redis 的发布/订阅机制和 zset
实现的一种高级数据结构,用于处理需要延迟执行的任务。
其实现原理是:Redisson 将任务按照预定的执行时间存储在 zset
中,任务的执行时间作为 score,任务本身序列化后的数据作为 member。同时,Redisson 会在后台持续监控这个 ZSET,一旦发现有符合执行条件(当前时间 >= score)的任务,就会自动将这些任务转移到另一个 RQueue(Redisson 的队列实现)中,应用程序只需要从 RQueue 中取出任务执行即可。
下面大明哥将演示下如何利用 Redisson 来实现延迟队列。这里的关键是需要确保任务生产者和消费者共享同一个 Redisson 延迟队列实例。
- 一、任务生产者类 TaskProducer
负责添加任务到延迟队列
@Slf4j
public class TaskProducer implements Runnable{
private final RDelayedQueue<String> delayedQueue;
public TaskProducer(RDelayedQueue<String> delayedQueue) {
this.delayedQueue = delayedQueue;
}
@Override
public void run() {
try {
for (int i = 1 ; i <= 5 ; i++) {
String task = "sk-task-" + i;
delayedQueue.offer(task, i * 5, TimeUnit.SECONDS);
log.info("任务 {} 添加成功,将在 {} 秒执行",task,i * 5);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 创建任务消费者类
TaskConsumer
@Slf4j
public class TaskConsumer implements Runnable{
private final RBlockingQueue<String> blockingQueue;
public TaskConsumer(RBlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
try {
// 阻塞等待并执行任务
while (true) {
String task = blockingQueue.take();
log.info("执行任务:{}",task);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}
- 测试类
public class DelayQueueExample {
public static void main(String[] args) {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("delayQueue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
Thread producerThread = new Thread(new TaskProducer(delayedQueue));
Thread consumerThread = new Thread(new TaskConsumer(blockingQueue));
producerThread.start();
consumerThread.start();
}
}
运行结果:
这种方案利用了 Redis 的高性能和持久性,使延迟队列的实现既高效又可靠,同时 Redisson 提供的延迟队列抽象了底层的实现细节,使用简单方便。
扩展
Redis 键空间通知
Redis 键空间通知是 Redis 提供的一种机制,它允许客户端(应用程序)订阅相关事件,这些事件与 Redis 中的keys 的变化有关,当某个 key 发生了变化(如键过期、键被删除、值被修改等)时,应用程序会得到通知,从而可以执行相关的业务逻辑。
由于该操作会消耗一些性能,所以,默认情况下,键空间通知是禁用的,如果要使用该功能,我们需要在 Redis 的配置文件添加配置 notify-keyspace-events *
,或者通过 CONFIG SET
命令动态启用,如下:
CONFIG SET notify-keyspace-events KEA
KEA
是一个参数,代表我们想要订阅的事件类型:
K
:键空间通知,事件以__keyspace@<db>__:
为前缀。E
:键事件通知,事件以__keyevent@<db>__:
为前缀。A
:所有类型的通知。
除了上面这三个参数,还可以选择以下参数来订阅特定类型的事件:
g
- 通用命令(如 DEL,EXPIRE,RENAME 等)。$
- 字符串命令。l
- 列表命令。s
- 集合命令。h
- 哈希命令。z
- 有序集合命令。x
- 过期事件:当某个键过期并被删除时触发。e
- 驱逐事件:当某个键因为 maxmemory 策略而被删除时触发。
对于 Redis 面试题,大明哥一共总结了 60+ 篇,总字数 7万字,有兴趣的可【私大明哥】
转载自:https://juejin.cn/post/7371295699268829247