likes
comments
collection
share

算法技巧-时间轮(二)

作者站长头像
站长
· 阅读数 13

本文紧接着算法技巧-时间轮(一)

经典实现

Netty中时间轮实现

dubbo中时间轮实现参考的就是netty中的,基本实现都差不多。这里不一一详述了。

时间轮时间轮的格子格子里的任务时间轮运转线程
HashedWheelTimerHashedWheelBucketHashedWheelTimeoutWorker

应用场景

  1. 延迟消息发送:当需要在一定时间后发送消息时,可以使用时间轮来管理这些消息的定时触发时间。
  2. 心跳检测:当需要定时向客户端发送心跳包时,可以使用时间轮来管理心跳包的发送时间。
  3. 连接超时检测:当需要检测客户端连接是否超时时,可以使用时间轮来管理连接的超时时间。
  4. 断线重连:当客户端与服务器的连接断开时,可以使用时间轮来管理重连的时间间隔。
  5. 限流控制:当需要限制某些操作的频率时,可以使用时间轮来管理操作的触发时间,从而实现限流控制。

KAFKA中时间轮实现

Kafka内部有很多延时性的操作,如延时生产延时拉取延时数据删除等,这些延时功能由内部的延时操作管理器来做专门的处理,其底层是采用时间轮实现的。

基本概念

Kafka 中的时间轮( TimingWheel )是一个存储定时任务的环形队列 , 底层采用数组实现,数组中的每个元素可以存放一个定时任务列表( TimerTaskList )。 TimerTaskList是一个环形的双向链表,链表中的每一项就是定时任务项( TimerTaskEntry ),其中封装了真正的定时任务( TimerTask )。

算法技巧-时间轮(二)

  • tickMs:单位时间跨度
  • wheelSize:时间轮槽长度
  • interval:总体时间跨度 tickMs * wheelSize

若整个时间轮的总体时间跨度interval = tickMs * wheelSize,比如20ms,那么对于定时为350ms 的任务该如何处理?此时已经超出了时间轮能表示的时间跨度。

除了前文DUBBO中的剩余执行轮数remainingRounds优化,还可以使用层级时间轮

算法技巧-时间轮(二) 当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。比如对于20ms跨度的时间轮,它的上级是interval = 20 * 20 = 400ms,对于400ms跨度的时间轮,它的上级是interval = 400 * 20 = 8000ms,以此类推: 举个例子,对于450ms 的定时任务:

  1. 首先,会升级存放到第三层时间轮中,被插入到第三层时间轮的时间格1所对应的 TimerTaskList;
  2. 随着时间的流逝,当此 TimerTaskList 到期之时,原本定时为 450ms 的任务还剩下 50ms 的时间,还不能执行这个任务的到期操作;
  3. 于是执行时间轮降级,将剩余时间为 50ms 的定时任务重新提交到第二层到期时间为 [40ms,60ms)的时间格中;
  4. 再经历 40ms 之后,此时这个任务又被 “察觉 ”,不过还剩余 10ms ,所以还要降级一次,放到第一层时间轮的[ 10ms, 11ms)的时间格中;
  5. 最后,经历 l0ms 后,此任务真正到期,最终执行相应的到期操作。

还存在一个问题,实际场景中,并不是每个槽都有定时任务,这个如果按照DUBBO时间轮执行方式,每个槽都需要执行一下,然后再sleep,具体代码可参考

private long waitForNextTick() {
        long deadline = tickDuration * (tick + 1);

        for (; ; ) {
            final long currentTime = System.nanoTime() - startTime;
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

            if (sleepTimeMs <= 0) {
                if (currentTime == Long.MIN_VALUE) {
                    return -Long.MAX_VALUE;
                } else {
                    return currentTime;
                }
            }
            if (isWindows()) {
                sleepTimeMs = sleepTimeMs / 10 * 10;
            }

            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException ignored) {
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;
                }
            }
        }
    }

那么在KAFKA中怎么做的呢,实际是使用了JDK 中的 DelayQueue 来推进时间轮。具体做法是将每个使用到的TimerTaskList都加入到一个DelayQueue中,DelayQueue 会根据 TimerTaskList的超时时间来排序,最短超时时间的TimerTaskList会被排在 DelayQueue 的队头。 具体做法:

  • 对于每个使用到的TimerTaskList 调用delayQueue.offer加入DelayQueue,超时时间为TimerTaskList对应的expired;

  • DelayQueue会根据TimerTaskList 对应的超时时间expiration来排序, 最短expiration 的TimerTaskList会被排在DelayQueue的队头。

  • Kafka 中会有一个线程通过调用delayQueue.take来获取DelayQueue中到期的任务列表,这个线程叫作“ExpiredOperationReaper”,可以直译为“过期操作收割机”。

  • 对获取到的任务列表,执行具体的任务

PS:KAFKA时间轮涉及来源于网上

去哪儿QMQ实现

讲到QMQ之前,我们先讲一下ROCKETMQ

ROCKETMQ

