likes
comments
collection
share

前端必备的定时任务技能 - Cron + node-schedule

作者站长头像
站长
· 阅读数 20

本文正在参加「金石计划 . 瓜分6万现金大奖」

Cron表达式是用来表达时间相关信息的字符串,用来做定时任务这类需求是最好的选择,前端在浏览器端不太会用得到,但如果是node.js相关的业务,这就是一个必备的技能,学好cron表达式,再结合一些工具库(node-schedule),你就可以轻松写好所有场景的定时任务。

Cron表达式

Cron其实是类Unix系统中的任务调度程序,以固定时间、日期或间隔定期执行脚本任务,最适合来安排重复性的任务,比如定时发送邮件,监控数据指标等。

冷知识:Cron的名字来源于chronos,希腊语中的时间

定时任务肯定得结合场景,下面就是一个简单的例子

场景:周五晚上10点提醒我发周报

Cron表达式:0 0 22 ? * 6

上面的表达式看不懂没关系,下面就来详细解释每个字符是什么意思

字符含义

前端必备的定时任务技能 - Cron + node-schedule

年用的不多,就没有展示

如上图所示,Cron表达式的字符串是由简单的几个字符和空格拼接而成,一般会有4-6个空格,空格隔开的那部分就会形成时间子配置。

子配置是否必填范围允许的特殊字符说明
0-59* , - /不常用
0-59* , - /
小时0-23* , - /
几号1-31* , - / ? L W
月份1-12或JAN - DEC* , - /
星期几0-7或SUN-SAT* , - / ? L #0和7是周六,1是周日
1970-2099* , - /不常用

特殊字符说明

*:匹配任意单个原子值,每分钟,每小时,每天等

,:分隔列表项,星期项 2,4,6表示每周的一三五匹配

-:定义范围,小时项 9-18表示早上9点到下午6点,每小时都匹配

/:表示间隔时间执行,小时项9/2表示从早上9点开始,每2小时执行一次

?:只用于日期项和星期项,为了表示互斥,日期项如果有值,星期项就得用?,反之亦然,否则配置会失效

L:只用于日期项和星期项,表示一个月的倒数第几天或一星期中的倒数第几天,5L表示倒数第五天

W:只用于日期项,表示距离工作日(周一到周五)最近的一天

#:只用于星期项,5#3对应于每个月的第三个星期五

常见场景

0 0 8 * * ?

一三五该锻炼身体了

0 30 19 ? * 2,4,6

工作2小时,摸鱼20分钟

0 0 9/2 ? * MON-FRI

月末写总结

0 15 10 L * ?

双11尾款人冲鸭

59 59 23 10 11 ?

node-schedule

node.js并没有原生的方法支持调用cron,需要借助第三方库来实现,主要还是依赖于setTimeout来实现。

node-schedule是一个灵活时间调度工具库,支持cron表达式普通Date对象内置语义化类等时间格式

安装:

npm install node-schedule
// or
yarn add node-schedule

时间规则

node-schedule提供scheduleJon方法来注册调度任务,第一个参数是时间规则,第二个参数是时间匹配后的回调

cron

前端必备的定时任务技能 - Cron + node-schedule

表达式中的L和W暂不支持

固定时间(只会执行一次)

前端必备的定时任务技能 - Cron + node-schedule

语义化时间实例

前端必备的定时任务技能 - Cron + node-schedule

这是node-schedule内置的语义化时间规则RecurrenceRule,配置起来更加清晰,可以防止因为眼花而导致cron表达式配错

属性如下所示:

  • second (0-59)
  • minute (0-59)
  • hour (0-23)
  • date (1-31)
  • month (0-11)
  • year
  • dayOfWeek (0-6) Starting with Sunday
  • tz

tz - time zone时区,默认系统时区

定义范围

前端必备的定时任务技能 - Cron + node-schedule

上面的代码就定义了规则生效的范围,在未来5秒内,每秒都执行

Job事件

通过上面的简单使用介绍,可以看到每次调用scheduleJob函数,都会返回一个job实例,这里来讲讲job相关的事件处理

job.cancel(reschedule = false)

取消当前job所有定时任务,入参是一个布尔值,true表示清空后重新开启当前定时任务,默认为false

job.cancelNext(reschedule = true)

和cancel事件类似,但是这是取消下一次即将执行的函数,reschedule默认为true

job.reschedule(spec)

对job重新定义时间规则

job.nextInvocation()

手动调用最近一次时间匹配的触发器

Job还基于EventEmitter来实现了几个通用的事件监听,简单用法如下:

前端必备的定时任务技能 - Cron + node-schedule

