likes
comments
collection
share

Node.js使用async_hooks实现线程池(下)

作者站长头像
站长
· 阅读数 42

「这是我参与2022首次更文挑战的第4天,活动详情查看:2022首次更文挑战」。

前言

上一篇文章讲解了async_hook提供的API以及作用,本文主要介绍如何利用async_hooks实现线程池。

先决条件

阅读并食用本文,需要先具备:

  • 掌握 JavaScript 同步和异步编程的基础知识
  • 掌握 Node.js 的工作原理
  • 了解 worker_threads API的基本原理
  • 了解 async_hook API的基本原理

简单例子

回顾worker_threads一文中最后的例子🤔,利用worker_threads衍生工作线程去运算运算斐波那契数列

|-main.js
|-worker.js

main.js 主线程脚本

衍生工作线程,使用./worker.js作为工作线程脚本

const {Worker} = require("worker_threads");
let number = 10;

const worker = new Worker("./worker.js", {workerData: {num: number}});

worker.once("message", result => {
    console.log(`${number}th Fibonacci Result: ${result}`);
});

worker.on("error", error => {
    console.log(error);
});

worker.on("exit", exitCode => {
    console.log(`It exited with code ${exitCode}`);
})

console.log("Execution in main thread");

worker.js工作线程脚本

斐波那契数列运算

const {parentPort, workerData} = require("worker_threads");

parentPort.postMessage(getFibonacciNumber(workerData.num))

function getFibonacciNumber(num) {
    if (num === 0) {
        return 0;
    }
    else if (num === 1) {
        return 1;
    }
    else {
        return getFibonacciNumber(num - 1) + getFibonacciNumber(num - 2);
    }
}

执行:

node main.js

输出:

Execution in main thread
10th Fibonacci Result: 55
It exited with code 0

Execution in main thread先输出说明了工作线程并不会阻塞主线程执行。

接下来开始正文 😄

main.js引入async_hook异步资源使用情况,改动后内容如下:

顶部新增async_hook代码片段。以及console.log都替换成fs.writeSync。因为console.log是异步实现,会增加async_hook的监控负担。

const {Worker} = require("worker_threads");
/** --- --- --- 以下是新增内容 --- --- --- **/
const async_hooks = require('async_hooks')
const fs = require('fs')

async_hooks.createHook({
    init (asyncId, type, triggerAsyncId, resource) {
      fs.writeSync(1, `init ${type}(${asyncId}): trigger: ${triggerAsyncId}\n`, resource)
    },
    destroy (asyncId) {
      fs.writeSync(1, `destroy: ${asyncId}\n`);
    }
  }).enable()
/** --- --- --- 新增内容结束 --- --- --- **/  

let number = 10;

const worker = new Worker("./worker.js", {workerData: {num: number}});

worker.once("message", result => {
    fs.writeSync(1, `${number}th Fibonacci Result: ${result}\n`);
});

worker.on("error", error => {
    fs.writeSync(1, error);
});

worker.on("exit", exitCode => {
    fs.writeSync(1, `It exited with code ${exitCode}\n`);
})

console.log("Execution in main thread");

然后再执行:

node main.js

输出:

init WORKER(2): trigger: 1
init MESSAGEPORT(3): trigger: 1
init TTYWRAP(4): trigger: 1
init SIGNALWRAP(5): trigger: 1
init TickObject(6): trigger: 1
init TTYWRAP(7): trigger: 1
init TickObject(8): trigger: 1
init MESSAGEPORT(9): trigger: 1
init MESSAGEPORT(10): trigger: 1
Execution in main threaddestroy: 6
destroy: 8
destroy: 10
10th Fibonacci Result: 55
destroy: 9
destroy: 3
init TickObject(11): trigger: 2
init TickObject(12): trigger: 2
It exited with code 0
init TickObject(13): trigger: 11
init TickObject(14): trigger: 12
destroy: 2
destroy: 11
destroy: 12
destroy: 13
destroy: 14

分析控制台输出:

执行上下文资源序号资源类型描述
1(主线程顶级)2WORKER创建工作线程。
1(主线程顶级)3MESSAGEPORT创建通讯端口,双向通讯通道的一端。
1(主线程顶级)4TTYWRAP创建终端文本包装
1(主线程顶级)5SIGNALWRAP创建信令包装
1(主线程顶级)6TickObject创建异步回调对象
1(主线程顶级)7TTYWRAP创建终端文本包装
1(主线程顶级)8TickObject创建异步回调对象
1(主线程顶级)9MESSAGEPORT创建通讯端口,双向通讯通道的一端。
1(主线程顶级)10MESSAGEPORT创建通讯端口,双向通讯通道的一端。
8TickObject注销
10MESSAGEPORT注销
9MESSAGEPORT注销
3MESSAGEPORT注销
2(工作线程顶级)11TickObject创建异步回调对象
2(工作线程顶级)12TickObject创建异步回调对象
11(工作线程TickObject)13TickObject创建异步回调对象
12(工作线程TickObject)14TickObject创建异步回调对象
2WORKER注销
11TickObject注销
12TickObject注销
13TickObject注销
14TickObject注销

