多进程打包:thread-loader 源码(12)
码字不易,感谢阅读,你的点赞让人振奋!如文章有误请移步评论区告知,万分感谢!未经授权不得转载!
一、前情回顾
上一篇小左文详细讨论了 worker.js
中的 readNextMessage
方法的结构和部分实现逻辑:
-
讨论
PoolWorker.prototyoe.readNextMessage
和 这里的readNextMessage
的区别和联系; -
readNextMessage
都是调用readBuffer
先读长度,再读长度对应的数据buffer
,得到数据后交给对应的方法处理.;
今天这篇小作文接上文,继续讨论 worker.js
处理数据的方法—— onMessage
二、onMessage 的调用
结合前文的分析,onMessage
是被 readNextMessage
得到数据后的回调调用的。代码如下:
function readNextMessage() {
readBuffer(readPipe, 4, (lengthReadError, lengthBuffer) => {
const length = lengthBuffer.length && lengthBuffer.readInt32BE(0);
readBuffer(readPipe, length, (messageError, messageBuffer) => {
const messageString = messageBuffer.toString('utf-8');
const message = JSON.parse(messageString, reviver);
// 交给 onMessage 处理
onMessage(message);
setImmediate(() => readNextMessage());
});
});
}
从这里可以很清晰的看出,onMessage
接收到的参数是一个经过 parse
后的对象,这个对象来自进程间通信,是父进程写入到 writePipe
的。
三、onMessage 方法细节
-
方法位置:
thread-loader/src/worker.js -> function onMessage
-
方法参数:
message
,消息对象,通过上面调用的分析可知message
是来自父进程的消息对象; -
方法作用:处理来自父进程的消息,消息分为三种明确的类型,具体如下:
-
3.1
type = job
,job
类型表示任务,这些任务就是跑loaders
,跑loaders
的工作是交给queue
的worker
函数(区分worker
函数和worker.js
,worker
函数专指传递个asyncQueue
创建queue
的这个回调函数)做的,push
后queue
会自动调用worker
,这个过程下面详述; -
3.2
type = result
,result
类型表示结果,这些结果来自父进程,是子进程委托父进程调用某些方法后得到的结果,下面会详细讲解这个过程; -
3.3
type = warmup
,warmup
类型表示预热,所谓预热就是把需要的模块提前通过require
提前加载好,之所以这么做事为了减少进程间通信带来的开销;
-
function onMessage(message) {
try {
// id 是一个重要的标识,确保处理结果给到正确的位置
const { type, id } = message;
switch (type) {
// job 类型,是跑 loaders
case 'job': {
// push 到 queue,queue 的 worker 函数就工作了
queue.push(message);
break;
}
// result 类型是接收结果的
// 结果来自父进程
case 'result': {
const { error, result } = message;
// 得到结果后调用需要这个结果的回调函数继续后续工作
// 一般都是 loader 需要调用某些 webpack 跑 loader 时
// 提供的 data 对象或者 loaderContext 上的方法时
const callback = callbackMap[id];
if (callback) {
const nativeError = toNativeError(error);
// 调用 callback 把结果传给有需要的人
callback(nativeError, result);
} else {
console.error(`Worker got unexpected result id ${id}`);
}
// 移除这个 id 对应的 callback
delete callbackMap[id];
break;
}
// 预热
case 'warmup': {
// 获取需要预热的模块
const { requires } = message;
// 加载模块到进程中,调用 require 就可以了
// 只要加载过一次,后面再 require 就不会在重新加载了
requires.forEach((r) => require(r));
break;
}
default: {
console.error(`Worker got unexpected job type ${type}`);
break;
}
}
} catch (e) {
console.error(`Error in worker ${e}`);
}
}
3.1 type = job 细节
3.1.1 为什么是 queue
onMessage
把收到的 message
任务加入到 queue
队列就好了,为啥如此简单,跑 loader
的逻辑何在?
之所这么就好了,是因为这个 queue
和前面的 WorkerPool
的 asyncQueue
一样,都来自 neo-async/queue.js
,创建 queue
时传入了 worker
函数,当向 queue
中 push
内容时,neo-async
就会触发 runQueue
调用 worker
函数处理 push
进来的 data
,并且这个过程还是并发的。
3.1.2 创建 queue
调用 neo-async/queue.js
的 asyncQueue
传入 worker
函数和并发数就得到了 queue
。这里先说明一下,worker
函数就是下面的这个调用 loaderRunner.runLoaders
方法的回调函数,有别于 worker.js
这个文件。thread-loader
中的这些命名真的是绝了,都叫做 worker
,这个绝对是槽点。
当调用 queue.push
时,neo-async
会自动运行 runQueue
调用 worker
函数来处理 push
进来的数据,并且这个过程还是并发的,并发的控制是 worker
函数接收的第二个参数,在这里就是 taskCallback
。
都说 thread-loader
是并发多进程运行 loader
,写了这么久,读了这么久终于见到跑 loaders
的地方了。我们把配置在 thread-loader
后面的 loader
都交给子进程并发跑,子进程的实现靠的是 child_process.spawn
执行 worker.js
,并发依靠 neo-async/queue.js
。
关于 worker
函数我们下面再说,先不展开了。
import asyncQueue from 'neo-async/queue';
const queue = asyncQueue(({ id, data }, taskCallback) => {
// 这个函数就是 queue 的 worker 函数
// taskCallback 就是 runQueue 的 done 方法
try {
// 调用 loaderRunner 跑 loader
loaderRunner.runLoaders(
{
loaders: data.loaders,
context: { // 子进程自己造的 loaderContext 对象
loadModule () => {
writeJson({
type: 'job'
})
}
}
},
(err, lrResult) => {
// 运行 loader 得到的结果
const {
result,
cacheable,
fileDependencies,
contextDependencies,
missingDependencies,
} = lrResult;
// 通过进程通信把结果发送给父进程
writeJson({
type: 'job',
id,
error: err && toErrorObj(err),
result: { /* 结果 */ },
data: buffersToSend.map((buffer) => buffer.length),
});
// ...
}
);
} catch (e) {
}, PARALLEL_JOBS);
3.2 result 细节
为什么设计 result
类型呢?
前文其实也简单描述过,是因为进程间通信传递的都是被序列化的 JSON
字符,这就导致很多运行 loader
需要的的方法不能直接从父进程传递到子进程,那怎么办呢?
牛批的设计就是如此的精简,只需要在子进程中写一些同名的方法,这些方法传给 loader
们。这些同名方法则通过进程通信管道让父进程去调用这些真正的方法,待调用结束后父进程再通过进程通信的方式发送 type = result
的 message
过来,此时 message
携带了真正方法的处理结果。
以上面 worker
函数调用 loaderRunner.runLoaders
传入的 context
中的中 loadModule
为例,这个方法是定义在父进程 webpack
运行 loader
时创建的 loaderContext
对象的,但是进程通信时无法把 loadModule
方法一并传递给子进程。
当子进程运行某个 loader
,例如 a-loader
,子进程自己造了一个 loaderContext
对象,这上面也有 loadModule
,只不过这方法不是真的去加载模块,而是把这个加载模块的诉求通过进程通信告知父进程,父进程去实现子进程的愿望——加载模块。
这也正是为什么 thread-loader
不能调用自定义到 loaderContext
的方法的原因。是因为 thread-loader
没有支持获取自定义 loaderContext
方法的出口,当然我读这个 thread-loader
源码的初衷就是解决这个问题。
这个内容也展开了,下面我们会讨论 queue
的 worker
函数,其中包含了这些方法
四、总结
这篇小作文详细讨论了 onMessage
的代码结构和其中代码的意义,另外还讨论了其中的细节问题,具体如下:
-
onMessage
分为三种类型的消息:- 1.1 代表任务的
job
,任务push
到queue
,叫queue
的worker
忙去吧! - 1.2 代表结果的
result
,接收父进程的返回结果,把这些结果交给有需要的回调; - 1.3 代表预热的
warmup
,把需要的模块加载到子进程,节约时间;
- 1.1 代表任务的
-
回顾了
neo-async/queue.js
的queue
发挥作用的原理,创建queue
时传入的worker
函数负责跑loader
,worker
函数细节并未展开讨论; -
type
为result
是进程间通信的一环,并介绍了因为进程间通信的限制,导致方法不能传递,所以子进程中的loaders
需要了,就让父进程去调用,完事儿再通过回调给到子进程
下一篇,我们正式讲解跑 loader
的 worker
函数中的各种细节部分,这里面会把 type
为 result 和
job` 的内容再次充实。
转载自:https://juejin.cn/post/7107903438901641229