likes
comments
collection
share

Java 定时器 Timer 原理解析

作者站长头像
站长
· 阅读数 6

java.util.Timer 是 Java 中的一个实用类,它可以用来安排在未来某个时间执行的任务,或者定期执行任务。它内部包含一个任务队列,用于存储要执行的 TimerTask。通过 schedule 或 scheduleAtFixedRate 方法安排任务。

替代方案

Java 5.0 引入了 J.U.C. ,其中一个并发工具是 ScheduledThreadPoolExecutor ,它是一个线程池,用于以给定的速率或延迟重复执行任务。它实际上是 Timer/TimerTask 组合的更通用的替代品,因为它允许多个服务线程,接受各种时间单位,并且不需要继承 TimerTask (只需实现 Runnable )。

使用一个线程配置 ScheduledThreadPoolExecutor 效果等同于 Timer 。

注意事项

1. 最好不要在 TimerTask 中做耗时操作

Timer 内部会关联一个单独的线程,并且自己会有一个按照下次执行时间排序的任务队列,线程会按序执行队列中的任务。每个任务不应该产生耗时操作,因为如果某一个任务花费了大量时间运行,会占用当前后台线程资源,导致后续的任务发生延迟。这些任务可能堆到最后,并在发生延迟的任务执行完成后,快速连续执行。

2. 最好主动调用 cancel 释放 Timer 的线程资源

当 Timer 对象执行完所有的任务后,并且没有了实时引用,Timer 关联的线程才会终止(并成为垃圾收集的对象)。然而,这可能需要很长的时间才能发生。默认情况下,任务执行线程不作为守护进程线程运行,因此它能够防止应用程序终止。如果调用者想要快速终止计时器的任务执行线程,调用者应该调用计时器的 cancel 方法。

3. TimerThread 意外终止,会抛出 IllegalStateException

如果 Timer 的 TimerThread 意外终止,例如,因为调用了它的 stop 方法,任何进一步尝试调度计时器上的任务都将导致 IllegalStateException,就像调用了计时器的 cancel 方法一样。

4. 线程安全及实时保证

Timer 是线程安全的,多个线程可以共享一个 Timer 对象,而不需要外部同步。Timer 并不提供实时保证,因为它是通过 Object.wait(long) 调度任务的。

5. 并发及时间复杂度

这个类可以扩展到大量的并发计划任务(数千个应该没有问题)。在内部,它使用 **二进制堆 **来表示它的任务队列,因此调度任务的时间复杂度是 O(log n),其中 n 是并发调度任务的数量。

使用方法

private fun startTimer() {
    var timeTask = object : TimerTask() {
        override fun run() {
            // do somethings
        }
    }
    timer = Timer()
    // 开始执行
    timer.schedule(task, DELAY_TIME, PERIOD_TIME)
    // 取消执行
    timer.cancel()
}

使用 Timer 有四个步骤:

  1. 构造一个 Timer 执行的任务
  2. 创建一个 Timer 对象
  3. 调用 Timer#schedule 系列方法,设置好执行的任务、延迟时间和重复执行的周期间隔时间
  4. 在适当的时机调用 cancel 方法,取消定时器。

运行流程

Timer 可以按计划执行重复的任务或者定时执行指定任务,这是因为 Timer 内部利用了一个后台线程 TimerThread 有计划地执行指定任务。Timer 内部有一个任务队列,它可以按照时间先后顺序执行任务,而 TimerTask 就是一个任务。TimerTask 是一个抽象类,它的子类代表一个可以被 Timer 计划的任务。TimerTask 中有一个 run 方法,当 Timer 计划的时间到达时,Timer 就会执行 TimerTask 的 run 方法。TimerTask 的执行是在 Timer 的内部线程中执行的,因此 TimerTask 不应该执行一个耗时的操作,否则会影响 Timer 的精度。TimerTask 的执行时间不能超过 Timer 的周期,否则 Timer 会抛出异常。Timer 内部有一个 TimerQueue,它是一个优先级队列,用于存放 TimerTask。TimerQueue 中的 TimerTask 按照执行时间排序,执行时间早的 TimerTask 优先级高。

