likes
comments
collection
share

基于Node.js异步优先队列2.0来袭

作者站长头像
站长
· 阅读数 19

前言

阔别Web前端3年多时间里面,今天我重新抽空整理了一下之前的一个开源库:priority-async-queue,一个基于Node.js的异步优先任务队列。不了解1.0版本的同学可以戳这里,我们今天主要围绕着paq的设计初衷和适用场景展开来说。

相比1.0,2.0改进了什么?

改进的主要有三点:

  • 1.0只支持单并发状态执行任务队列,2.0则加入了支持多并发执行状态,具体并发数,用户可以自行设置。
  • 2.0为每个任务加入了执行时间的统计,具体包括任务创建时间creatTime,任务开始执行时间startTime,和任务执行结束时间endTime。用户可以自己根据需求计算所需的任务等待时间:startTime - createTime,或者任务执行消耗的时间endTime - startTime,又或者只是用来写入日志文件,记录执行时间相关信息。
  • 1.0只支持在Node.js环境下执行,2.0则提供支持浏览器可执行的输出文件。让浏览器端遇到同样困扰的同学,能够迎刃而解。

paq设计思路

paq设计思路其实非常简单,一共就4个类组成:

  • Task 是描述每个待执行(异步/同步)任务的执行逻辑以及配置参数。
  • PriorityQueue 是控制每个待执行(异步/同步)任务的优先级队列、具有队列的基本属性和作。
  • AsyncQueue 是控制每个待执行(异步/同步)任务能严格一定顺序地执行的队列。
  • Event 模拟事件监听和事件触发的类。

2.0为了兼容浏览器环境能正常运行,去掉了对Node.js原生事件类EventEmitter的依赖,自己实现了简易的事件绑定和触发的功能。

下面是paq 2.0的程序流程图:

基于Node.js异步优先队列2.0来袭

paq的设计初衷

我刚转岗到游戏开发的时候,部门迫切需要一个集群打包系统来处理庞大的打包业务。当时,我临危受命,接下了这个任务。后来,我开发的集群打包系统,其打包采用的任务调度,最核心底层代码架构近似于paq。当然,实际应用会比paq复杂很多,因为游戏打包流程是一个极其复杂而繁琐的过程,我只是抽离了最核心通用的调度思路来开源成一个通用库。

试想,如果一台打包机只能在同一时间执行单个打包任务,那么就太浪费硬件资源了。但受限于CPU核数、硬盘空间、内存容量、数据读写速度等因素,我们又不能粗暴地向打包机里加入并发执行的打包任务,所以这个时候,能控制好每台打包机的并发数显得尤为重要。既要保证效率,也要保证安全可靠。

paq适用场景

首先,我们必须明确的是,在绝大部分业务场景里面,你可能不需要paq。没设置并发数的paq,其任务默认是严格按照顺序执行,并发数始终维持为1,这在绝大部份情况已经降低执行效率。JavaScript原生支持的异步任务和事件循环,本来就是要充分发挥在单线程执行环境下,最大限度利用CPU多核的特性,从而提高程序执行效率。

可能不少同学都已经知道,在浏览器端,最大并发请求数,每个浏览器厂商都做了一定的限制,如:Chrome允许的最大并发请求数目为6,FireFox是4,每个浏览器版本之间又会存在一定的差异。总而言之,主流浏览器在网络请求方面已经帮我们做好了负载均衡的工作了。而在Node.js环境下,负载均衡问题则需要我们开发者自己来解决。

如果,在短时间内,一大批客户端产生大量的网络请求时候,服务器的承受能力肯定是有限的。这个时候,需要我们用一个像队列的数据结构容器来先存好这些请求,然后按照先进先出的原则来慢慢提供给服务端处理,压力会减少很多。说到这里,很多有服务端经验的同学,第一时间就会想到消息队列。没错,paq很像消息队列,但它没有遵守生产者消费者模式。所以paq不能单独处理分布式和集群业务的调度,它更适合放在MQ的下游。

paq特点

1.paq更小、更易用。

paq有效源码大概200行左右,Node.js环境下是非常精小的。但在浏览器端,打包压缩后的paq也有18KB,主要来自是ES6语法兼容性代码的冗余。

做前端开发者,无论是Web、移动端原生和游戏开发,最折磨的莫过于要兼容各种用户终端运行的环境和设备。

