HashedWheelTimer详解(Netty实现的简单时间轮)
HashedWheelTimer初识
什么是时间轮
一些基本术语:
时间轮是一种实现延迟功能的算法。
时间轮的定义是指一种用于高效调度任务的模型,它通过将时间分割成一系列固定大小的槽(称为"时间格"),每个槽存放着相应时间点到期的任务。随着真实时间的流逝,时间轮的指针会按顺序移动到下一个时间格,从而触发并执行到期的任务。这种设计可以高效地利用线程资源进行批量化调度,尤其适用于处理大规模的延时或周期性定时任务。
构成时间轮的基本要素主要包括:
- 时间格:时间轮上分割出的固定时间间隔的槽位,用于存放将要执行的任务。
- 轮询线程:负责根据时间轮当前所指的时间格来选取并执行相应的任务。
- 时间跨度:单个时间格代表的实际时间长度,例如在Kafka中可能设置为1毫秒。
- 环形数组:时间轮底层的数据结构,用于表示时间轮的各个时间格。
使用示例
public class HasedWheelTimerDemo {
public static void main(String[] args) throws InterruptedException {
// HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(Executors.defaultThreadFactory(), 100, TimeUnit.MILLISECONDS, 8, false, -1L);
// 完整示例
// 1. 创建时间轮,可以使用默认构造方法创建时间轮,默认时间轮以100ms为单位,共512个槽位。
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
System.out.println("程序启动时间==" + new Date());
// 2. 新建定时任务1 ,延时两秒后执行
Timeout timeout1 = hashedWheelTimer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("任务运行时间==" + new Date());
}
}, 2000, TimeUnit.MILLISECONDS);
// 2. 新建定时任务2 ,延时两秒后执行
Timeout timeout2 = hashedWheelTimer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("任务运行时间2==" + new Date());
}
}, 2000, TimeUnit.MILLISECONDS);
// 3. 取消定时任务2
timeout2.cancel();
// 4. 查看定时任务是否过期
System.out.println("timeout1.isCancelled() = " + timeout1.isCancelled());
System.out.println("timeout2.isCancelled() = " + timeout2.isCancelled());
Thread.sleep(Integer.MAX_VALUE);
// 5. 停止时间轮。(一般也不会停,只有在服务停止的时候才会停止时间轮)
hashedWheelTimer.stop();
}
}
上面的示例中,
- 创建了一个时间轮,(基本时间跨度默认100ms,时间单位个数默认512)。
- 给时间轮中添加了两个定时任务timeout1和timeout2,定时任务内容都是两秒后控制台输出文字。添加定时任务的逻辑中会默认启动时间轮
- 取消定时任务timeout2
- 查看定时任务状态
- 关闭时间轮
输出结果
程序启动时间==Tue Mar 05 10:10:46 CST 2024
timeout1.isCancelled() = false
timeout2.isCancelled() = true
任务运行时间==Tue Mar 05 10:10:48 CST 2024
适用场景
- 适用于不需要精准时间的定时任务。
- 适用于短平快的定时任务
常见场景:
- 连接超时监控
- 心跳检测
- Redis 锁续期
也就是说,假如我有一个定时任务执行时间很长,(怎么定义这个很长呢?只要超过了时间轮的 Tick 时间,肯定就算长的了。)那他就不适合使用 HashedWheelTimer。否则会影响时间轮中其他任务的执行。只适合那些业务逻辑简单,执行速度快的“轻量级”的定时任务。
和JDK的Timer以及ShecduleThreadPoolExecutor比较
HashedWheelTimer | Timer | ShecduleThreadPoolExecutor | |
---|---|---|---|
线程 | 单线程 | 单线程 | 多线程 |
任务异常处理 | 不影响时间轮线程,会打印出异常信息 | 中断Timer线程 | 和线程池处理一致 |
添加/删除任务速度 | O(1),采用时间换空间 | O(logn) | O(logn) |
任务追赶 | 会做任务追赶 | 会做任务追赶 | 会做任务追赶 |
时间轮还有一些优点:
- 时间粒度可控
- 性能很好
时间轮也有一些缺点:
- 执行时间不精确。时间格的时间跨度越大,任务执行就越不精确。
- 定时任务只能执行一次,并没有固定频率执行定时任务的API,需要自己手动实现。
HashedWheelTimer源码分析
创建时间轮
创建时间轮原理
时间轮有很多构造函数,最后都调用重载的构造函数鸡其参数解释如下:
/**
* threadFactory : 创建线程池的工厂,默认是Executors
* tickDuration : 时间轮的时间格大小,默认100ms
* unit : 时间轮的时间格时间单位,默认100ms
* ticksPerWheel : 时间轮环形数组的大小,默认512
* leakDetection : 内存泄露检测
* maxPendingTimeouts : 每个时间格称作Bucket,每个Bucket能够存储的任务数量上限,默认值为-1代表无上限
* taskExecutor : 任务执行的线程池,默认是netty重写的一个单线程。也可以自己指定线程池。
*/
public HashedWheelTimer(ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts, Executor taskExecutor) {}
构造函数中都做了什么呢?源码如下
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts, Executor taskExecutor) {
// 省略非核心代码
// 1. 校验参数
checkNotNull(....)
// 2. 创建时间轮,分配时间单位跨度,时间轮的时间单位个数
wheel = createWheel(ticksPerWheel);
// 3. 为时间轮单独创建一个线程。(只是创建线程,并未启动)
workerThread = threadFactory.newThread(worker);
}
构造函数中一共做了这么几件事
- 校验7个参数的合法性
- 创建时间轮。在HashedWheelTimer中,时间轮的底层数据结构就是这个wheel字段。它的定义是一个HashedWheelBucket类型的数组。
- 创建启动时间轮的线程,但但并没有调用Thread的start方法。
着重看下第二步创建时间轮和第三步时间轮的线程
创建时间轮的数据结构
时间轮的数据结构在HashedWheelTimer定义如下
private final HashedWheelBucket[] wheel;
本质上就是一个HashedWheelBucket类型的数组。。我们来看下createWheel方法如何为其赋值的。
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
// 检查时间格环形数组的长度,最大2^30
checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
// 长度调整为2的倍数
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
// 时间格环形数组赋初值
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
- 每个bucket最大容量是2的30次方
- 时间轮的长度改为2的整数倍,不够的补足。
- 给wheel数组赋初值,也就是new操作。
创建运行时间轮的线程
在HashedWheelTimer中运行时间轮的线程是workerThread字段它的定义如下
private final Thread workerThread;
在构造函数中给它赋初值
workerThread = threadFactory.newThread(worker);
这里的threadFactory默认是JUC中Executors的默认线程工厂,重点是这个worker是什么呢?worker是HashedWheelTimer中的一个属性。worker字段定义如下
private final Worker worker = new Worker();
原来它是一个类,并且是HashedWheelTimer的内部类。这个类实现了Runnable接口。所以本质上workerThread就是一个Thread类型的变量,并且任务是由Worker类重写的run方法定义的。来看一下Worker类中是怎么重写run方法的
private final class Worker implements Runnable {
@Override
public void run() {
do {
// 一直到下一次任务的执行时间
final long deadline = waitForNextTick();
if (deadline > 0) {
// 获取下一次环形数组的索引下标
int idx = (int) (tick & mask);
processCancelledTasks();
// 得到bucket
HashedWheelBucket bucket = wheel[idx];
transferTimeoutsToBuckets();
// 执行任务
bucket.expireTimeouts(deadline);
tick++;
}
}
// 只要workThread还在运行,就一直循环读取任务执行。这里的WORKER_STATE_UPDATER是HashedWheelTimer内部维护的一个状态,可以通过HashedWheelTimer的stop方法来停止时间轮改变这个变量,这样这个循环就会满足退出条件了。
while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
}
}
时间轮何时启动
两种途径可以启动时间轮
- 通过HashedWheelTimer的**start()**方法手动启动时间轮线程(不常用)
- 通过HashedWheelTimer的newTimeout方法添加任务时,底层会自动调用**start()**方法启动时间轮线程
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 检查参数
// 启动时间轮线程
start();
// ......
// 把任务包装秤一个 HashedWheelTimeout 添加到 timeouts 中。
// timeouts是netty自定义的队列,它的定义是:private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
// 把Timeout添加到队列后,workerThread线程就会轮询这个队列,执行定时任务。
timeouts.add(timeout);
return timeout;
}·
时间轮的线程都做了什么
前面说到创建运行时间轮的线程,简单介绍了下,Worker实现了Runnable接口
private final class Worker implements Runnable {
@Override
public void run() {
do {
// 一直到下一次任务的执行时间
final long deadline = waitForNextTick();
if (deadline > 0) {
// 获取下一次环形数组的索引下标
int idx = (int) (tick & mask);
processCancelledTasks();
// 得到bucket
HashedWheelBucket bucket = wheel[idx];
transferTimeoutsToBuckets();
// 执行任务
bucket.expireTimeouts(deadline);
tick++;
}
}
while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
}
}
run方法中做的几件事:
- 计算下一次定时任务执行时间
- 处理已经被取消的任务
- 从环形数组中获取Bucket
- 从timeouts队列中把满足条件的timeout放到这个Bucket中。但是每次只会循环队列中的前100000个
- 执行Bucket中的所有任务
下面逐步介绍这5步内容
获取下一下定时任务的执行时间
waitForNextTick方法的返回值:当前时间的相对时间,这个时间之前的时间格的定时任务都会被执行。具体的详细步骤写在
private long waitForNextTick() {
// 获取下一次执行任务所在的时间格的右区间的时间,比如,第一次执行,tick是0,tickDuration 是100ms,则第一个时间格的过期时间就是当前时间100ms后
long deadline = tickDuration * (tick + 1);
// 不断循环
for (;;) {
// 时间轮采用的是相对时间,时间轮线程启动的那一刻是startTime
final long currentTime = System.nanoTime() - startTime;
// 需要睡眠的时间=(下一个时间格右区间时间-当前时间+999999)/1000000
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
// 走这里,说明当前时间指针已经越过了当前时间格。达到了触发当前时间格的定时任务的条件。
if (currentTime == Long.MIN_VALUE) {
// 当前时间是负数则返回(一般不会是这种情况
return -Long.MAX_VALUE;
} else {
// 返回当前时刻时间(注意这是相对时间)
return currentTime;
}
}
try {
// 走到这里,说明时间格所在的最大时间还没到,不满足触发定时任务的条件,所以需要线程睡一会。
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
从队列中删除所有已经被取消的任务
private void processCancelledTasks() {
// 循环cancelledTimeouts队列,这个队列中全部是被取消的任务。
for (;;) {
// 获取队列中的任务
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
break;
}
// 从队列中删除任务
timeout.remove();
}
}
从环形数组中获取Bucket
每循环一个时间格,tick就会+1,比如时间轮一共有8个时间格,每个时间格1s,那么环形数组就是8个,mask = 8-1=7个,假如一个定时任务是9s后执行,tick=(9-1)/1s=8,tick&mask=1,所以时间格应该取第二个
把定时任务timeout添加到时间格的Bucket中
private void transferTimeoutsToBuckets() {
// 每次循环10w个
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
// ...
// 计算执行轮次,是第几轮执行
long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated - tick) / wheel.length;
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
// 把定时任务timeout添加到时间格的Bucket中
bucket.addTimeout(timeout);
}
}
执行已经满足条件的定时任务
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
// 定时任务已到期
if (timeout.deadline <= deadline) {
// 把定时任务标记为已执行,并执行定时任务
timeout.expire();
}
} else if (timeout.isCancelled()) {
// 如果任务已取消,删除任务
next = remove(timeout);
} else {
// 如果剩余轮次大于0,则-1
timeout.remainingRounds --;
}
// 下一个
timeout = next;
}
}
转载自:https://juejin.cn/post/7343131959852351538