他们的关系如下:

  • Timer 类负责安排任务的执行。它包含一个任务队列,该队列保存要执行的 TimerTask。
  • TimerTask 是一个抽象类,它实现了 Runnable 接口。您需要创建一个继承自 TimerTask 的类,并重写 run 方法来定义要执行的任务。
  • 当您使用 Timer 的 schedule 或 scheduleAtFixedRate 方法安排任务时,这些任务将被添加到 TimerQueue 中。
  • TimerThread 是一个内部类,它继承自 Thread 类。它负责从队列中获取任务并执行它们。

TimerTask

public abstract class TimerTask implements Runnable {
    // 定时器任务要执行的动作
    public abstract void run();
}

TimerTask 是一个抽象类,它实现了 Runnable 接口,使用者需要创建一个继承自 TimerTask 的类,并重写 run 方法来定义要执行的任务。

TimerTask 的状态

TimerTask 内部定义了当前任务的状态:

public abstract class TimerTask implements Runnable {
		// 此任务的状态,从下面的常量中选择
    int state = VIRGIN;

    // 这个任务还没有被安排
    static final int VIRGIN = 0;

    // 计划执行该任务。如果它是非重复任务,则表示尚未执行
    static final int SCHEDULED   = 1;

    // 此非重复任务已执行(或当前正在执行),且未被取消。
    static final int EXECUTED    = 2;

    // 这个任务已经被取消(调用TimerTask.cancel)
    static final int CANCELLED   = 3;
  
  	// other codes
}

TimerTask 通过 state 保存当前任务的执行状态,后续会根据这个属性来进行状态判断。

对于时间控制,有两个成员属性:

// 假设该任务计划执行,此任务的下一次执行时间,由 System.currentTimeMillis 返回,
// 对于重复任务,此字段将在每个任务执行之前更新。
long nextExecutionTime;

// 周期(以毫秒为单位)用于重复任务。正值表示固定速率执行。负值表示固定延迟执行。
// 0表示非重复任务。
long period = 0;

计算下次执行时间

public long scheduledExecutionTime() {
    synchronized(lock) {
        return (period < 0 ? nextExecutionTime + period
                           : nextExecutionTime - period);
    }
}

任务的执行时间是根据下一次执行时间 + 任务周期 来进行的,如果 period 小于 0,表示这个固定延迟的任务,虽然无法解释固定延迟的含义,但是这个方法通常不与固定延迟执行重复任务一起使用,因为它们的计划执行时间允许随时间漂移,因此不是特别重要。

该方法返回此任务最近实际执行的安排执行时间。在这段代码中,首先对 lock 对象进行同步,以确保线程安全。然后,根据 period 的值计算并返回下一次执行时间。

  • 如果 period 小于 0,则返回 nextExecutionTime + period;
  • 否则,返回 nextExecutionTime - period。

取消

public boolean cancel() {
    synchronized(lock) {
        boolean result = (state == SCHEDULED);
        state = CANCELLED;
        return result;
    }
}

这段代码是 TimerTask 类中的 cancel 方法的实现。该方法用于取消此计时器任务。如果任务已安排但尚未执行,或者已安排重复执行,则取消后不再运行。如果任务已经执行或者因其他原因无法取消,则此方法返回 false;否则返回 true。

在这段代码中,首先对 lock 对象进行同步,以确保线程安全。然后,检查当前状态是否为 SCHEDULED,并将其保存在 result 变量中。接下来,将 state 设置为 CANCELLED,表示任务已被取消。最后,返回 result 变量的值。

cancel 方法可能会被重复调用;第二次和后续的调用将会没有任何效果。

TaskQueue

TaskQueue 类表示一个计时器任务队列,它是一个按照 nextExecutionTime 排序的 TimerTask 优先队列。每个 Timer 对象都有一个这样的队列,并与其 TimerThread 共享。

在内部,TaskQueue 类使用堆来实现,这为 add、removeMin 和 rescheduleMin 操作提供了 log(n) 的性能,并为 getMin 操作提供了常数时间性能。