RocketMQ 开源版本支持延时消息,但是只支持 18 个 Level 的延时,并不支持任意时间。只不过这个 Level 在 RocketMQ 中可以自定义的,所幸来说对普通业务算是够用的。默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。

通俗的讲,设定了延时 Level 的消息会被暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据 level 存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延时的消息,保证具有相同发送延时的消息能够顺序消费。 broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

算法技巧-时间轮(二)

优点:

  • Level 数固定,每个 Level 有自己的定时器,开销不大
  • 将 Level 相同的消息放入到同一个 Queue 中,保证了同一 Level 消息的顺序性;不同 Level 放到不同的 Queue 中,保证了投递的时间准确性;
  • 通过只支持固定的Level,将不同延时消息的排序变成了固定Level Topic 的追加写操作

缺点:

  • Level 配置的修改代价太大,固定 Level 不灵活
  • CommitLog 会因为延时消息的存在变得很大

固定延时Level实现简单,但是实际使用时不太实用。ROCKETMQ商业版支持任意时间消费延时消息,不过需要使用阿里云中间件,付费。

QMQ概述

QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。官网 QMQ 的基本构成组件如下:

算法技巧-时间轮(二)

  • Meta Server:提供集群管理和集群发现的作用
  • Server:提供实时消息服务
  • Delay Server:提供延时 / 定时消息服务,延时消息先在 delay server 排队,时间到之后再发送给 server
  • Producer:消息生产者
  • Consumer:消息消费者

QMQ提供任意时间的延时/定时消息,你可以指定消息在未来两年内(可配置)任意时间内投递。里面设计的核心简单来说就是 多级时间轮 + 延时加载 + 延时消息单独磁盘存储

QMQ的延时/定时消息使用的是两层 hash wheel 来实现的。

算法技巧-时间轮(二)

  • message log:和实时消息里的 message log 类似,收到消息后 append 到该 log 就返回给 producer,相当于 WAL。
  • schedule log:按照投递时间组织,每个小时一个。该 log 是回放 message log 后根据延时时间放置对应的 log 上,这是上面描述的两层 hash wheel 的 第一层,位于磁盘上。schedule log 里是包含完整的消息内容的,因为消息内容从 message log 同步到了 schedule log,所以历史 message log 都 可以删除 (所以 message log 只需要占用极小的存储空间,所以我们可以使用低容量高性能的 ssd 来获取极高的吞吐量)。另外,schedule log 是按照延时 时间组织的,所以延时时间已过的 schedule log 文件也可以删除。
    • 第一层位于磁盘上,每个小时为一个刻度(默认为一个小时一个刻度,可以根据实际情况在配置里进行调整),每个刻度会生成一个日志文件(schedule log),因为QMQ支持两年内的延时消息(默认支持两年内,可以进行配置修改),则最多会生成 2 * 366 * 24 = 17568 个文件(如果需要支持的最大延时时间更短,则生成的文件更少)。
    • 第二层在内存中,当消息的投递时间即将到来的时候,会将这个小时的消息索引(索引包括消息在schedule log中的offset和size)从磁盘文件加载到内存中的hash wheel上,内存中的hash wheel则是以500ms为一个刻度
  • dispatch log:延时 / 定时消息投递成功后写入,主要用于在应用重启后能够确定哪些消息已经投递,dispatch log 里写入的是消息的 offset,不包含消息 内容。当延时 server 中途重启时,我们需要判断出当前这个刻度 (比如一个小时) 里的消息有哪些已经投递了则不重复投递。

QMQ核心实现

主要实现如下: 算法技巧-时间轮(二) 大体流程图: 算法技巧-时间轮(二)

  1. Delay Server 中包含的几个周期定时任务
  • messageLogFlushService:负责 delay server 接受消息后,将 messagelog 刷盘
  • dispatchLogFlushService:delay message 到期发送后,写 offset 到 dispatchlog,其主要负责将 dispatchlog 刷盘
  • iterateOffsetFlushService:主要负责回放 messagelog,并管理回放进度,进度保存在 message_log_iterate_checkpoint.json
  1. WheelTickManager 主要工作
  • start timer: 初始化并启动时间轮
  • recover:根据 dispatchlog 和 回放进度恢复时间轮数据
  • load schedulelog:周期加载 schedulelog 数据来填充时间轮数据
  • 监听 messagelog 的回放事件,回放添加 schedulelog 的时候判断 (改延时消息是否属于当前延迟刻度,eg. 1h 内) 是否需要将其添加到时间轮中
  1. HashedWheelTimer 时间轮实现 时间轮 和 Dubbo 大体是一致的,都是先将任务添加到 Queue timeouts 中,然后周期从这个列表中获取 100000 个来添加到 HashedWheel 中对应的 HashedWheelBucket 中。

总结

时间轮是一种高效的延时队列,可以参考上述业界实现来应用到实际场景中。

参考 1.RocketMQ 消息集成:多类型业务消息——定时消息 2.19 TimingWheel:探究Kafka定时器背后的高效时间轮算法 3.透彻理解Kafka(十)——时间轮调度 4.延时消息常见实现方案 5.深入 RocketMQ- 消息原理篇 6.延时任务一锅端