经典回顾 - 时间轮算法【HashedWheelTimer】
写在前面
我在分析Redisson的底层实现的过程中,发现时间轮算法工具类出现的非常频繁。这个类的源码不多,只有不到一千行,但是涉及的知识点比较密集,后面分为三个章节介绍一下时间轮工具类的起源,介绍、使用和具体的源码拆分。
分析
几种定时器算法的比较
时间轮工具是一个定时任务调度器,作者专门写了个ppt来介绍为什么要开发这么个工具类。 这是下面几种时间轮算法比较的几个维度 客户端调用: START_TIMER - 开始任务,客户端将任务放入时间轮 STOP_TIMER - 停止任务,客户端查找任务,并发出中断指令 计时器调用: PER_TICK_BOOKKEEPING - 未过期任务保持 EXPIRY_PROCESSING - 已过期任务处理 性能指标 空间:数据结构使用的内存 延迟:处理上述功能所需的时间
列表型
可以维持绝对的到期时间或者时间间隔 作者在这里有一点疏漏,其实还可以区分一下有序列表和无序列表,有序列表的START_TIMER为O(N),因为每次放入任务时,需要做一个排序操作;无序列表的PER_TICK_BOOKKEEPING为O(N),因为要找到过期时间最短的一个元素,需要遍历整个列表。
树型
可以维持绝对到期时间,如果间隔时间相等,则会退化为列表型定时器。 从图中可以看出这是一个有序树,放入任务时,需要遍历,判断应该处于哪一层,所以时间复杂度为O(log(n)),删除操作,我觉得应该也是O(log(n)),因为有遍历查找的过程。PER_TICK_BOOKKEEPING操作是O(1),因为只需要处理叶子节点即可,所有叶子节点都是临近过期的任务。
简单时间轮
- 维持一个大的时间轮;
- 指针每过一个时间单位,会移动一个位置,就像时钟的秒针一样;
- 如果计时器间隔在当前光标位置的旋转范围内,则将计时器放置在相应的位置;
- 需要指数级的内存;
优势: 客户端和执行器的相关操作时间复杂度都为O(1); 弊端: 需要消耗大量内存,因为每个时间bucket都需要保存,比如所有1秒后过期的任务都放到bucket[1]的链表里面,所有2秒后过期的任务都放到bucket[2]的链表里面,执行的时候只需要找到对应的bucket就行,无须遍历,设计也简单。但是如果有一个2^32秒后过期的任务,那么要放到bucket[2^32]里面,需要在内存中维护一个超级大的hash表,造成空间上的极大浪费。
哈希时间轮
这是netty最终选取的方案,后续会着重介绍。
假设轮盘有8个bucket,进来一个任务,其哈希值为17,把任务放到bucket[1]的链表中,元素里保存其轮数【2】。当轮盘转过2圈,又过了一个bucket的时候,任务到期,将之取出执行。如果链表中后续的任务轮数大于零【还需要继续等待】,则更新轮数,并重新保存到链表中。 时间复杂度需要根据具体的实现来分析:
- 有序链表 a. 弊端:客户端放入任务,如果链表长度小于轮盘bucket数量,则平局时间复杂度为O(1);否则极端情况下,会出现时间复杂度为O(n)的情况。 b. 优势:执行器获取任务的时间复杂度为O(1)
- 无序链表 a. 弊端:执行器获取任务,如果链表长度小于轮盘bucket数量,则平局时间复杂度为O(1);否则极端情况下,会出现时间复杂度为O(n)的情况。 b. 优势:客户端放入任务的时间复杂度为O(1)
层次时间轮
按照任务延迟时间等级分为多个时间轮 这是简单时间轮的变体,将数据多层存放,避免空间浪费。比如设计一个以天为单位的时间轮,将之分为三层,类似于钟表,小时级【hour轮,24个bucket】 -> 分钟级【minute轮,60个bucket】 -> 秒级【second轮,60个bucket】。
- 新增任务 新增任务时,计算其相对过期时间,比如一个deadline为2h39m15s的任务,会存放到bucket[2]里面的链表中。
- 迁移任务 每隔一秒轮询一次这个多层时间轮,当轮盘时间过去2h之后,将这个任务迁移到下级轮盘【minute轮】的bucket【39】后的链表中。轮盘时间再过去39m的时候,将之迁移到【second轮盘】的bucket【15】中。
- 执行任务 当遍历最下一层轮盘时,遍历到有到期的任务,发现其无法继续往下迁移,则取出后执行。
Netty时间轮简介
netty的hash时间轮工具类是HashedWheelTimer,实现了Timer接口【定时任务执行器】。Timer接口比较简单,只有两个方法:newTimeout() - 向时间轮中添加一个任务,并返回Timeout任务实例;stop() - 停止时间轮,返回还未执行的定时任务。里面有三个内部类:Worker、HashedWheelTimeout、HashedWheelBucket。
- Worker Worker是一个Runnable实现类,是Timer的工作引擎,主要有两个功能:执行任务和轮询。Worker类中有一个final Set集合【unprocessedTimeouts】,用于存放未处理的定时任务,在调用Timer.stop()方法停止时,返回给调用者。
- HashedWheelTimeout 实现的是Netty中的Timeout接口,接口中有三类信息:Timer实例【创建Timeout的Timer】、TimerTask实例、任务的状态【取消任务,是否取消,是否过期】
- HashedWheelBucket bucket概念在上一章中已经做过介绍,是时间轮的一个刻度,在bucket里面会维护一个链表,用来保存产生hash冲突的任务。新建时间轮时,如果初始化的bucket数量比较少,hash冲突是不可避免的。
- TimeTask 此接口没有显式实现类,每次调用newTimeout时,隐式创建一个实例。其中只有一个run方法,在执行时调用。
结构示意图
功能介绍
- 新增任务 在Timer实例中,调用newTimeout方法添加任务,新增的任务第一时间会添加到等待列表【timeouts】,上图中黄色的列表。
- 执行任务 每过一个时钟周期,就遍历一次bucket数组,获取特定的bucket,判断是否有过期的任务,如果有,则执行,否则Worker睡眠等待进入下个时钟周期;
- 取消任务 Timeout实例有cancel方法,调用之后,如果定时任务还是初始化状态,则将之从bucket中移除,并加入【cancelledTimeouts】列表;
如何使用
新建实例时,有六个参数:
- threadFactory 这个参数主要是给worker线程来命名,需要的话,可以自定义,一般默认即可。
- tickDuration 时钟周期,遍历时间轮的时间间隔,默认是100毫秒
- unit 上一个参数的时间单位
- ticksPerWheel 时间轮中有多少个bucket,默认是512,linux中的哈希时间轮默认也是512个bucket。bucket越少,越容易发生hash冲突。
- leakDetection 是否检测内存泄露
- maxPendingTimeouts 最大等待执行的任务数量,在Timer中有一个pendingTimeouts原子变量,新增任务时递增,移除任务时递减,当pendingTimeouts大于maxPendingTimeouts时,就会报错。
// 新建一个哈希时间轮实例
HashedWheelTimer timer = new HashedWheelTimer(Executors.defaultThreadFactory(),
100,TimeUnit.MILLISECONDS,10,false,1000);
// 添加一个定时任务
timer.newTimeout(timeout -> {
log.info("--------------- time task run,111111");
// 这个操作会报错,因为一直循环递归自己,会栈溢出
// timeout.task().run(timeout);
}, 10, TimeUnit.SECONDS);
// 添加第二个定时任务
timer.newTimeout(timeout -> {
log.info("--------------- time task run,22222");
Thread.sleep(10000);
}, 5, TimeUnit.SECONDS);
log.info("=== 等待的任务:{}",timer.pendingTimeouts());
上面这个例子中,我们设定了maxPendingTimeouts为1000,如果设置为1,就会报错,因为代码里添加了两个任务,而且都是需要等待一段时间的。
java.util.concurrent.RejectedExecutionException: Number of pending timeouts (2) is greater than or equal to maximum allowed pending timeouts (1)
深入源码
前面两个章节介绍了哈希时间轮的来源、功能介绍和使用方式,这里最后详细研究一下netty中时间轮算法的源码【HashedWheelTimer】,源码行数不多,只有800多行,对比HashMap的2300多行,已经算是想当精简了。后面主要从这几个方面:新建Timer、新建任务、遍历任务、任务过期执行。 注意这几个变量
- startTime:工作引擎【Worker】的启动时间,只赋值一次,不做更新。后续所有的相对时间,都是以此为参照;
- deadline:任务的到期时间,是一个相对时间,用于计算执行倒计时:remainingRounds。
- remainingRounds:timeout中的一个参数,任务在时间轮中的倒计时。表示剩余的轮数,每次轮到当前bucket执行的时候,此数值减一;
- tick:时钟周期的次数,tickDuration是每次时钟周期的长度,也是时间轮的精度,比如手表的精度是秒。如果tickDuration是100ms,则表示精度为100ms,如果两个任务,一个是延迟50ms执行,一个延迟80毫秒执行,在100ms精度的设定里,没有区别,会在同一批次被执行。
在此介绍的Timer,假设其参数为
- tickDuration = 100ms
- ticksPerWheel = 16
- mask = 15
新建Timer
新建Timer的主要流程,工作引擎是单线程,所有的任务都在worker单线程中遍历执行。在同一个JVM中,不能创建超管64个Timer实例,否则会报错。INSTANCE_COUNTER是原子变量,统计Timer实例,当实例数量超过INSTANCE_COUNT_LIMIT【final类型,64】,会输出错误日志:You are creating too many HashedWheelTimer.class instances。
// 将ticksPerWheel转化为2的幂,然后初始化转轮。
wheel = createWheel(ticksPerWheel);
// 注意这个参数mask,记录了轮子的长度,在后续添加任务和遍历任务都需要用到此参数
mask = wheel.length - 1;
// 新建工作引擎线程,worker实现了runnable接口
workerThread = threadFactory.newThread(worker);
// 判断Timer实例数量是否超过限制,如果超过64个,则输出错误日志
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
// 输出错误日志
reportTooManyInstances();
}
新建哈希轮
wheel的长度必须是2的N次方,假设我们创建Timer时,输入的ticksPerWheel为9,内部方法会将其转化为大于9的最相近的一个2^n数字,也就是16。
int normalizedTicksPerWheel = 1;
// 通过计算,找到大于ticksPerWheel的,最相近的一个2^n数字
while (normalizedTicksPerWheel < ticksPerWheel) {
// 左移运算
normalizedTicksPerWheel <<= 1;
}
return normalizedTicksPerWheel;
根据上一步算出来的数字,创建wheel
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {
wheel[i] = new HashedWheelBucket();
}
添加任务
startTime 的赋值节点是在Wroker启动过程中,如果Worker不启动,则任务都不能添加,因为任务的deadline是根据Worker的启动时间计算出来的一个相对值。 任务的deadline是一个相对时间,假设添加的定时任务是10秒之后执行,添加时Worker已经运行了30秒,那么deadline计算方式如下: deadline = (System.nanoTime() - startTime) + unit.toNanos(delay) = (30 + 10) * 10^9 = 40 * 10^9 纳秒
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 计算等待执行任务的数量
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// 等待中任务数量有限制,在创建Timer时设定
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 启动工作线程,会对startTime变量赋值,下一步会用到;后续在工作引擎的整个生命周期,都会用到这个变量,是工作引擎【Worke】的生日。
start();
// 根据Worker的生日,来计算一个相对时间,deadline通俗的解释:在worker几岁的时候去处理这个任务
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// 防止溢出,如果delay足够大的情况下,会造成long溢出,计算出来的deadline是负数
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// 将定时任务封装到Timeout,添加到timeouts集合中,此集合在上一章的图示中有说明,收集等待进入wheel的任务,在worker轮询时,加入指定的bucket
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
启动工作引擎
对的没错,工作引擎是在添加任务的时候启动的,而不是在创建Timer的时候,可以避免线程空转,浪费CPU时钟周期。 这里还有一个知识点,就是为什么要用while循环,而不使用if,因为按照JVM的定义,await之后,线程会被阻塞,直到调用notify/notifyAll方法被唤醒之后,才能继续争抢CPU时间片。问题就在这里,如果用if,可能会被其他线程调用notifyAll之后意外唤醒,导致在Worker没有启动的情况下,去计算了新增任务的deadline,从而出现异常 。
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
// 只有工作线程是初始化状态时,才能启动
case WORKER_STATE_INIT:
// CAS运算,避免线程不安全
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// 等待startTime被赋值,前面说过,这是Worker的生日,在生日不确定的情况下,不能给其安排的任务。
while (startTime == 0) {
try {
// startTimeInitialized是一个CountDownLaunch类型变量,在worker的run方法里,会调用countDown()方法,通知这里跳出循环。
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
遍历任务
worker启动以后,在关闭之前的全部活动,都在下面这段循环中。计算任务的执行时间线的方法是:deadline = tickDuration * (tick + 1),注意这里计算时,跳了一个时钟周期。是为了后续遍历bucket时,如果剩余轮数为零,则表示当前任务timeout的deadline必定是小于这个数据的,因为它在区间末尾【一个区间是100毫秒】,所有在此区间内的timeout,都会被执行。
do {
// 计算任务的执行时间线
final long deadline = waitForNextTick();
if (deadline > 0) {
// mask是2^n -1,和tick做按位与操作。假设mask是7,那么idx的值始终是0-7之间循环
int idx = (int) (tick & mask);
// 从cancelledTimeouts中弹出被取消的任务,然后从bucket中删除
processCancelledTasks();
HashedWheelBucket bucket =
wheel[idx];
// 将timeouts集合【等待放入bucket的任务】中的任务,放入到对应的bucket
transferTimeoutsToBuckets();
// bucket中,所有小于deadline的任务,都会被取出执行
bucket.expireTimeouts(deadline);
tick++;
}
// 只要worker是启动状态,则一直循环
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
任务放到对应bucket - transferTimeoutsToBuckets
在do - while循环中,有这么一步操作,将timeouts集合【等待放入bucket的任务】中的任务,放入到对应的bucket。其中怎么找到bucket,是一个相当关键的步骤,这里着重介绍一下。 继续沿用上面添加任务段落所使用的那个例子
- deadline = 40 * 10^9 纳秒
- tickDuration = 100毫秒 = 0.1 * 10^9
- 所以calculated = 40 / 0.1 = 400
- 添加任务时,worker已经执行了30秒,表示当时tick = 30 / 0.1 = 300,如果在当时while循环没有将之加入到bucket,就需要在下一个tick中处理了。这里假设就是这种情况,所以tick = 301
- remainingRounds = (400 - 301) / 16 = 6
- ticks这里有点不明白,ticks = 400
- stopIndex = 400 & 15 = 000110010000 & 1111 = 0
- 最后找到相应的bucket,wheel[0],将这个timeout放到双向链表【HashedWheelBucket】的末尾
假设有两个任务,A【calculated=400】,B【calculated=410】,具有相同的remainingRounds【16】,stopIndex不同,A【stopIndex=0】,B【stopIndex=10】
long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated - tick) / wheel.length;
// 这一句没有看懂。原注释:Ensure we don't schedule for past.
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
任务过期(执行)
在worker的while循环中,有这么一行:bucket.expireTimeouts(deadline),这里就会执行过期的定时任务。
public void expireTimeouts(long deadline) {
// 从头到尾进行遍历,因为数据是无序存放的,所以需要判断每个节点
HashedWheelTimeout timeout = head;
// 处理bucket中所有的定时任务
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
// 如果剩余轮数为零,则表示当前任务已经到了执行时间
if (timeout.remainingRounds <= 0) {
// 从bucket中移除此任务
next = remove(timeout);
// 执行任务
timeout.expire();
} else if (timeout.isCancelled()) {
// 如果任务已经取消,则移除
next = remove(timeout);
} else {
// 如果还没到过期时间,则剩余轮数减一,继续等待
timeout.remainingRounds --;
}
timeout = next;
}
}
timeout.expire()是具体的执行方法,比较简单。
// 用CAS来设定任务的状态
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
// 如果状态设定成功,则直接执行,用worker线程执行。
task.run(this);
转载自:https://juejin.cn/post/7237290911222677562