数据结构

TaskQueue 类使用一个平衡二叉堆来表示优先队列,通过数组实现。在这个堆中,queue[n] 的两个子节点分别是 queue[2n] 和 queue[2n+1]。

优先队列按照 nextExecutionTime 字段排序:具有最小 nextExecutionTime 的 TimerTask 位于 queue[1](假设队列非空)。对于堆中的每个节点 n,以及 n 的每个后代 d,都有 n.nextExecutionTime <= d.nextExecutionTime。

private TimerTask[] queue = new TimerTask[128];

添加操作

void add(TimerTask task) {
    // Grow backing store if necessary
    if (size + 1 == queue.length)
        queue = Arrays.copyOf(queue, 2*queue.length);

    queue[++size] = task;
    fixUp(size);
}

该方法用于向 TaskQueue 中添加新任务。

在这段代码中,首先检查当前队列的大小是否已满。如果 size + 1 等于 queue.length,则表示队列已满,需要扩容。此时,使用 Arrays.copyOf 方法将 queue 数组的长度扩大一倍。

接下来,将新任务添加到队列中,并调用 fixUp 方法对队列进行调整,以确保队列中的任务按照下一次执行时间排序。

堆排序算法

fixUp
private void fixUp(int k) {
    while (k > 1) {
        int j = k >> 1;
        if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
            break;
        TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
        k = j;
    }
}

这个方法是在将插入元素根据 nextExecutionTime 进行排序,每次右移一位,相当于除以 2 ,如果 k 位置的任务下一次执行时间大于 k / 2 任务的执行时间,就将数组在两者位置进行互换,然后以新位置 k / 2 作为输入进行下一次比较,

该方法用于在向 TaskQueue 中添加新任务时,对队列进行调整,以确保队列中的任务按照下一次执行时间排序。

在这段代码中,首先定义一个 while 循环,循环条件为 k > 1。然后,在循环内部,计算 j = k >> 1(即 j 等于 k 除以 2)。接下来,检查 queue[j]nextExecutionTime 是否小于等于 queue[k]nextExecutionTime。如果是,则退出循环;否则,交换 queue[j]queue[k] 的位置,并将 k 设置为 j。最后,在下一次迭代中继续执行上述步骤。

这个方法通过不断比较和交换元素的位置,来确保队列中的任务按照下一次执行时间排序。

堆排序时间复杂度为 log(n) ,所以 add 方法的时间复杂度也是 log(n) 。

