前端必备的定时任务技能 - Cron + node-schedule
本文正在参加「金石计划 . 瓜分6万现金大奖」
Cron表达式是用来表达时间相关信息的字符串,用来做定时任务这类需求是最好的选择,前端在浏览器端不太会用得到,但如果是node.js相关的业务,这就是一个必备的技能,学好cron表达式,再结合一些工具库(node-schedule),你就可以轻松写好所有场景的定时任务。
Cron表达式
Cron其实是类Unix系统中的任务调度程序,以固定时间、日期或间隔定期执行脚本任务,最适合来安排重复性的任务,比如定时发送邮件,监控数据指标等。
冷知识:Cron的名字来源于chronos,希腊语中的时间
定时任务肯定得结合场景,下面就是一个简单的例子
场景:周五晚上10点提醒我发周报
Cron表达式:0 0 22 ? * 6
上面的表达式看不懂没关系,下面就来详细解释每个字符是什么意思
字符含义
年用的不多,就没有展示
如上图所示,Cron表达式的字符串是由简单的几个字符和空格拼接而成,一般会有4-6个空格,空格隔开的那部分就会形成时间子配置。
子配置 | 是否必填 | 范围 | 允许的特殊字符 | 说明 |
---|---|---|---|---|
秒 | 否 | 0-59 | * , - / | 不常用 |
分 | 是 | 0-59 | * , - / | |
小时 | 是 | 0-23 | * , - / | |
几号 | 是 | 1-31 | * , - / ? L W | |
月份 | 是 | 1-12或JAN - DEC | * , - / | |
星期几 | 是 | 0-7或SUN-SAT | * , - / ? L # | 0和7是周六,1是周日 |
年 | 否 | 1970-2099 | * , - / | 不常用 |
特殊字符说明:
*
:匹配任意单个原子值,每分钟,每小时,每天等
,
:分隔列表项,星期项 2,4,6
表示每周的一三五匹配
-
:定义范围,小时项 9-18
表示早上9点到下午6点,每小时都匹配
/
:表示间隔时间执行,小时项9/2
表示从早上9点开始,每2小时执行一次
?
:只用于日期项和星期项,为了表示互斥,日期项如果有值,星期项就得用?,反之亦然,否则配置会失效
L
:只用于日期项和星期项,表示一个月的倒数第几天或一星期中的倒数第几天,5L
表示倒数第五天
W
:只用于日期项,表示距离工作日(周一到周五)最近的一天
#
:只用于星期项,5#3
对应于每个月的第三个星期五
常见场景
0 0 8 * * ?
一三五该锻炼身体了
0 30 19 ? * 2,4,6
工作2小时,摸鱼20分钟
0 0 9/2 ? * MON-FRI
月末写总结
0 15 10 L * ?
双11尾款人冲鸭
59 59 23 10 11 ?
node-schedule
node.js并没有原生的方法支持调用cron,需要借助第三方库来实现,主要还是依赖于setTimeout
来实现。
node-schedule
是一个灵活时间调度工具库,支持cron表达式,普通Date对象,内置语义化类等时间格式
安装:
npm install node-schedule
// or
yarn add node-schedule
时间规则
node-schedule
提供scheduleJon
方法来注册调度任务,第一个参数是时间规则,第二个参数是时间匹配后的回调
cron
表达式中的L和W暂不支持
固定时间(只会执行一次)
语义化时间实例
这是node-schedule
内置的语义化时间规则RecurrenceRule
,配置起来更加清晰,可以防止因为眼花而导致cron表达式配错
属性如下所示:
second (0-59)
minute (0-59)
hour (0-23)
date (1-31)
month (0-11)
year
dayOfWeek (0-6) Starting with Sunday
tz
tz - time zone时区,默认系统时区
定义范围
上面的代码就定义了规则生效的范围,在未来5秒内,每秒都执行
Job事件
通过上面的简单使用介绍,可以看到每次调用scheduleJob函数,都会返回一个job实例,这里来讲讲job相关的事件处理
job.cancel(reschedule = false)
取消当前job所有定时任务,入参是一个布尔值,true表示清空后重新开启当前定时任务,默认为false
job.cancelNext(reschedule = true)
和cancel事件类似,但是这是取消下一次即将执行的函数,reschedule默认为true
job.reschedule(spec)
对job重新定义时间规则
job.nextInvocation()
手动调用最近一次时间匹配的触发器
Job
还基于EventEmitter
来实现了几个通用的事件监听,简单用法如下:
run:表示当前任务正在执行
success:会返回invoke结果,支持异步Promise语法
error:异步Promise报错才会有返回结果
scheduled:任务注册或重新注册实行
cancel:任务取消时执行
不了解EventEmitter
,可以看下简易版本的nanoevent
,逻辑相当好理解
export let createNanoEvents = () => ({
events: {},
emit(event, ...args) {
let callbacks = this.events[event] || []
for (let i = 0, length = callbacks.length; i < length; i++) {
callbacks[i](...args)
}
},
on(event, cb) {
this.events[event]?.push(cb) || (this.events[event] = [cb])
return () => {
this.events[event] = this.events[event]?.filter(i => cb !== i)
}
}
})
自定义封装
Job的任务注册和事件监听其实是分开用的,仔细想想能不能把两者结合起来,整合成一个Class类,于是我封装了下面这个Schedule类:
class Schedule {
constructor() {
if (!this.spec) {
throw new Error('Time config (spec) Required')
}
const job = schedule.scheduleJob(this.spec, this.invoke)
this._registerEvent(job)
this.job = job
}
async invoke() {}
_registerEvent(job) {
job.on('success', (val) => {
this._log('success', val)
this.successCb(val)
})
job.on('error', (err) => {
this._log('error', err)
this.errorCb(err)
})
}
// 成功的回调
successCb() {}
// 失败的回调
errorCb() {}
// 可自定义封装其他
_log(type, info) {
if (type === 'error') {
// console.error(info)
return
}
// console.log(type, info)
}
}
具体用法:
class SendEmail extends Schedule {
get spec() {
return '0/5 * * * * *'
}
async invoke() {
const res = await mockSms(1)
return res
}
errorCb() {
// retry
mockSms(1)
}
}
new SendEmail()
配置时间规则,回调,错误处理就能在一个类里面解决。
同时,你还可以在Schedule里统一打你的全局日志,不漏掉任何调用信息:
源码
其实node-schedule
整体的逻辑代码并不多,只有500行左右,所以也非常值得花点小时间研究一下,下面是我画的一张依赖引用相关图
虚线其实就是核心方法scheduleJob
的调用链路,入口代码也很好理解,其实就是判断下相关的参数,然后实例化Job类,从而调用schedule进行注册定时任务:
function scheduleJob() {
if (arguments.length < 2) {
throw new RangeError('Invalid number of arguments');
}
const name = (arguments.length >= 3 && typeof arguments[0] === 'string') ? arguments[0] : null;
const spec = name ? arguments[1] : arguments[0];
const method = name ? arguments[2] : arguments[1];
const callback = name ? arguments[3] : arguments[2];
if (typeof method !== 'function') {
throw new RangeError('The job method must be a function.');
}
// 校验完入参后,开始构造任务单元
const job = new Job(name, method, callback);
if (job.schedule(spec)) {
return job;
}
return null;
}
任务注册器 - Job
主要处理以下事情:
-
解析时间相关配置(
cron
,Date
,RecurrenceRule
) -
定义Job独有的事件(cancel,cancelNext,reschedule等)
-
分配invoke方法
解析时间配置:
-
start,end都会在这个阶段配置
-
用
cron-parser
解析cron表达式,解析生成就开始加入下一次的队列调用(scheduleNextRecurrence
),返回的invocation
加入job
的pendingInvocations
-
cron解析失败,判断是否是
Date
格式,这种直接实例化Invocation
,并加入等待 -
对象就解析成
RecurrenceRule
配置,剩余逻辑和流程2相同
cron-parser
可以理解为cron表达式的解析工具包,方便提取每次的生效时间
const cronParser = require('cron-parser')
const CronDate = require('cron-parser/lib/date')
Job.prototype.schedule = function(spec) {
const self = this;
let success = false;
let inv;
let start;
let end;
let tz;
// save passed-in value before 'spec' is replaced
// 记录当前的入参解析,在传入之前
if (typeof spec === 'object' && 'tz' in spec) {
tz = spec.tz;
}
if (typeof spec === 'object' && spec.rule) {
start = spec.start || undefined;
end = spec.end || undefined;
spec = spec.rule;
if (start) {
if (!(start instanceof Date)) {
start = new Date(start);
}
start = new CronDate(start, tz);
if (!isValidDate(start) || start.getTime() < Date.now()) {
start = undefined;
}
}
if (end && !(end instanceof Date) && !isValidDate(end = new Date(end))) {
end = undefined;
}
if (end) {
end = new CronDate(end, tz);
}
}
try {
// 1. 解析cron表达式
const res = cronParser.parseExpression(spec, {currentDate: start, tz: tz});
// 返回下一次的调度方法
inv = scheduleNextRecurrence(res, self, start, end);
if (inv !== null) {
success = self.trackInvocation(inv);
}
} catch (err) {
const type = typeof spec;
// 2. 字符串或数字判断为固定时间 Date 格式
if ((type === 'string') || (type === 'number')) {
spec = new Date(spec);
}
// 2. 标准 Date 格式,手动注册Invocation对象
if ((spec instanceof Date) && (isValidDate(spec))) {
spec = new CronDate(spec);
self.isOneTimeJob = true;
if (spec.getTime() >= Date.now()) {
inv = new Invocation(self, spec);
scheduleInvocation(inv);
success = self.trackInvocation(inv);
}
// 3. 如果是对象,转换成RecurrenceRule类
} else if (type === 'object') {
self.isOneTimeJob = false;
if (!(spec instanceof RecurrenceRule)) {
const r = new RecurrenceRule();
if ('year' in spec) {
r.year = spec.year;
}
if ('month' in spec) {
r.month = spec.month;
}
if ('date' in spec) {
r.date = spec.date;
}
if ('dayOfWeek' in spec) {
r.dayOfWeek = spec.dayOfWeek;
}
if ('hour' in spec) {
r.hour = spec.hour;
}
if ('minute' in spec) {
r.minute = spec.minute;
}
if ('second' in spec) {
r.second = spec.second;
}
spec = r;
}
spec.tz = tz;
inv = scheduleNextRecurrence(spec, self, start, end);
if (inv !== null) {
success = self.trackInvocation(inv);
}
}
}
scheduledJobs[this.name] = this;
return success;
};
Job独有的事件:
这里讲个比较有代表性的cancel事件
因为一个Job可能会多次调用schedule方法,那么就会有好几个不同的时间配置,这时候其实就是用队列(上文有提到的pendingInvocations
)来去管理,如果要取消所有定时任务,那么就要遍历取消所有等待中的Invocation
实例,进行取消。
cancelInvocation
方法会在下文提到
// 取消任务
this.cancel = function(reschedule) {
reschedule = (typeof reschedule == 'boolean') ? reschedule : false;
let inv, newInv;
const newInvs = [];
for (let j = 0; j < this.pendingInvocations.length; j++) {
inv = this.pendingInvocations[j];
cancelInvocation(inv);
if (reschedule && (inv.recurrenceRule.recurs || inv.recurrenceRule.next)) {
newInv = scheduleNextRecurrence(inv.recurrenceRule, this, inv.fireDate, inv.endDate);
if (newInv !== null) {
newInvs.push(newInv);
}
}
}
this.pendingInvocations = [];
for (let k = 0; k < newInvs.length; k++) {
this.trackInvocation(newInvs[k]);
}
// remove from scheduledJobs if reschedule === false
if (!reschedule) {
this.deleteFromSchedule()
}
return true;
};
分配invoke方法:
Job.prototype.invoke = function(fireDate) {
// trigger计数用于单测,无实际业务逻辑
this.setTriggeredJobs(this.triggeredJobs() + 1);
return this.job(fireDate);
};
定时任务执行 - Invocation
核心逻辑主要处理以下事情:
- 定时器适配定时任务
- job事件emit触发
定时器闭环:
这里其实就是定时任务的核心部分了,简单来讲就是利用setTimeout来实现,通过下面的流程图其实你就能大概知道如何实现定时任务
带着上面这个图看源码:
上文中解析完cron配置后,就会执行scheduleNextRecurrence
:
- 判断当前时间处于合理的范围内,[start, end]中
- 实例化
Invocation
,并安排定时任务 =>scheduleInvocation
function scheduleNextRecurrence(rule, job, prevDate, endDate) {
// 没有指定的开始时间,当前时间就是开始时间
prevDate = (prevDate instanceof CronDate) ? prevDate : new CronDate();
// 返回规则的下一次执行时间
const date = (rule instanceof RecurrenceRule) ? rule._nextInvocationDate(prevDate) : rule.next();
if (date === null) {
return null;
}
// 如果下次执行时间超出结束时间,就直接结束
if ((endDate instanceof CronDate) && date.getTime() > endDate.getTime()) {
return null;
}
// 实例化Invocation, 并开始安排定时任务
const inv = new Invocation(job, date, rule, endDate);
scheduleInvocation(inv);
return inv;
}
scheduleInvocation 安排调度器:
- 根据cron表达式解析的最近时间排序当前的invocation
- job 触发
scheduled
事件 - 下一层调用链 =>
prepareNextInvocation
function sorter(a, b) {
return (a.fireDate.getTime() - b.fireDate.getTime());
}
// 安排当前的调度器,按时间排序,event触发scheduled事件
function scheduleInvocation(invocation) {
sorted.add(invocations, invocation, sorter);
prepareNextInvocation();
const date = invocation.fireDate instanceof CronDate ? invocation.fireDate.toDate() : invocation.fireDate;
invocation.job.emit('scheduled', date);
}
prepareNextInvocation 设置定时器:
- 清除当前已有定时器 - 同一时刻下只能有一个setTimeout存在
- 定时器触发后就执行invoke
- 触发job的run,success,error事件
- 清除队列,并准备下一次的定时器 (prepareNextInvocation),有点递归的意思
function prepareNextInvocation() {
if (invocations.length > 0 && currentInvocation !== invocations[0]) {
// 清除当前定时器
if (currentInvocation !== null) {
lt.clearTimeout(currentInvocation.timerID);
currentInvocation.timerID = null;
currentInvocation = null;
}
currentInvocation = invocations[0];
const job = currentInvocation.job;
const cinv = currentInvocation;
currentInvocation.timerID = runOnDate(currentInvocation.fireDate, function() {
currentInvocationFinished();
if (job.callback) {
job.callback();
}
console.log(cinv.recurrenceRule.recurs, cinv.recurrenceRule._endDate)
// 把新cron解析的方法继续推进去
if (cinv.recurrenceRule.recurs || cinv.recurrenceRule._endDate === null) {
const inv = scheduleNextRecurrence(cinv.recurrenceRule, cinv.job, cinv.fireDate, cinv.endDate);
if (inv !== null) {
inv.job.trackInvocation(inv);
}
}
job.stopTrackingInvocation(cinv);
try {
const result = job.invoke(cinv.fireDate instanceof CronDate ? cinv.fireDate.toDate() : cinv.fireDate);
job.emit('run');
job.running += 1;
// invoke函数可能是异步的,这里会判断是否成功执行
if (result instanceof Promise) {
result.then(function (value) {
job.emit('success', value);
job.running -= 1;
}).catch(function (err) {
job.emit('error', err);
job.running -= 1;
});
} else {
job.emit('success', result);
job.running -= 1;
}
} catch (err) {
job.emit('error', err);
job.running -= 1;
}
// 如果是单次时间调用,就直接删除Job
if (job.isOneTimeJob) {
job.deleteFromSchedule();
}
});
}
}
// 核心基础代码,设置定时器
function runOnDate(date, job) {
const now = Date.now();
const then = date.getTime();
return lt.setTimeout(function() {
if (then > Date.now())
runOnDate(date, job);
else
job();
}, (then < now ? 0 : then - now));
}
// 清除当前的队列,准备下一次的调用
function currentInvocationFinished() {
invocations.shift();
currentInvocation = null;
prepareNextInvocation();
}
这里还有个细节
lt.setTimeout
这个其实是引了一个long-timeout
的库,主要是为了解决js中最大安全数字的问题:
至此,定时任务就形成了闭环。
结束
虽说定时任务一般是后端来做,但是作为前端,学习一下其中的精髓还是不错的,一些简单的场景也能用node-schedule来覆盖。
创造不易,希望jym多多 点赞 + 关注 + 收藏 三连,持续更新中!!!
PS: 文中有任何错误,欢迎掘友指正
往期精彩📌
参考:
转载自:https://juejin.cn/post/7163608389233147918