Node.js使用async_hooks实现线程池(上)
「这是我参与2022首次更文挑战的第3天,活动详情查看:2022首次更文挑战」。
前言
上一篇文章介绍使用worker_threads
模块,衍生工作线程来解决Node.js主线程处理CPU密集型任务时的阻塞问题。
在实际操作中,使用工作线程时应该使用线程池
。否则,创建工作线程的开销可能会超过其收益。
本文将结合官方文档分上下两篇章介绍async_hooks
模块、线程池
概念,并使用async_hooks
模块封装线程池
。
先决条件
阅读并食用本文,需要先具备:
- 掌握 JavaScript 同步和异步编程的基础知识
- 掌握 Node.js 的工作原理
async_hooks 介绍
async_hooks
模块提供了一系列 API 用来跟踪异步资源的生命周期。该模块最早出现在Node.js v8.0.0
版本。截至到目前在最新文档版本v16.14.0
和在最新的尝鲜版本v17.5.0
还是标记为Stability: 1 - Experimental
,还是未转正的试验性功能。但经历了多个版本的迭代,有理由相信async_hooks
已经被完善和塑造,比典型的实验性 API 更稳定。
官方API简述
const async_hooks = require('async_hooks');
// 返回当前执行上下文的 ID。
const eid = async_hooks.executionAsyncId();
// 返回负责触发当前执行范围回调的句柄ID。
const tid = async_hooks.triggerAsyncId();
// 创建一个新的 AsyncHook 实例。所有这些回调都是可选的。详情见下文。
const asyncHook =
async_hooks.createHook({ init, before, after, destroy, promiseResolve });
// 允许调用此 AsyncHook 实例的回调。 这不是运行构造函数后的隐式操作,必须显式运行才能开始执行回调。
asyncHook.enable();
// 禁用监听新的异步事件。
asyncHook.disable();
// 响应上文,以下是可以传递给 createHook() 的回调。
// init 在对象构造期间被调用。 此回调运行时资源可能尚未完成构造,因此“asyncId”引用的资源的所有字段可能尚未填充。
function init(asyncId, type, triggerAsyncId, resource) { }
// 在调用资源的回调之前调用 Before。 对于句柄(例如 TCPWrap),它可以被调用 0-N 次,而对于请求(例如 FSReqCallback),它将被准确地调用 1 次。
function before(asyncId) { }
// After 在资源的回调完成后调用。
function after(asyncId) { }
// 当资源被销毁时调用 Destroy。
function destroy(asyncId) { }
// 当调用传递给“Promise”构造函数的“resolve”函数时(直接或通过其他解决promise的方法),promise Resolve 仅对promise 资源调用。
function promiseResolve(asyncId) { }
举个简单例子
通过RELP去执行:setTimeout(()=>{}, 1000)
setTimeout(()=>{}, 1000)
看打印对象内容:
关注以下属性:
- _idleTimeout 超时时机值
- _onTimeout 回调函数
- _repeat 是否重复执行回掉
- _destroyed 是否已被销毁
setTimeout生命周期如下:
async_hooks
提供一系列的钩子来监视上述生命周期的不同阶段,我们可以将回调函数附加到这些挂钩上。有四种常见的钩子类型,如init
, before
, after
和 destroy
。它们在异步资源生命周期的以下四个阶段被触发。
通过示例代码了解
创建main.js内容如下:
const fs = require('fs')
const async_hooks = require('async_hooks')
async_hooks.createHook({
init (asyncId, type, triggerAsyncId, resource) {
fs.writeSync(1, `init ${type}(${asyncId}): trigger: ${triggerAsyncId}\n`, resource)
},
destroy (asyncId) {
fs.writeSync(1, `destroy: ${asyncId}\n`);
}
}).enable()
async function A () {
fs.writeSync(1, `A -> ${async_hooks.executionAsyncId()}\n`)
setTimeout(() => {
fs.writeSync(1, `A in setTimeout -> ${async_hooks.executionAsyncId()}\n`)
B()
})
}
async function B () {
fs.writeSync(1, `B -> ${async_hooks.executionAsyncId()}\n`)
setTimeout(() => {
fs.writeSync(1, `B in setTimeout -> ${async_hooks.executionAsyncId()}\n`)
})
}
fs.writeSync(1, `top level -> ${async_hooks.executionAsyncId()}\n`)
A()
执行:
node main.js
输出:
top level -> 1
init PROMISE(2): trigger: 1
A -> 1
init Timeout(3): trigger: 1
A in setTimeout -> 3
init PROMISE(4): trigger: 3
B -> 3
init Timeout(5): trigger: 3
destroy: 3
B in setTimeout -> 5
destroy: 5
代码开始使用async_hooks.createHook
注册init
回调跟踪所有异步资源的初始化、注册destroy
监听异步资源的销毁,并通过调用 .enable()
启用。
使用 fs.writeSync(1, msg)
打印到标准输出,writeSync
的第 1 个参数接收文件描述符,1
表示标准输出。为什么不使用 console.log
呢?因为 console.log
是一个异步操作,如果在 async_hooks.createHook
注册的回调函数中出现异步操作将会导致无限循环。
为了实现对异步资源的跟踪,Node.js
对每一个函数(不论异步还是同步)提供了一个 async scope,我们可以通过调用 async_hooks.executionAsyncId()
来获取函数当前的 async scope 的 id(称为 asyncId),通过调用 async_hooks.triggerAsyncId()
来获取当前函数调用者的 asyncId。
异步资源在创建时触发 init
事件回调函数,init
函数中的第 1 个参数代表该异步资源的 asyncId
,第 2 个参数 type
表示异步资源的类型(例如 TCPWRAP、PROMISE、Timeout、Immediate、TickObject 等等),第 3 个参数triggerAsyncId
表示该异步资源的调用者的 asyncId
。第 4 个参数resource
表示异步操作的资源的引用,需要在destroy
期间释放。
异步资源在销毁时触发 destroy
事件回调函数,该函数只接收一个参数,即该异步资源的 asyncId
。
线程池是什么
此处引用维基百科:
一种线程
使用模式。线程
过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池
维护着多个线程
,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池
不仅能够保证内核的充分利用,还能防止过分调度。可用线程
数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。
async_hooks 封装线程池
先说说大概思路:
- 根据当前系统CPU数,利用
worker_threads
创建一定合适数量的工作线程
。 - 在主线程维护需要使用工作线程的
任务队列
。 - 利用
async_hooks
监控工作线程
的任务完成情况并调度工作线程的任务队列
。 此部分将会在接下来的文章推出。
期待《Node.js使用async_hooks实现线程池(下)》,也欢迎有兴趣的同学在评论区讨论。
转载自:https://juejin.cn/post/7063836300800950302