fixDown
private void fixDown(int k) {
    int j;
    while ((j = k << 1) <= size && j > 0) {
        if (j < size &&
            queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
            j++; // j indexes smallest kid
        if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
            break;
        TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
        k = j;
    }
}

类似逻辑的方法是 fixDown ,该方法用于在从 TaskQueue 中删除任务时,对队列进行调整,以确保队列中的任务按照下一次执行时间排序。

在这段代码中,首先定义一个 while 循环,循环条件为 j = k << 1(即 j 等于 k 乘以 2)小于等于 size 并且 j 大于 0。然后,在循环内部,检查 j 是否小于 size,并且 queue[j]nextExecutionTime 是否大于 queue[j+1nextExecutionTime。如果是,则将 j 加 1,使其指向最小的子节点。

接下来,检查 queue[k]nextExecutionTime 是否小于等于 queue[j]nextExecutionTime。如果是,则退出循环;否则,交换 queue[j]queue[k] 的位置,并将 k 设置为 j。最后,在下一次迭代中继续执行上述步骤。

这个方法通过不断比较和交换元素的位置,来确保队列中的任务按照下一次执行时间排序。

heapify
void heapify() {
    for (int i = size/2; i >= 1; i--)
        fixDown(i);
}

该方法用于对整个队列进行调整,以确保队列中的任务按照下一次执行时间排序。

在这段代码中,首先定义一个 for 循环,循环变量 i 从 size/2 开始,每次递减 1,直到 i >= 1。然后,在循环内部,调用 fixDown 方法对第 i 个元素进行调整。

这个方法通过从后往前遍历队列中的元素,并对每个元素进行调整,来确保队列中的任务按照下一次执行时间排序。

读取操作

访问元素系列方法很简单,直接查找数组索引,所以时间复杂度是常量级别:

TimerTask getMin() {
    return queue[1];
}

TimerTask get(int i) {
    return queue[i];
}

删除操作

清理队列的方法是对数组挨个进行置空操作,size 更新为 0:

void clear() {
    // Null out task references to prevent memory leak
    for (int i=1; i<=size; i++)
        queue[i] = null;

    size = 0;
}

移除队列头一个元素,先通过将最后一个元素放到队列第一个位置上,这样达到了清理第一个元素的效果,然后通过 fixDown 方法对第一个元素进行排序,更新到合适的位置上:

void removeMin() {
    queue[1] = queue[size];
    queue[size--] = null;  // Drop extra reference to prevent memory leak
    fixDown(1);
}

另一个方法是 quickRemove, 移除指定位置的元素,并没有进行排序:

void quickRemove(int i) {
    assert i <= size;

    queue[i] = queue[size];
    queue[size--] = null;  // Drop extra ref to prevent memory leak
}

重设首个任务时间

重设第一个任务的执行时间:

void rescheduleMin(long newTime) {
    queue[1].nextExecutionTime = newTime;
    fixDown(1);
}

TimerThread

Timer 类中包含一个名为 TimerThread 的内部类,它实现了计时器的任务执行线程。这个线程负责等待计时器队列上的任务,当任务触发时执行它们,重新安排重复任务,并从队列中删除已取消的任务和已完成的非重复任务。

先来看看它的属性:

boolean newTasksMayBeScheduled = true;

private TaskQueue queue;

TimerThread(TaskQueue queue) {
    this.queue = queue;
}

这里存储的引用不是 Timer 而是 TaskQueue,以使引用图保持无环。否则,Timer 将永远不会被垃圾回收,这个线程也永远不会消失。

执行 run 方法来运行任务:

public void run() {
    try {
        mainLoop();
    } finally {
        // Someone killed this Thread, behave as if Timer cancelled
        synchronized(queue) {
            newTasksMayBeScheduled = false;
            queue.clear();  // 删除过时的引用
        }
    }
}

核心逻辑在 mainLoop 方法中:

private void mainLoop() {
    while (true) {
        try {
            TimerTask task;
            boolean taskFired;
            synchronized(queue) {
                // Wait for queue to become non-empty
                while (queue.isEmpty() && newTasksMayBeScheduled)
                    queue.wait();
                if (queue.isEmpty())
                    break; // Queue is empty and will forever remain; die

                // Queue nonempty; look at first evt and do the right thing
                long currentTime, executionTime;
                task = queue.getMin();
                synchronized(task.lock) {
                      // 移除取消的人物
                    if (task.state == TimerTask.CANCELLED) {
                        queue.removeMin();
                        continue;  // No action required, poll queue again
                    }
                    currentTime = System.currentTimeMillis();
                    executionTime = task.nextExecutionTime;
                    if (taskFired = (executionTime<=currentTime)) {
                        if (task.period == 0) { // 非重复任务,删除 
                            queue.removeMin();
                            task.state = TimerTask.EXECUTED;
                        } else { // Repeating task, reschedule 重复任务,重新执行
                            queue.rescheduleMin(
                              task.period<0 ? currentTime   - task.period
                                            : executionTime + task.period);
                        }
                    }
                }
                if (!taskFired) // 任务尚未启动; 等待
                    queue.wait(executionTime - currentTime);
            }
            if (taskFired)  // 任务已启动; run it, holding no locks
                task.run();
        } catch(InterruptedException e) {}
    }
}

该方法包含一个 while 循环,该循环会一直运行直到队列为空且不再安排新任务。

在循环中,它首先等待队列变为非空,然后检查队列中的第一个任务并执行相应操作。如果任务被取消,则将其从队列中移除并继续轮询队列。如果任务已经到达执行时间,则根据任务是否重复执行相应操作:

  • 对于非重复任务,将其从队列中移除并将其状态设置为已执行;
  • 对于重复任务,重新安排执行时间。如果任务尚未启动,则等待直到达到执行时间。

最后,如果任务已启动,则在不持有任何锁的情况下运行它。

Timer

内部创建任务队列、线程:

@ReachabilitySensitive
private final TaskQueue queue = new TaskQueue();

@ReachabilitySensitive
private final TimerThread thread = new TimerThread(queue);

    // 用于生成线程名
private final static AtomicInteger nextSerialNumber = new AtomicInteger(0);

private static int serialNumber() {
    return nextSerialNumber.getAndIncrement();
}

垃圾回收时结束线程

Timer 类中的 threadReaper 属性主要是为了当 Timer 类没有引用时,并且没有 task 在队列时,这时让线程 thread 优雅地结束。这里主要是重写了 Object 的 finalize 方法,在 GC 的时候进行调用,然后让线程结束。

private final Object threadReaper = new Object() {
    protected void finalize() throws Throwable {
        synchronized(queue) {
            thread.newTasksMayBeScheduled = false;
            queue.notify(); // In case queue is empty.
        }
    }
};

构造方法

public Timer() {
    this("Timer-" + serialNumber());
}
    
// isDaemon 表示关联的线程是否应该作为守护进程运行。
public Timer(boolean isDaemon) {
    this("Timer-" + serialNumber(), isDaemon);
}

public Timer(String name) {
    thread.setName(name);
    thread.start();
}

public Timer(String name, boolean isDaemon) {
    thread.setName(name);
    thread.setDaemon(isDaemon);
    thread.start();
}

构造方法中存在 isDaemon 参数,当线程设置为守护线程时,Java虚拟机在当唯一运行的线程都是守护线程时才会退出。

这里的参数都是设置给 TimerThread 的,并且调用了 start 方法开始启动线程执行任务。

schedule 系列方法

上面这组方法对时间间隔要求比较高,适用于需要“流畅性”的重复活动。换句话说,它适用于在短期内保持频率准确比在长期内保持频率准确更重要的活动。这包括大多数动画任务,如定期闪烁光标。它还包括在响应人工输入时执行常规活动的任务,例如只要按下键就自动重复一个字符。

在固定延迟执行中,**每次执行都是相对于前一次执行的实际执行时间进行安排的。如果某次执行因任何原因(如垃圾回收或其他后台活动)而被延迟,则后续执行也将被延迟。**从长远来看,执行频率通常会略低于指定周期的倒数(假设底层Object.wait(long)的系统时钟准确)。

// 发布一定延迟时间后执行的任务
public void schedule(TimerTask task, long delay) {
    if (delay < 0)
        throw new IllegalArgumentException("Negative delay.");
    sched(task, System.currentTimeMillis()+delay, 0);
}

// 发布指定日期执行的任务
public void schedule(TimerTask task, Date time) {
    sched(task, time.getTime(), 0);
}

// 发布一定延迟时间后,随后以周期 period 执行任务,每次执行都是相对于前一次执行的实际执行时间来安排的。
public void schedule(TimerTask task, long delay, long period) {
    if (delay < 0)
        throw new IllegalArgumentException("Negative delay.");
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, System.currentTimeMillis()+delay, -period);
}

// 发布指定日期后首次执行任务,随后以周期 period 执行任务。
public void schedule(TimerTask task, Date firstTime, long period) {
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, firstTime.getTime(), -period);
}

