算法技巧-时间轮(二)
本文紧接着算法技巧-时间轮(一)
经典实现
Netty中时间轮实现
dubbo中时间轮实现参考的就是netty中的,基本实现都差不多。这里不一一详述了。
时间轮 | 时间轮的格子 | 格子里的任务 | 时间轮运转线程 |
---|---|---|---|
HashedWheelTimer | HashedWheelBucket | HashedWheelTimeout | Worker |
应用场景
延迟消息发送
:当需要在一定时间后发送消息时,可以使用时间轮来管理这些消息的定时触发时间。心跳检测
:当需要定时向客户端发送心跳包时,可以使用时间轮来管理心跳包的发送时间。连接超时检测
:当需要检测客户端连接是否超时时,可以使用时间轮来管理连接的超时时间。断线重连
:当客户端与服务器的连接断开时,可以使用时间轮来管理重连的时间间隔。限流控制
:当需要限制某些操作的频率时,可以使用时间轮来管理操作的触发时间,从而实现限流控制。
KAFKA中时间轮实现
Kafka内部有很多延时性的操作,如延时生产
,延时拉取
,延时数据删除
等,这些延时功能由内部的延时操作管理器来做专门的处理,其底层是采用时间轮实现的。
基本概念
Kafka 中的时间轮( TimingWheel )是一个存储定时任务的环形队列 , 底层采用数组实现,数组中的每个元素可以存放一个定时任务列表( TimerTaskList )。 TimerTaskList是一个环形的双向链表,链表中的每一项就是定时任务项( TimerTaskEntry ),其中封装了真正的定时任务( TimerTask )。
- tickMs:单位时间跨度
- wheelSize:时间轮槽长度
- interval:总体时间跨度 tickMs * wheelSize
若整个时间轮的总体时间跨度interval = tickMs * wheelSize
,比如20ms,那么对于定时为350ms 的任务该如何处理?此时已经超出了时间轮能表示的时间跨度。
除了前文DUBBO中的剩余执行轮数
remainingRounds
优化,还可以使用层级时间轮
当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。比如对于20ms跨度的时间轮,它的上级是
interval = 20 * 20 = 400ms
,对于400ms跨度的时间轮,它的上级是interval = 400 * 20 = 8000ms
,以此类推:
举个例子,对于450ms 的定时任务:
- 首先,会升级存放到第三层时间轮中,被插入到第三层时间轮的时间格1所对应的 TimerTaskList;
- 随着时间的流逝,当此 TimerTaskList 到期之时,原本定时为 450ms 的任务还剩下 50ms 的时间,还不能执行这个任务的到期操作;
- 于是执行时间轮降级,将剩余时间为 50ms 的定时任务重新提交到第二层到期时间为 [40ms,60ms)的时间格中;
- 再经历 40ms 之后,此时这个任务又被 “察觉 ”,不过还剩余 10ms ,所以还要降级一次,放到第一层时间轮的[ 10ms, 11ms)的时间格中;
- 最后,经历 l0ms 后,此任务真正到期,最终执行相应的到期操作。
还存在一个问题,实际场景中,并不是每个槽都有定时任务,这个如果按照DUBBO时间轮执行方式,每个槽都需要执行一下,然后再sleep,具体代码可参考
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);
for (; ; ) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
if (isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
那么在KAFKA中怎么做的呢,实际是使用了JDK 中的 DelayQueue 来推进时间轮。具体做法是将每个使用到的TimerTaskList都加入到一个DelayQueue中,DelayQueue 会根据 TimerTaskList的超时时间来排序,最短超时时间的TimerTaskList会被排在 DelayQueue 的队头。 具体做法:
-
对于每个使用到的TimerTaskList 调用delayQueue.offer加入DelayQueue,超时时间为TimerTaskList对应的expired;
-
DelayQueue会根据TimerTaskList 对应的超时时间expiration来排序, 最短expiration 的TimerTaskList会被排在DelayQueue的队头。
-
Kafka 中会有一个线程通过调用delayQueue.take来获取DelayQueue中到期的任务列表,这个线程叫作“ExpiredOperationReaper”,可以直译为“过期操作收割机”。
-
对获取到的任务列表,执行具体的任务
PS:KAFKA时间轮涉及来源于网上
去哪儿QMQ实现
讲到QMQ之前,我们先讲一下ROCKETMQ
ROCKETMQ
RocketMQ 开源版本支持延时消息,但是只支持 18 个 Level 的延时,并不支持任意时间。只不过这个 Level 在 RocketMQ 中可以自定义的,所幸来说对普通业务算是够用的。默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。
通俗的讲,设定了延时 Level 的消息会被暂存在名为SCHEDULE_TOPIC_XXXX
的topic中,并根据 level 存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延时的消息,保证具有相同发送延时的消息能够顺序消费。 broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
优点:
- Level 数固定,每个 Level 有自己的定时器,开销不大
- 将 Level 相同的消息放入到同一个 Queue 中,保证了同一 Level 消息的顺序性;不同 Level 放到不同的 Queue 中,保证了投递的时间准确性;
- 通过只支持固定的Level,将不同延时消息的排序变成了固定Level Topic 的追加写操作
缺点:
- Level 配置的修改代价太大,固定 Level 不灵活
- CommitLog 会因为延时消息的存在变得很大
固定延时Level实现简单,但是实际使用时不太实用。ROCKETMQ商业版支持任意时间消费延时消息,不过需要使用阿里云中间件,付费。
QMQ概述
QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。官网 QMQ 的基本构成组件如下:
- Meta Server:提供集群管理和集群发现的作用
- Server:提供实时消息服务
- Delay Server:提供延时 / 定时消息服务,延时消息先在 delay server 排队,时间到之后再发送给 server
- Producer:消息生产者
- Consumer:消息消费者
QMQ提供任意时间的延时/定时消息,你可以指定消息在未来两年内(可配置)任意时间内投递。里面设计的核心简单来说就是 多级时间轮 + 延时加载 + 延时消息单独磁盘存储 。
QMQ的延时/定时消息使用的是两层 hash wheel 来实现的。
- message log:和实时消息里的 message log 类似,收到消息后 append 到该 log 就返回给 producer,相当于 WAL。
- schedule log:按照投递时间组织,每个小时一个。该 log 是回放 message log 后根据延时时间放置对应的 log 上,这是上面描述的两层 hash wheel 的 第一层,位于磁盘上。schedule log 里是包含完整的消息内容的,因为消息内容从 message log 同步到了 schedule log,所以历史 message log 都 可以删除 (所以 message log 只需要占用极小的存储空间,所以我们可以使用低容量高性能的 ssd 来获取极高的吞吐量)。另外,schedule log 是按照延时 时间组织的,所以延时时间已过的 schedule log 文件也可以删除。
- 第一层位于磁盘上,每个小时为一个刻度(默认为一个小时一个刻度,可以根据实际情况在配置里进行调整),每个刻度会生成一个日志文件(schedule log),因为QMQ支持两年内的延时消息(默认支持两年内,可以进行配置修改),则最多会生成 2 * 366 * 24 = 17568 个文件(如果需要支持的最大延时时间更短,则生成的文件更少)。
- 第二层在内存中,当消息的投递时间即将到来的时候,会将这个小时的消息索引(索引包括消息在schedule log中的offset和size)从磁盘文件加载到内存中的hash wheel上,内存中的hash wheel则是以500ms为一个刻度 。
- dispatch log:延时 / 定时消息投递成功后写入,主要用于在应用重启后能够确定哪些消息已经投递,dispatch log 里写入的是消息的 offset,不包含消息 内容。当延时 server 中途重启时,我们需要判断出当前这个刻度 (比如一个小时) 里的消息有哪些已经投递了则不重复投递。
QMQ核心实现
主要实现如下:
大体流程图:
- Delay Server 中包含的几个周期定时任务
- messageLogFlushService:负责 delay server 接受消息后,将 messagelog 刷盘
- dispatchLogFlushService:delay message 到期发送后,写 offset 到 dispatchlog,其主要负责将 dispatchlog 刷盘
- iterateOffsetFlushService:主要负责回放 messagelog,并管理回放进度,进度保存在 message_log_iterate_checkpoint.json
- WheelTickManager 主要工作
- start timer: 初始化并启动时间轮
- recover:根据 dispatchlog 和 回放进度恢复时间轮数据
- load schedulelog:周期加载 schedulelog 数据来填充时间轮数据
- 监听 messagelog 的回放事件,回放添加 schedulelog 的时候判断 (改延时消息是否属于当前延迟刻度,eg. 1h 内) 是否需要将其添加到时间轮中
- HashedWheelTimer 时间轮实现 时间轮 和 Dubbo 大体是一致的,都是先将任务添加到 Queue timeouts 中,然后周期从这个列表中获取 100000 个来添加到 HashedWheel 中对应的 HashedWheelBucket 中。
总结
时间轮是一种高效的延时队列,可以参考上述业界实现来应用到实际场景中。
参考 1.RocketMQ 消息集成:多类型业务消息——定时消息 2.19 TimingWheel:探究Kafka定时器背后的高效时间轮算法 3.透彻理解Kafka(十)——时间轮调度 4.延时消息常见实现方案 5.深入 RocketMQ- 消息原理篇 6.延时任务一锅端
转载自:https://juejin.cn/post/7258670269924081723