"排队执行:如何优雅地管理JavaScript异步任务"
背景
在公司做IM
系统,其中有个场景是重新登录时会收到一堆离线的消息(包括普通消息,群聊消息,命令消息),消息是一条一条处理的,而且处理单条消息是一个异步的过程,直接执行的话可能处理某条消息是要依赖其他消息的处理操作,例如先说到某条消息再收到该消息撤回命令,可能收到消息过后本地缓存的动作还没执行完,就执行了撤回的操作,导致该消息等于是先撤回再本地缓存,就有问题啦。 此时就需要一个队列,保证先进先出的原则,一条一条执行消息。
解析
队列
是一种数据结构,它遵循先进先出(FIFO)
原则 - 添加到队列中的第一个元素是最先被处理或从队列中删除的元素。在软件开发中,队列通常用于管理异步任务,例如网络请求或数据库操作。通过使用队列,我们可以确保这些任务按照它们接收到的顺序依次执行。
但是,在某些情况下,我们可能希望限制同一时间内并发执行任务的数量。这就是有限并发队列非常有用的地方。它允许我们控制任务执行的速率,并通过限制可以并发执行的任务数来防止资源耗尽。
下面我们逐个解析代码,以了解它是如何工作的。
class Queue {
private _count: number = 0
private _taskList: Array<Function> = [];
private _max: number = 1;
constructor(max: number = 1) {
this._max = max;
this._count = 0;
this._taskList = [];
}
}
这段代码定义了一个名为 Queue
的类,具有三个私有属性:_count
、_taskList
和 _max
。_count
属性用于跟踪当前正在执行的任务数。_taskList
属性存储所有等待执行的排队任务。_max
属性指定可以同时执行的最大并发任务数。
构造函数使用默认值初始化这些属性。它还接受一个可选的 max
参数,允许在创建新队列实例时设置最大并发任务数。
trigger(fn: Function, params:any) {
return new Promise((res, req) => {
const task = this._execute(fn, params, res, req);
if (this._count < this._max) {
task();
} else {
this._taskList.push(task);
}
})
}
trigger
方法用于向队列中添加新任务。它接受两个参数,一个表示要执行的任务的函数 fn
和任何应传递给该函数的参数。
方法返回一个 Promise,在任务完成时解析或在出现错误时拒绝。
在 trigger
方法内部,我们首先使用 _execute
方法创建一个新的 task
,该方法返回一个函数,该函数执行实际任务。
然后,我们检查当前正在运行的任务数(_count
)是否小于允许的最大并发任务数(_max
)。如果是,则立即通过调用 task()
执行任务。否则,我们将任务推入 _taskList
数组中,表示该任务正在等待稍后执行。
_execute(fn:Function, params:any, res: any, req:any) {
return () => {
this._count++;
let executeFlag = true;
let timer = setTimeout(() => {
console.error('queue-warning: 任务执行超时');
clearTimeout(timer);
this._count>0 && this._count--;
executeFlag = false;
this._next();
},1000 * 10)
fn(params).then(res).catch(req).finally(() => {
this._count>0 && this._count--;
clearTimeout(timer);
if (executeFlag) {
this._next();
}
})
}
}
_execute
方法负责创建实际添加到队列中的任务函数。它接受四个参数 - 原始的fn
任务函数,任何传递给该函数的params
参数以及两个回调函数res
和req
。
这个方法返回一个新的函数,该函数执行原始任务函数以提供的参数。首先,它将_count
属性增加1以表示已经开始执行新任务。然后,它记录消息以指示当前正在执行哪个任务。
接下来,它创建一个计时器,用于检测任务是否超时。如果任务执行时间超过10秒,则会触发超时错误并结束任务。然后,它减少_count
属性,表示已完成一个任务,并将任务标记为无法执行,以便在下一次 _next()
被调用时处理队列中的下一个任务。
最后,它返回一个 Promise,当任务完成时解析或在出现错误时拒绝。在这个 Promise 的 finally 块中,我们再次减少_count
属性,表示任务已完成。同时,我们清除计时器并检查 executeFlag 标志。如果它已被设置为 false,则说明任务已超时,不需要再次调用 _next()
来处理队列中的下一个任务。否则,我们调用 _next()
方法来处理队列中的下一个任务。
_next() {
if (this._taskList.length > 0) {
const task = this._taskList.shift();
task && task();
}
}
最后,我们有 _next
方法,该方法处理队列中的下一个任务。如果队列_taskList
中有等待执行的任务,则从数组中获取第一个任务并执行它。
通过使用这个队列,您可以控制任务执行的速率,并防止资源耗尽或应用程序崩溃。希望对你有帮助!
注意:队列中的异步任务fn必须保证有 resolve 或 reject 返回值,否则队列会处于 pendding 中,会等待10秒超时继续执行下一个任务
代码
class Queue {
private _count: number = 0
private _taskList: Array<Function> = [];
private _max: number = 1;
constructor(max: number = 1) {
this._max = max;
this._count = 0;
this._taskList = [];
}
trigger(fn: Function, params:any) {
return new Promise((res, req) => {
const task = this._execute(fn, params, res, req);
if (this._count < this._max) {
task();
} else {
console.log('=-------------->存在队列', this._taskList.length)
}
})
}
_execute(fn:Function, params:any, res: any, req:any) {
return () => {
this._count++;
let executeFlag = true;
let timer = setTimeout(() => {
console.error('queue-warning: 任务执行超时');
clearTimeout(timer);
this._count>0 && this._count--;
executeFlag = false;
this._next();
},1000 * 10)
fn(params).then(res).catch(req).finally(() => {
this._count>0 && this._count--;
clearTimeout(timer);
if (executeFlag) {
this._next();
}
})
}
}
_next() {
if (this._taskList.length > 0) {
const task = this._taskList.shift();
task && task();
}
}
}
const myQueue = new Queue()
myQueue.trigger(fn,params)
转载自:https://juejin.cn/post/7237425413873860665