在固定速率执行中,**每次执行都是相对于初始执行的预定执行时间进行安排的。如果某次执行因任何原因(如垃圾回收或其他后台活动)而被延迟,则两次或多次执行将快速连续进行以“追赶”。**从长远来看,执行频率将恰好为指定周期的倒数(假设底层 Object.wait(long) 的系统时钟准确)。

固定速率执行适用于对绝对时间敏感的重复活动,例如每小时整点敲响钟声,或者每天在特定时间运行计划维护。它也适用于需要在固定时间内完成固定数量次数的重复活动,例如倒计时器每秒钟滴答一次,持续十秒钟。最后,固定速率执行适用于安排多个必须彼此同步的重复计时器任务。

public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
    if (delay < 0)
        throw new IllegalArgumentException("Negative delay.");
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, System.currentTimeMillis()+delay, period);
}

public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) {
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, firstTime.getTime(), period);
}
sched

这个方法是所有 schedule 系列方法的核心实现:

private void sched(TimerTask task, long time, long period) {
    if (time < 0)
        throw new IllegalArgumentException("Illegal execution time.");

    // Constrain value of period sufficiently to prevent numeric
    // overflow while still being effectively infinitely large.
    if (Math.abs(period) > (Long.MAX_VALUE >> 1))
        period >>= 1;

    synchronized(queue) {
        if (!thread.newTasksMayBeScheduled)
            throw new IllegalStateException("Timer already cancelled.");

        synchronized(task.lock) {
            if (task.state != TimerTask.VIRGIN)
                throw new IllegalStateException(
                    "Task already scheduled or cancelled");
            task.nextExecutionTime = time;
            task.period = period;
            task.state = TimerTask.SCHEDULED;
        }

        queue.add(task);
        if (queue.getMin() == task)
            queue.notify();
    }
}