run:表示当前任务正在执行

success:会返回invoke结果,支持异步Promise语法

error:异步Promise报错才会有返回结果

scheduled:任务注册或重新注册实行

cancel:任务取消时执行

不了解EventEmitter,可以看下简易版本的nanoevent,逻辑相当好理解

export let createNanoEvents = () => ({
  events: {},
  emit(event, ...args) {
    let callbacks = this.events[event] || []
    for (let i = 0, length = callbacks.length; i < length; i++) {
      callbacks[i](...args)
    }
  },
  on(event, cb) {
    this.events[event]?.push(cb) || (this.events[event] = [cb])
    return () => {
      this.events[event] = this.events[event]?.filter(i => cb !== i)
    }
  }
})

自定义封装

Job的任务注册和事件监听其实是分开用的,仔细想想能不能把两者结合起来,整合成一个Class类,于是我封装了下面这个Schedule类:

class Schedule {
  constructor() {
    if (!this.spec) {
      throw new Error('Time config (spec) Required')
    }
    const job = schedule.scheduleJob(this.spec, this.invoke)
    this._registerEvent(job)
    this.job = job
  }

  async invoke() {}

  _registerEvent(job) {
    job.on('success', (val) => {
      this._log('success', val)
      this.successCb(val)
    })
    job.on('error', (err) => {
      this._log('error', err)
      this.errorCb(err)
    })
  }

  // 成功的回调
  successCb() {}

  // 失败的回调
  errorCb() {}

  // 可自定义封装其他
  _log(type, info) {
    if (type === 'error') {
      // console.error(info)
      return
    }
    // console.log(type, info)
  }
}

具体用法:

class SendEmail extends Schedule {
  get spec() {
    return '0/5 * * * * *'
  }

  async invoke() {
    const res = await mockSms(1)
  	return res
  }

  errorCb() {
    // retry
		mockSms(1)
  }
}

new SendEmail()

配置时间规则,回调,错误处理就能在一个类里面解决。

同时,你还可以在Schedule里统一打你的全局日志,不漏掉任何调用信息:

前端必备的定时任务技能 - Cron + node-schedule

源码

其实node-schedule整体的逻辑代码并不多,只有500行左右,所以也非常值得花点小时间研究一下,下面是我画的一张依赖引用相关图

前端必备的定时任务技能 - Cron + node-schedule

虚线其实就是核心方法scheduleJob的调用链路,入口代码也很好理解,其实就是判断下相关的参数,然后实例化Job类,从而调用schedule进行注册定时任务:

function scheduleJob() {
  if (arguments.length < 2) {
    throw new RangeError('Invalid number of arguments');
  }

  const name = (arguments.length >= 3 && typeof arguments[0] === 'string') ? arguments[0] : null;
  const spec = name ? arguments[1] : arguments[0];
  const method = name ? arguments[2] : arguments[1];
  const callback = name ? arguments[3] : arguments[2];

  if (typeof method !== 'function') {
    throw new RangeError('The job method must be a function.');
  }

  // 校验完入参后,开始构造任务单元
  const job = new Job(name, method, callback);
  if (job.schedule(spec)) {
    return job;
  }

  return null;
}

任务注册器 - Job

