Node.js使用async_hooks实现线程池(下)
「这是我参与2022首次更文挑战的第4天,活动详情查看:2022首次更文挑战」。
前言
接上一篇文章讲解了async_hook
提供的API以及作用,本文主要介绍如何利用async_hooks
实现线程池。
先决条件
阅读并食用本文,需要先具备:
- 掌握 JavaScript 同步和异步编程的基础知识
- 掌握 Node.js 的工作原理
- 了解 worker_threads API的基本原理
- 了解 async_hook API的基本原理
简单例子
回顾worker_threads一文中最后的例子🤔,利用worker_threads
衍生工作线程去运算运算斐波那契数列
。
|-main.js
|-worker.js
main.js
主线程脚本
衍生工作线程,使用./worker.js
作为工作线程脚本
const {Worker} = require("worker_threads");
let number = 10;
const worker = new Worker("./worker.js", {workerData: {num: number}});
worker.once("message", result => {
console.log(`${number}th Fibonacci Result: ${result}`);
});
worker.on("error", error => {
console.log(error);
});
worker.on("exit", exitCode => {
console.log(`It exited with code ${exitCode}`);
})
console.log("Execution in main thread");
worker.js
工作线程脚本
斐波那契数列运算
const {parentPort, workerData} = require("worker_threads");
parentPort.postMessage(getFibonacciNumber(workerData.num))
function getFibonacciNumber(num) {
if (num === 0) {
return 0;
}
else if (num === 1) {
return 1;
}
else {
return getFibonacciNumber(num - 1) + getFibonacciNumber(num - 2);
}
}
执行:
node main.js
输出:
Execution in main thread
10th Fibonacci Result: 55
It exited with code 0
Execution in main thread
先输出说明了工作线程并不会阻塞主线程执行。
接下来开始正文 😄
在main.js
引入async_hook
异步资源使用情况,改动后内容如下:
顶部新增async_hook
代码片段。以及console.log
都替换成fs.writeSync
。因为console.log
是异步实现,会增加async_hook
的监控负担。
const {Worker} = require("worker_threads");
/** --- --- --- 以下是新增内容 --- --- --- **/
const async_hooks = require('async_hooks')
const fs = require('fs')
async_hooks.createHook({
init (asyncId, type, triggerAsyncId, resource) {
fs.writeSync(1, `init ${type}(${asyncId}): trigger: ${triggerAsyncId}\n`, resource)
},
destroy (asyncId) {
fs.writeSync(1, `destroy: ${asyncId}\n`);
}
}).enable()
/** --- --- --- 新增内容结束 --- --- --- **/
let number = 10;
const worker = new Worker("./worker.js", {workerData: {num: number}});
worker.once("message", result => {
fs.writeSync(1, `${number}th Fibonacci Result: ${result}\n`);
});
worker.on("error", error => {
fs.writeSync(1, error);
});
worker.on("exit", exitCode => {
fs.writeSync(1, `It exited with code ${exitCode}\n`);
})
console.log("Execution in main thread");
然后再执行:
node main.js
输出:
init WORKER(2): trigger: 1
init MESSAGEPORT(3): trigger: 1
init TTYWRAP(4): trigger: 1
init SIGNALWRAP(5): trigger: 1
init TickObject(6): trigger: 1
init TTYWRAP(7): trigger: 1
init TickObject(8): trigger: 1
init MESSAGEPORT(9): trigger: 1
init MESSAGEPORT(10): trigger: 1
Execution in main threaddestroy: 6
destroy: 8
destroy: 10
10th Fibonacci Result: 55
destroy: 9
destroy: 3
init TickObject(11): trigger: 2
init TickObject(12): trigger: 2
It exited with code 0
init TickObject(13): trigger: 11
init TickObject(14): trigger: 12
destroy: 2
destroy: 11
destroy: 12
destroy: 13
destroy: 14
分析控制台输出:
执行上下文 | 资源序号 | 资源类型 | 描述 |
---|---|---|---|
1(主线程顶级) | 2 | WORKER | 创建工作线程。 |
1(主线程顶级) | 3 | MESSAGEPORT | 创建通讯端口,双向通讯通道的一端。 |
1(主线程顶级) | 4 | TTYWRAP | 创建终端文本包装 |
1(主线程顶级) | 5 | SIGNALWRAP | 创建信令包装 |
1(主线程顶级) | 6 | TickObject | 创建异步回调对象 |
1(主线程顶级) | 7 | TTYWRAP | 创建终端文本包装 |
1(主线程顶级) | 8 | TickObject | 创建异步回调对象 |
1(主线程顶级) | 9 | MESSAGEPORT | 创建通讯端口,双向通讯通道的一端。 |
1(主线程顶级) | 10 | MESSAGEPORT | 创建通讯端口,双向通讯通道的一端。 |
8 | TickObject | 注销 | |
10 | MESSAGEPORT | 注销 | |
9 | MESSAGEPORT | 注销 | |
3 | MESSAGEPORT | 注销 | |
2(工作线程顶级) | 11 | TickObject | 创建异步回调对象 |
2(工作线程顶级) | 12 | TickObject | 创建异步回调对象 |
11(工作线程TickObject) | 13 | TickObject | 创建异步回调对象 |
12(工作线程TickObject) | 14 | TickObject | 创建异步回调对象 |
2 | WORKER | 注销 | |
11 | TickObject | 注销 | |
12 | TickObject | 注销 | |
13 | TickObject | 注销 | |
14 | TickObject | 注销 |
主线程
衍生出工作线程
后,会先实例化与工作线程通信相关的对象(发送
与接收
)。工作线程
运算结果后,调用parentPort.postMessage
发送消息,主线程
接收到消息,打印结果后,异步回调资源
注销,随后通信资源
注销。接着工作线程
资源注销,工作线程创建点异步资源
也接着注销。
线程池实现
了解async_hooks
与worker_threads
的化学反应后,我们接下来通过官方例子认识async_hooks模块下的AsyncResource
类以及实现线程池。
先上代码,目录结构:
|-main.js // 主线程脚本
|-task_processor.js // 工作线程脚本
|-worker_pool.js // 线程池实现
main.js
主线程,根据当前宿主cpu核数,去实例化线程池。然后执行16个任务:
const WorkerPool = require('./worker_pool.js');
const os = require('os');
const pool = new WorkerPool(os.cpus().length);
console.log(`cpus length:`, os.cpus().length)
let finished = 0;
for (let i = 0; i < 16; i++) {
pool.runTask({ a: 100, b: i }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
task_processor.js
工作线程,计算主线程传过来的对象,计算对象属性相加,并把结果发回给父线程(主线程):
const { parentPort } = require('worker_threads');
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
worker_pool.js
const { AsyncResource } = require('async_hooks');
const { EventEmitter } = require('events');
const path = require('path');
const { Worker } = require('worker_threads');
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// 每当发出 kWorkerFreedEvent 时,调度队列中待处理的下一个任务(如果有)。
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
worker.on('message', (result) => {
// 如果成功:调用传递给`runTask`的回调,删除与Worker关联的`TaskInfo`,并再次将其标记为空闲。
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// 如果发生未捕获的异常:调用传递给 `runTask` 并出现错误的回调。
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// 从列表中删除 Worker 并启动一个新的 Worker 来替换当前的 Worker。
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// 没有空闲线程,等待工作线程空闲。
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}
module.exports = WorkerPool;
执行:
node main.js
输出:
cpus length: 8
3 null 103
8 null 108
9 null 109
10 null 110
11 null 111
12 null 112
13 null 113
14 null 114
15 null 115
4 null 104
2 null 102
1 null 101
7 null 107
6 null 106
当前宿主cpu核数8,实例化线程池。后续执行16个任务,并返回上述计算结果。
先观察WorkerPool
类,构造函数根据CPU核数入参,for
循环调用addNewWorker
。
addNewWorker
函数把Worker
实例push
到自身属性workers
数组和freeWorkers
数组中。WorkerPool
自身属性freeWorkers
和tasks
,分别是存放空闲的工作线程,和存放需要执行的任务回调函数的任务队列。
runTask
函数会先判断有没空闲的线程freeWorkers
,如没空闲线程会把任务放进任务队列tasks
,若有空闲线程则出列一个工作线程对象,并对线程对象属性赋值WorkerPoolTaskInfo
实例,并向工作线程发送任务信息,来达到执行任务的目的。
接下来观察WorkerPoolTaskInfo
类,WorkerPoolTaskInfo
类继承了async_hooks
模块中的AsyncResource
类。主要用到其内部函数runInAsyncScope
去执行任务,以及内部函数emitDestroy
触发destroy hook。
runInAsyncScope
在异步资源的执行上下文中使用提供的参数调用提供的函数。这将建立上下文,在回调之前触发 AsyncHooks,调用函数,在回调之后触发 AsyncHooks,然后恢复原始执行上下文。
async_hooks
调用所有destroy
的钩子。
任务执行完后,使用过的worker对象会重新回到freeWorkers
数组,然后判断任务队列tasks
是否还有任务,若有则调用runTask
函数重复执行任务流程,直到所有任务执行完毕。
总结
使用线程池,可避免后续线程实例化的开销,重复利用线程实例。与此同时使用任务队列,以及相对应的任务处理策略,能大大提高项目处理任务的并发数量。
转载自:https://juejin.cn/post/7065701050186989605