下面是paq最基础的用法,开箱即用:

const PAQ = require('priority-async-queue');
const paq = new PAQ();

paq.addTask(() => {
  console.log('Helo World!');
});

// Hello World!

接着,我们来看看字节的一道经典面试题。

class Scheduler {
  add(promiseCreator) {
    // 完善Scheduler,使其并发数为2
  }
}

const timeout = (time) => new Promise(resolve => {
  setTimeout(resolve, time);
})

const scheduler = new Scheduler();

const addTask = (time, order) => {
  scheduler.add(() => timeout(time)).then(() => console.log(order));
}

addTask(1000, 1);
addTask(500, 2);
addTask(300, 3);
addTask(400, 4);

// 要求输出顺序
// 2
// 3
// 1
// 4

大家可以稍加思考一下,怎么扩展 Scheduler 类能完成需求。如果见过或者已经知道怎么做的同学不妨看看用paq怎么轻松实现这个需求。

const PAQ = require('priority-async-queue');
// 实例化paq时,使其并发数为2
const paq = new PAQ(2);

class Scheduler {
  add(promiseCreator) {
    return new Promise(resolve => {
      paq.addTask({
        completed: (ctx, res) => {
          resolve(res);
        }
      }, () => promiseCreator());
    });
  }
}

...

至于,不借助paq又怎么实现这个需求呢?有兴趣的同学,可以在评论区分享自己的实现方式。

2.paq更贴合Node.js开发习惯

const PAQ = require('priority-async-queue');
const paq = new PAQ();

// 链式调用结构
paq.addTask(() => {
  console.log('one');
}).addTask(() => {
  console.log('two');
}).addTask(() => {
  console.log('three');
});

// one
// two
// three

// 支持原生async和promise等异步操作
paq.addTask(() => {
  return new Promise(resolve => {
    paq.sleep(1000).then(() => {
      console.log('sleep 1s');
      resolve();
    });
  });
});
paq.addTask(async () => {
  await paq.sleep(1000).then(() => {
    console.log('sleep 1s too');
  });
});

// sleep 1s
// sleep 1s too

3.使用灵活

只要paq设置的并发数足够大,或者和处理业务峰值相当,那么它就能近似Promise.all那样无限制并发执行,但是paq不会等所有任务都完成后才进行下一步操作。

const PAQ = require('priority-async-queue');
// 并发上限设置足够大
const paq = new PAQ(20);

const p1 = () => paq.sleep(1000).then(() => Promise.resolve('p1'));
const p2 = () => paq.sleep(1000).then(() => Promise.resolve('p2'));
const p3 = () => paq.sleep(1000).then(() => Promise.resolve('p3'));

paq.addTask(p1).addTask(p2).addTask(p3).on('completed', (opt, result) => {
  console.log(result);
});

Promise.all([p1(), p2(), p3()]).then(res => {
  console.log(res);
});

// p1
// p2
// p3
// [ 'p1', 'p2', 'p3' ]

如果paq只处理第一个返回状态的任务,则它的用法接近Promise.race的用法了。

const PAQ = require('priority-async-queue');
// 并发上限设置足够大
const paq = new PAQ(100);

const p1 = () => paq.sleep(3000).then(() => Promise.resolve('p1'));
const p2 = () => paq.sleep(2000).then(() => Promise.resolve('p2'));
const p3 = () => paq.sleep(1000).then(() => Promise.resolve('p3'));

let isFirst = false;
paq.addTask(p1).addTask(p2).addTask(p3).on('completed', (opt, result) => {
  if (!isFirst) {
    // TODO 只处理第一个改变状态的任务
    console.log('paq: ' + result);
    isFirst = true;
  }
});

Promise.race([p1(), p2(), p3()]).then(res => {
  console.log('race: ' + res);
});

// paq: p3
// race: p3

paq近似Promise.allSettledPromise.any的用法我就不再展开了。个人认为,日常开发中能用原生实现的,尽量用原生实现,本文只是起介绍作用,不构成使用建议。

学海无涯,剑圣说过:“真正的大师永远都怀着一颗学徒的心”,所以被人称作:“易大师”。我们这些普通人如果坚持学习,虽然最后可能成不了大师,但起码不会摆烂吧?