主要处理以下事情:

  • 解析时间相关配置(cronDateRecurrenceRule

  • 定义Job独有的事件(cancel,cancelNext,reschedule等)

  • 分配invoke方法

解析时间配置

  1. start,end都会在这个阶段配置

  2. cron-parser解析cron表达式,解析生成就开始加入下一次的队列调用(scheduleNextRecurrence),返回的invocation加入jobpendingInvocations

  3. cron解析失败,判断是否是Date格式,这种直接实例化Invocation,并加入等待

  4. 对象就解析成RecurrenceRule配置,剩余逻辑和流程2相同

cron-parser可以理解为cron表达式的解析工具包,方便提取每次的生效时间

const cronParser = require('cron-parser')
const CronDate = require('cron-parser/lib/date')

Job.prototype.schedule = function(spec) {
  const self = this;
  let success = false;
  let inv;
  let start;
  let end;
  let tz;

  // save passed-in value before 'spec' is replaced
  // 记录当前的入参解析,在传入之前
  if (typeof spec === 'object' && 'tz' in spec) {
    tz = spec.tz;
  }

  if (typeof spec === 'object' && spec.rule) {
    start = spec.start || undefined;
    end = spec.end || undefined;
    spec = spec.rule;

    if (start) {
      if (!(start instanceof Date)) {
        start = new Date(start);
      }

      start = new CronDate(start, tz);
      if (!isValidDate(start) || start.getTime() < Date.now()) {
        start = undefined;
      }
    }

    if (end && !(end instanceof Date) && !isValidDate(end = new Date(end))) {
      end = undefined;
    }

    if (end) {
      end = new CronDate(end, tz);
    }
  }

  try {
    // 1. 解析cron表达式
    const res = cronParser.parseExpression(spec, {currentDate: start, tz: tz});
    // 返回下一次的调度方法
    inv = scheduleNextRecurrence(res, self, start, end);
    if (inv !== null) {
      success = self.trackInvocation(inv);
    }
  } catch (err) {
    const type = typeof spec;
    // 2. 字符串或数字判断为固定时间 Date 格式
    if ((type === 'string') || (type === 'number')) {
      spec = new Date(spec);
    }

    // 2. 标准 Date 格式,手动注册Invocation对象
    if ((spec instanceof Date) && (isValidDate(spec))) {
      spec = new CronDate(spec);
      self.isOneTimeJob = true;
      if (spec.getTime() >= Date.now()) {
        inv = new Invocation(self, spec);
        scheduleInvocation(inv);
        success = self.trackInvocation(inv);
      }
    // 3. 如果是对象,转换成RecurrenceRule类
    } else if (type === 'object') {
      self.isOneTimeJob = false;
      if (!(spec instanceof RecurrenceRule)) {
        const r = new RecurrenceRule();
        if ('year' in spec) {
          r.year = spec.year;
        }
        if ('month' in spec) {
          r.month = spec.month;
        }
        if ('date' in spec) {
          r.date = spec.date;
        }
        if ('dayOfWeek' in spec) {
          r.dayOfWeek = spec.dayOfWeek;
        }
        if ('hour' in spec) {
          r.hour = spec.hour;
        }
        if ('minute' in spec) {
          r.minute = spec.minute;
        }
        if ('second' in spec) {
          r.second = spec.second;
        }

        spec = r;
      }

      spec.tz = tz;
      inv = scheduleNextRecurrence(spec, self, start, end);
      if (inv !== null) {
        success = self.trackInvocation(inv);
      }
    }
  }

  scheduledJobs[this.name] = this;
  return success;
};

Job独有的事件:

这里讲个比较有代表性的cancel事件

因为一个Job可能会多次调用schedule方法,那么就会有好几个不同的时间配置,这时候其实就是用队列(上文有提到的pendingInvocations)来去管理,如果要取消所有定时任务,那么就要遍历取消所有等待中的Invocation实例,进行取消。

cancelInvocation方法会在下文提到

// 取消任务
this.cancel = function(reschedule) {
  reschedule = (typeof reschedule == 'boolean') ? reschedule : false;

  let inv, newInv;
  const newInvs = [];
  for (let j = 0; j < this.pendingInvocations.length; j++) {
    inv = this.pendingInvocations[j];

    cancelInvocation(inv);

    if (reschedule && (inv.recurrenceRule.recurs || inv.recurrenceRule.next)) {
      newInv = scheduleNextRecurrence(inv.recurrenceRule, this, inv.fireDate, inv.endDate);
      if (newInv !== null) {
        newInvs.push(newInv);
      }
    }
  }

  this.pendingInvocations = [];

  for (let k = 0; k < newInvs.length; k++) {
    this.trackInvocation(newInvs[k]);
  }

  // remove from scheduledJobs if reschedule === false
  if (!reschedule) {
    this.deleteFromSchedule()
  }

  return true;
};

分配invoke方法

Job.prototype.invoke = function(fireDate) {
  // trigger计数用于单测,无实际业务逻辑
  this.setTriggeredJobs(this.triggeredJobs() + 1);
  return this.job(fireDate);
};

定时任务执行 - Invocation

核心逻辑主要处理以下事情:

  • 定时器适配定时任务
  • job事件emit触发

定时器闭环:

这里其实就是定时任务的核心部分了,简单来讲就是利用setTimeout来实现,通过下面的流程图其实你就能大概知道如何实现定时任务

前端必备的定时任务技能 - Cron + node-schedule

带着上面这个图看源码:

上文中解析完cron配置后,就会执行scheduleNextRecurrence

  1. 判断当前时间处于合理的范围内,[start, end]中
  2. 实例化Invocation,并安排定时任务 => scheduleInvocation
function scheduleNextRecurrence(rule, job, prevDate, endDate) {

  // 没有指定的开始时间,当前时间就是开始时间
  prevDate = (prevDate instanceof CronDate) ? prevDate : new CronDate();

  // 返回规则的下一次执行时间
  const date = (rule instanceof RecurrenceRule) ? rule._nextInvocationDate(prevDate) : rule.next();
  if (date === null) {
    return null;
  }

  // 如果下次执行时间超出结束时间,就直接结束
  if ((endDate instanceof CronDate) && date.getTime() > endDate.getTime()) {
    return null;
  }

  // 实例化Invocation, 并开始安排定时任务
  const inv = new Invocation(job, date, rule, endDate);
  scheduleInvocation(inv);

  return inv;
}

scheduleInvocation 安排调度器:

  1. 根据cron表达式解析的最近时间排序当前的invocation
  2. job 触发scheduled事件
  3. 下一层调用链 => prepareNextInvocation
function sorter(a, b) {
  return (a.fireDate.getTime() - b.fireDate.getTime());
}

// 安排当前的调度器,按时间排序,event触发scheduled事件
function scheduleInvocation(invocation) {
  sorted.add(invocations, invocation, sorter);
  prepareNextInvocation();
  const date = invocation.fireDate instanceof CronDate ? invocation.fireDate.toDate() : invocation.fireDate;
  invocation.job.emit('scheduled', date);
}

prepareNextInvocation 设置定时器:

  1. 清除当前已有定时器 - 同一时刻下只能有一个setTimeout存在
  2. 定时器触发后就执行invoke
  3. 触发job的run,success,error事件
  4. 清除队列,并准备下一次的定时器 (prepareNextInvocation),有点递归的意思
function prepareNextInvocation() {
  if (invocations.length > 0 && currentInvocation !== invocations[0]) {
    // 清除当前定时器
    if (currentInvocation !== null) {
      lt.clearTimeout(currentInvocation.timerID);
      currentInvocation.timerID = null;
      currentInvocation = null;
    }

    currentInvocation = invocations[0];

    const job = currentInvocation.job;
    const cinv = currentInvocation;
    currentInvocation.timerID = runOnDate(currentInvocation.fireDate, function() {
      currentInvocationFinished();

      if (job.callback) {
        job.callback();
      }

      console.log(cinv.recurrenceRule.recurs, cinv.recurrenceRule._endDate)
      // 把新cron解析的方法继续推进去
      if (cinv.recurrenceRule.recurs || cinv.recurrenceRule._endDate === null) {
        const inv = scheduleNextRecurrence(cinv.recurrenceRule, cinv.job, cinv.fireDate, cinv.endDate);
        if (inv !== null) {
          inv.job.trackInvocation(inv);
        }
      }

      job.stopTrackingInvocation(cinv);

      try {
        const result = job.invoke(cinv.fireDate instanceof CronDate ? cinv.fireDate.toDate() : cinv.fireDate);
        job.emit('run');
        job.running += 1;

        // invoke函数可能是异步的,这里会判断是否成功执行
        if (result instanceof Promise) {
          result.then(function (value) {
            job.emit('success', value);
            job.running -= 1;
          }).catch(function (err) {
            job.emit('error', err);
            job.running -= 1;
          });
        } else {
          job.emit('success', result);
          job.running -= 1;
        }
      } catch (err) {
        job.emit('error', err);
        job.running -= 1;
      }

      // 如果是单次时间调用,就直接删除Job
      if (job.isOneTimeJob) {
        job.deleteFromSchedule();
      }
    });
  }
}

// 核心基础代码,设置定时器
function runOnDate(date, job) {
  const now = Date.now();
  const then = date.getTime();

  return lt.setTimeout(function() {
    if (then > Date.now())
      runOnDate(date, job);
    else
      job();
  }, (then < now ? 0 : then - now));
}

// 清除当前的队列,准备下一次的调用
function currentInvocationFinished() {
  invocations.shift();
  currentInvocation = null;
  prepareNextInvocation();
}

这里还有个细节

lt.setTimeout这个其实是引了一个long-timeout的库,主要是为了解决js中最大安全数字的问题:

前端必备的定时任务技能 - Cron + node-schedule

前端必备的定时任务技能 - Cron + node-schedule

至此,定时任务就形成了闭环。

结束

虽说定时任务一般是后端来做,但是作为前端,学习一下其中的精髓还是不错的,一些简单的场景也能用node-schedule来覆盖。

创造不易,希望jym多多 点赞 + 关注 + 收藏 三连,持续更新中!!!

PS: 文中有任何错误,欢迎掘友指正

往期精彩📌

参考:

github.com/node-schedu…

www.npmjs.com/package/lon…

github.com/ai/nanoeven…

转载自:https://juejin.cn/post/7163608389233147918
评论
请登录