它接受一个TimerTask对象,一个执行时间和一个周期作为参数。首先,它检查执行时间是否小于0:如果是,则抛出 IllegalArgumentException 。然后,它对周期值进行约束,以防止数值溢出,同时仍然有效地无限大。

接下来,在同步队列的情况下,它检查线程是否允许安排新任务。如果不允许,则抛出 IllegalStateException 。

然后,在同步任务锁的情况下,它检查任务状态是否为VIRGIN(未安排)。

如果不是,则抛出 IllegalStateException 。然后,它设置任务的下一次执行时间、周期和状态。

最后,在同步队列的情况下,将任务添加到队列中,并检查队列中的最小元素是否为该任务。如果是,则通知队列。

取消操作

cancel
public void cancel() {
    synchronized(queue) {
        thread.newTasksMayBeScheduled = false;
        queue.clear();
        queue.notify();  // In case queue was already empty.
    }
}

终止此计时器,丢弃任何当前计划的任务。不会干扰当前正在执行的任务(如果存在)。一旦计时器被终止,它的执行线程就会优雅地终止,并且不再可以在其上调度任何任务。

请注意,从由该计时器调用的计时器任务的 run 方法中调用此方法绝对保证正在执行的任务是该计时器将执行的最后一个任务执行。

这个方法可以被重复调用;第二次和随后的调用没有效果。

从该计时器的任务队列中移除所有已取消的任务。调用此方法对计时器的行为没有影响,但会从队列中消除对已取消任务的引用。如果没有对这些任务的外部引用,它们就有资格进行垃圾收集。

purge

大多数程序都不需要调用这个方法。它是为取消大量任务的罕见应用程序而设计的。调用此方法以时间换取空间:该方法的运行时间可能与 O(n) + O(c * log n) 成正比,其中 n 是队列中的任务数量,c 是取消的任务数量。允许在此计时器上调度的任务中调用此方法。返回值是从队列中移除的任务数。

public int purge() {
    int result = 0;

    synchronized(queue) {
        for (int i = queue.size(); i > 0; i--) {
            if (queue.get(i).state == TimerTask.CANCELLED) {
                queue.quickRemove(i);
                result++;
            }
        }

        if (result != 0)
            queue.heapify();
    }

    return result;
}

从队列中移除所有已取消的任务,并返回已移除任务的数量。首先,它在同步队列的情况下,遍历队列中的所有元素。对于每个元素,如果其状态为CANCELLED,则使用quickRemove方法将其从队列中移除,并将结果计数器递增。

遍历完成后,如果结果计数器不为0,则调用heapify方法重新构建堆。最后,返回结果计数器的值。

转载自:https://juejin.cn/post/7211437215335645244
评论
请登录