基于Node.js异步优先队列2.0来袭
前言
阔别Web前端3年多时间里面,今天我重新抽空整理了一下之前的一个开源库:priority-async-queue,一个基于Node.js的异步优先任务队列。不了解1.0版本的同学可以戳这里,我们今天主要围绕着paq
的设计初衷和适用场景展开来说。
相比1.0,2.0改进了什么?
改进的主要有三点:
- 1.0只支持单并发状态执行任务队列,2.0则加入了支持多并发执行状态,具体并发数,用户可以自行设置。
- 2.0为每个任务加入了执行时间的统计,具体包括任务创建时间
creatTime
,任务开始执行时间startTime
,和任务执行结束时间endTime
。用户可以自己根据需求计算所需的任务等待时间:startTime - createTime
,或者任务执行消耗的时间endTime - startTime
,又或者只是用来写入日志文件,记录执行时间相关信息。 - 1.0只支持在Node.js环境下执行,2.0则提供支持浏览器可执行的输出文件。让浏览器端遇到同样困扰的同学,能够迎刃而解。
paq设计思路
paq设计思路其实非常简单,一共就4个类组成:
Task
是描述每个待执行(异步/同步)任务的执行逻辑以及配置参数。PriorityQueue
是控制每个待执行(异步/同步)任务的优先级队列、具有队列的基本属性和作。AsyncQueue
是控制每个待执行(异步/同步)任务能严格一定顺序地执行的队列。Event
模拟事件监听和事件触发的类。
2.0为了兼容浏览器环境能正常运行,去掉了对Node.js原生事件类EventEmitter
的依赖,自己实现了简易的事件绑定和触发的功能。
下面是paq 2.0的程序流程图:
paq的设计初衷
我刚转岗到游戏开发的时候,部门迫切需要一个集群打包系统来处理庞大的打包业务。当时,我临危受命,接下了这个任务。后来,我开发的集群打包系统,其打包采用的任务调度,最核心底层代码架构近似于paq
。当然,实际应用会比paq
复杂很多,因为游戏打包流程是一个极其复杂而繁琐的过程,我只是抽离了最核心通用的调度思路来开源成一个通用库。
试想,如果一台打包机只能在同一时间执行单个打包任务,那么就太浪费硬件资源了。但受限于CPU核数、硬盘空间、内存容量、数据读写速度等因素,我们又不能粗暴地向打包机里加入并发执行的打包任务,所以这个时候,能控制好每台打包机的并发数显得尤为重要。既要保证效率,也要保证安全可靠。
paq适用场景
首先,我们必须明确的是,在绝大部分业务场景里面,你可能不需要paq
。没设置并发数的paq
,其任务默认是严格按照顺序执行,并发数始终维持为1,这在绝大部份情况已经降低执行效率。JavaScript
原生支持的异步任务和事件循环,本来就是要充分发挥在单线程执行环境下,最大限度利用CPU多核的特性,从而提高程序执行效率。
可能不少同学都已经知道,在浏览器端,最大并发请求数,每个浏览器厂商都做了一定的限制,如:Chrome允许的最大并发请求数目为6,FireFox是4,每个浏览器版本之间又会存在一定的差异。总而言之,主流浏览器在网络请求方面已经帮我们做好了负载均衡的工作了。而在Node.js环境下,负载均衡问题则需要我们开发者自己来解决。
如果,在短时间内,一大批客户端产生大量的网络请求时候,服务器的承受能力肯定是有限的。这个时候,需要我们用一个像队列的数据结构容器来先存好这些请求,然后按照先进先出
的原则来慢慢提供给服务端处理,压力会减少很多。说到这里,很多有服务端经验的同学,第一时间就会想到消息队列
。没错,paq
很像消息队列
,但它没有遵守生产者
和消费者
模式。所以paq
不能单独处理分布式和集群业务的调度,它更适合放在MQ
的下游。
paq特点
1.paq更小、更易用。
paq
有效源码大概200行左右,Node.js环境下是非常精小的。但在浏览器端,打包压缩后的paq
也有18KB,主要来自是ES6
语法兼容性代码的冗余。
做前端开发者,无论是Web、移动端原生和游戏开发,最折磨的莫过于要兼容各种用户终端运行的环境和设备。
下面是paq
最基础的用法,开箱即用:
const PAQ = require('priority-async-queue');
const paq = new PAQ();
paq.addTask(() => {
console.log('Helo World!');
});
// Hello World!
接着,我们来看看字节的一道经典面试题。
class Scheduler {
add(promiseCreator) {
// 完善Scheduler,使其并发数为2
}
}
const timeout = (time) => new Promise(resolve => {
setTimeout(resolve, time);
})
const scheduler = new Scheduler();
const addTask = (time, order) => {
scheduler.add(() => timeout(time)).then(() => console.log(order));
}
addTask(1000, 1);
addTask(500, 2);
addTask(300, 3);
addTask(400, 4);
// 要求输出顺序
// 2
// 3
// 1
// 4
大家可以稍加思考一下,怎么扩展 Scheduler
类能完成需求。如果见过或者已经知道怎么做的同学不妨看看用paq
怎么轻松实现这个需求。
const PAQ = require('priority-async-queue');
// 实例化paq时,使其并发数为2
const paq = new PAQ(2);
class Scheduler {
add(promiseCreator) {
return new Promise(resolve => {
paq.addTask({
completed: (ctx, res) => {
resolve(res);
}
}, () => promiseCreator());
});
}
}
...
至于,不借助paq
又怎么实现这个需求呢?有兴趣的同学,可以在评论区分享自己的实现方式。
2.paq更贴合Node.js开发习惯
const PAQ = require('priority-async-queue');
const paq = new PAQ();
// 链式调用结构
paq.addTask(() => {
console.log('one');
}).addTask(() => {
console.log('two');
}).addTask(() => {
console.log('three');
});
// one
// two
// three
// 支持原生async和promise等异步操作
paq.addTask(() => {
return new Promise(resolve => {
paq.sleep(1000).then(() => {
console.log('sleep 1s');
resolve();
});
});
});
paq.addTask(async () => {
await paq.sleep(1000).then(() => {
console.log('sleep 1s too');
});
});
// sleep 1s
// sleep 1s too
3.使用灵活
只要paq
设置的并发数足够大,或者和处理业务峰值相当,那么它就能近似Promise.all
那样无限制并发执行,但是paq
不会等所有任务都完成后才进行下一步操作。
const PAQ = require('priority-async-queue');
// 并发上限设置足够大
const paq = new PAQ(20);
const p1 = () => paq.sleep(1000).then(() => Promise.resolve('p1'));
const p2 = () => paq.sleep(1000).then(() => Promise.resolve('p2'));
const p3 = () => paq.sleep(1000).then(() => Promise.resolve('p3'));
paq.addTask(p1).addTask(p2).addTask(p3).on('completed', (opt, result) => {
console.log(result);
});
Promise.all([p1(), p2(), p3()]).then(res => {
console.log(res);
});
// p1
// p2
// p3
// [ 'p1', 'p2', 'p3' ]
如果paq
只处理第一个返回状态的任务,则它的用法接近Promise.race
的用法了。
const PAQ = require('priority-async-queue');
// 并发上限设置足够大
const paq = new PAQ(100);
const p1 = () => paq.sleep(3000).then(() => Promise.resolve('p1'));
const p2 = () => paq.sleep(2000).then(() => Promise.resolve('p2'));
const p3 = () => paq.sleep(1000).then(() => Promise.resolve('p3'));
let isFirst = false;
paq.addTask(p1).addTask(p2).addTask(p3).on('completed', (opt, result) => {
if (!isFirst) {
// TODO 只处理第一个改变状态的任务
console.log('paq: ' + result);
isFirst = true;
}
});
Promise.race([p1(), p2(), p3()]).then(res => {
console.log('race: ' + res);
});
// paq: p3
// race: p3
paq
近似Promise.allSettled
和Promise.any
的用法我就不再展开了。个人认为,日常开发中能用原生实现的,尽量用原生实现,本文只是起介绍作用,不构成使用建议。
学海无涯,剑圣说过:“真正的大师永远都怀着一颗学徒的心”,所以被人称作:“易大师”。我们这些普通人如果坚持学习,虽然最后可能成不了大师,但起码不会摆烂吧?
转载自:https://juejin.cn/post/7074060850448826399