主线程衍生出工作线程后,会先实例化与工作线程通信相关的对象(发送接收)。工作线程运算结果后,调用parentPort.postMessage发送消息,主线程接收到消息,打印结果后,异步回调资源注销,随后通信资源注销。接着工作线程资源注销,工作线程创建点异步资源也接着注销。

线程池实现

了解async_hooksworker_threads的化学反应后,我们接下来通过官方例子认识async_hooks模块下的AsyncResource类以及实现线程池。

先上代码,目录结构:

|-main.js // 主线程脚本
|-task_processor.js // 工作线程脚本
|-worker_pool.js // 线程池实现

main.js

主线程,根据当前宿主cpu核数,去实例化线程池。然后执行16个任务:

const WorkerPool = require('./worker_pool.js');
const os = require('os');

const pool = new WorkerPool(os.cpus().length);
console.log(`cpus length:`, os.cpus().length)
let finished = 0;
for (let i = 0; i < 16; i++) {
  pool.runTask({ a: 100, b: i }, (err, result) => {
    console.log(i, err, result);
    if (++finished === 10)
      pool.close();
  });
}

task_processor.js

工作线程,计算主线程传过来的对象,计算对象属性相加,并把结果发回给父线程(主线程):

const { parentPort } = require('worker_threads');
parentPort.on('message', (task) => {
  parentPort.postMessage(task.a + task.b);
});

worker_pool.js

const { AsyncResource } = require('async_hooks');
const { EventEmitter } = require('events');
const path = require('path');
const { Worker } = require('worker_threads');

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo');
    this.callback = callback;
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy();  // `TaskInfo`s are used only once.
  }
}

class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];

    for (let i = 0; i < numThreads; i++)
      this.addNewWorker();

    // 每当发出 kWorkerFreedEvent 时,调度队列中待处理的下一个任务(如果有)。
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift();
        this.runTask(task, callback);
      }
    });
  }

  addNewWorker() {
    const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
    worker.on('message', (result) => {
      // 如果成功:调用传递给`runTask`的回调,删除与Worker关联的`TaskInfo`,并再次将其标记为空闲。
      worker[kTaskInfo].done(null, result);
      worker[kTaskInfo] = null;
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    });
    worker.on('error', (err) => {
      // 如果发生未捕获的异常:调用传递给 `runTask` 并出现错误的回调。
      if (worker[kTaskInfo])
        worker[kTaskInfo].done(err, null);
      else
        this.emit('error', err);
      // 从列表中删除 Worker 并启动一个新的 Worker 来替换当前的 Worker。
      this.workers.splice(this.workers.indexOf(worker), 1);
      this.addNewWorker();
    });
    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // 没有空闲线程,等待工作线程空闲。
      this.tasks.push({ task, callback });
      return;
    }

    const worker = this.freeWorkers.pop();
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
    worker.postMessage(task);
  }

  close() {
    for (const worker of this.workers) worker.terminate();
  }
}

module.exports = WorkerPool;

执行:

node main.js

输出:

cpus length: 8
3 null 103
8 null 108
9 null 109
10 null 110
11 null 111
12 null 112
13 null 113
14 null 114
15 null 115
4 null 104
2 null 102
1 null 101
7 null 107
6 null 106

当前宿主cpu核数8,实例化线程池。后续执行16个任务,并返回上述计算结果。

先观察WorkerPool类,构造函数根据CPU核数入参,for循环调用addNewWorker

addNewWorker函数把Worker实例push到自身属性workers数组和freeWorkers数组中。WorkerPool自身属性freeWorkerstasks,分别是存放空闲的工作线程,和存放需要执行的任务回调函数的任务队列。

runTask函数会先判断有没空闲的线程freeWorkers,如没空闲线程会把任务放进任务队列tasks,若有空闲线程则出列一个工作线程对象,并对线程对象属性赋值WorkerPoolTaskInfo实例,并向工作线程发送任务信息,来达到执行任务的目的。

接下来观察WorkerPoolTaskInfo类,WorkerPoolTaskInfo类继承了async_hooks模块中的AsyncResource类。主要用到其内部函数runInAsyncScope去执行任务,以及内部函数emitDestroy触发destroy hook。

runInAsyncScope在异步资源的执行上下文中使用提供的参数调用提供的函数。这将建立上下文,在回调之前触发 AsyncHooks,调用函数,在回调之后触发 AsyncHooks,然后恢复原始执行上下文。

async_hooks调用所有destroy的钩子。

任务执行完后,使用过的worker对象会重新回到freeWorkers数组,然后判断任务队列tasks是否还有任务,若有则调用runTask函数重复执行任务流程,直到所有任务执行完毕。

总结

使用线程池,可避免后续线程实例化的开销,重复利用线程实例。与此同时使用任务队列,以及相对应的任务处理策略,能大大提高项目处理任务的并发数量。

转载自:https://juejin.cn/post/7065701050186989605
评论
请登录