面试怕被问”异步队列“?记住 webpack 5 给出的标准答案!
1. 前文回顾
webpack compilation 中依靠四个队列实现的“递归依赖解析”。主要依靠 AsyncQueue 机制实现;
2. Compilation 的四个队列
Compilation 实现了个队列,按照亲代关系从上之下(顺序靠前则辈分更大)依次是:
- this.proceeDependenciesQueue
- this.addModuleQueue
- this.factorizeQueue
- this.buildQueue
3. 队列调度
这四个队列的关系:
/** @type {AsyncQueue<Module, Module, Module>} */
this.processDependenciesQueue = new AsyncQueue({
name: "processDependencies",
parallelism: options.parallelism || 100,
processor: this._processModuleDependencies.bind(this)
});
/** @type {AsyncQueue<Module, string, Module>} */
this.addModuleQueue = new AsyncQueue({
name: "addModule",
parent: this.processDependenciesQueue,
getKey: module => module.identifier(),
processor: this._addModule.bind(this)
});
/** @type {AsyncQueue<FactorizeModuleOptions, string, Module | ModuleFactoryResult>} */
this.factorizeQueue = new AsyncQueue({
name: "factorize",
parent: this.addModuleQueue,
processor: this._factorizeModule.bind(this)
});
/** @type {AsyncQueue<Module, Module, Module>} */
this.buildQueue = new AsyncQueue({
name: "build",
parent: this.factorizeQueue,
processor: this._buildModule.bind(this)
});
3.1 _root
this.processDependenciesQueue 是四队列的祖先队列,即 _root
;声明的队列中如果没有传入 parent 则该队列的 parent 是自己,如果传递了 parent 则队列属性取用 parent._root
。
根据上文中创建
class AsyncQueue {
constructor({ name, parallelism, parent, processor, getKey }) {
this._root = parent ? parent._root : this;
}
}
3.2 children
与_root
对应的概念是 children
, this.addModuleQueue、this.factoryQueue、this.buildQueue 就是 this.proceeDependenciesQueue 的 children。
这里有点需要注意的是,children 的概念并不是相对于 parent 的,而是 _root.children
。从下面的代码来看,children 是挂载到 _root
上,记住这一点,这一点是这个队列能够实现递归解析依赖的关键点。
class AsyncQueue {
constructor({ name, parallelism, parent, processor, getKey }) {
this._root = parent ? parent._root : this;
if (parent) {
if (this._root._children === undefined) {
this._root._children = [this];
} else {
this._root._children.push(this);
}
}
}
}
到这里我么可以得出 _root
是 this.processDependenciesQueue,_root.children
是以下这个数组:
_root.children = [
this.addModuleQueue,
this.factorizeQueue,
this.buildQueue
]
3.3 队列协同
webpack 在构建过程中当 当触发 compiler.hooks.make 时,EntryPlugin 注册的钩子会触发,进而触发 compilation.addEntry 方法,该方法最终会向 compilation.factorizeQueue 中添加 queueItem,从这里开始整个的构建和依赖解析过程正式开始;
factorizeQueue 的 processor 处理完后会传递给其 resultCallback,该 resultCallback 会向 compilation.addModuleQueue 中添加 queueItem。
当 addModuleQueue 处理结束后会向 compilation.buildQueue 中添加 queueItem;
compilation.buildQueue 的 resultCallback 会向 compilation.processDependenciesQueue 中添加 queueItem,processDependenciesQueue 的 resultCallback 优惠将的到的依赖添加到 factorizeQueue 中;
通过上面的整个循环实现对所有模块的递归依赖解析;整个过程如下图所示:
4. AsyncQueue 的实现
AsyncQueue 是 webpack 内部自己实现的工具类,该类型位于 node_modules/webpack/lib/util/AsyncQueue.js 中;
该类型实现了并发异步队列,考虑到篇幅的问题,我们只讨论以下方法:
- 构造函数
- 入队列的方法:add
- 并发消耗队列的方法:_ensureProcessing
- 执行 processor 的方法:_startProcessing
- 维护并发并处理 proceeor 执行结果的的方法:_handleResult 我们以 factorizeQueue 的声明方式为例:
this.factorizeQueue = new AsyncQueue({
name: "factorize",
parent: this.addModuleQueue,
processor: this._factorizeModule.bind(this)
});
4.1 构造函数
构造函数用于创建异步并发队列实例
class AsyncQueue {
constructor({ name, parallelism, parent, processor, getKey }) {
this.hooks = {
beforeAdd: new AsyncSeriesHook(["item"]),
added: new SyncHook(["item"]),
beforeStart: new AsyncSeriesHook(["item"]),
started: new SyncHook(["item"]),
result: new SyncHook(["item", "error", "result"])
};
this._ensureProcessing = this._ensureProcessing.bind(this);
}
}
4.1.1 参数
构造函数接收一个对象作为参数,对象中的属性及其含义如下:
- name: 队列名称
- parallelism:并发数量
- parent:父亲队列
- proceeor:queueItem 的处理函数
- getKey:queueItem 标识 key 的生成函数
4.1.2 逻辑实现
1. 构造函数参数缓存
缓存 name、parallelism、processor、getKey 到实例对象; 其中 parallelism 默认值为 1,getKey 函数兜底为返回参数的空函数;
class AsyncQueue {
constructor({ name, parallelism, parent, processor, getKey }) {
this._name = name;
this._parallelism = parallelism || 1;
this._processor = processor;
this._getKey = getKey || (item => (item));
// ....
}
}
2. 创建 _entries、_queue 对象
- _entries 属性是一个 Map,用于存放所有队列成员信息的 Map 对象,key 是前面 getKey 方法返回的值,value 则是 AsyncQueueEntry 实例对象;
- _queue 则是 ArrayQueue 的实例,是队列的实现
class AsyncQueue {
constructor({ name, parallelism, parent, processor, getKey }) {
// ....
this._entries = new Map();
this._queued = new ArrayQueue();
// ....
}
}
3. 运行状态声明及亲代关系
队列实例有四个控制运行状态的属性:
- activeTasks:当前并发数量;
- _willEnsureProcessing: 该标志置为 true 时表示队列已经在消耗中了;
- _needProcessing:队列是否已经清空,已清空时该属性为 true;
- _stopped: 该队列是否已经停止运作,已停止时不再接收新的队列成员;
亲代关系则在前文中提及过,多个队列同时声明时可以绑定亲代关系,有了亲代关系后,无论整个家族中那一个队列有新成员加入,都会从最远的祖先队列开始依次进行队列的消耗工作(下称:递推执行)。
递推执行保证执行的顺序,该亲代队列的异步队列实例依靠两个属性构成 亲代关系:
- _root: 作为亲代关系的祖先队列,每次递推执行时从此队列开始消耗;未声明 parent 的队列实例将会成为祖先队列;后面队列的 parent 属性均指向该祖先队列;
- _children: 祖先队列的晚辈队列,传递 parent 属性时则认定该队列隶属于某一个有递推执行需要的队列;
class AsyncQueue {
constructor({ name, parallelism, parent, processor, getKey }) {
// 运行状态标识符
this._activeTasks = 0;
this._willEnsureProcessing = false;
this._needProcessing = false;
this._stopped = false;
// 亲代关系
this._children = undefined;
this._root = parent ? parent._root : this;
if (parent) {
if (this._root._children === undefined) {
this._root._children = [this];
} else {
this._root._children.push(this);
}
}
}
}
4. hooks 声明
AsyncQueue 作为 webpack 的工具类,同样实现了若干 hooks 以备方便介入到异步队列的工作流程 中:
- beforeAdd: AsyncSeriesHook(["item"])
- added: SyncHook(["item"])
- beforeStart: AsyncSeriesHook(["item"])
- started: SyncHook(["item"])
- result: SyncHook(["item", "error", "result"])
以上 hook 接收的 item 是加入队列的数据:
class AsyncQueue {
constructor({ name, parallelism, parent, processor, getKey }) {
this.hooks = {
beforeAdd: new AsyncSeriesHook(["item"]),
added: new SyncHook(["item"]),
beforeStart: new AsyncSeriesHook(["item"]),
started: new SyncHook(["item"]),
result: new SyncHook(["item", "error", "result"])
};
}
}
5. 为 _ensurePressing 绑定 this
这一步骤
class AsyncQueue {
constructor({ name, parallelism, parent, processor, getKey }) {
this._ensureProcessing = this._ensureProcessing.bind(this);
}
}
4.2 add 方法
方法作用:该方法的作用是向异步队列中加入队列成员,新加入的成员会被声明队列时传入的 processor 处理。
我们以 webpack 内部处理模块创建的 factorizeQueue 为例,看下 add 方法的调用:
// Compilation.js
Compilation.prototype.factorizeModule = function (options, callback) {
// 调用队列的 add 方法
this.factorizeQueue.add(options, callback);
}
this.factorizeModule(
{
currentProfile,
factory,
dependencies,
factoryResult: true,
originModule,
contextInfo,
context
},
(err, factoryResult) => {
// factoryResult 是 factorizeQueue 的 processor 处理后返回的结果
}
)
4.2.1 参数
- item: 要加入队列中的成员数据
- callback: 受理 item 被队列 processor 处理之后结果的回调函数
4.2.2 实现
该方法内部主要处理将队列数据成员加入到队列中,整体有以下步骤:
1. 处理队列停运情况
上文中提到运行状态有一个标识 \topped 属性,该属性表示队列已经停止运行不再接收新成员; 所以 add 方法第一项工作便是判断 \stopped 属性,若为 true 则抛出异常告知队列已经停止。
add(item, callback) {
if (this._stopped) return callback(new WebpackError("Queue was stopped"));
//...
}
2. 触发 hooks.beforeAdd 钩子
该钩子在新成员 item 被添加到队列之前触发,传入当前成员 item;
add(item, callback) {
// ...
this.hooks.beforeAdd.callAsync(item, err => {})
// ...
}
3. hooks.beforeAdd 回调
- 判断错误,如果 this.hooks.beforeAdd 钩子执行出错则抛出异常并停止添加当前成员;
this.hooks.beforeAdd.callAsync(item, err => {
if (err) {
callback(makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeAdd`));
return;
}
// ...
})
- item 的 AsyncQueueEntry 实例缓存处理
根据创建队列时传入的 getKey 方法生产 item 对应的 key,这个 key 用作缓存 item 对应的 AsyncQueueEntry 实例 (下称:entry 对象)到 this._entries ;
获取 key 之后判断之前是否已经处理过该 item,如果有缓存说明处理过,此时根据该 item 的 entry 实例中记录的 state, 如果已经完成则根据其是否出错调用为结果受理回调传入不同参数;
如果没有命中缓存则为当前成员 item 创建先的 entry 对象即 AsyncQueueEntry 实例对象;
this.hooks.beforeAdd.callAsync(item, err => {
// ...
const key = this._getKey(item);
const entry = this._entries.get(key);
if (entry !== undefined) {
if (entry.state === DONE_STATE) {
if (inHandleResult++ > 3) {
process.nextTick(() => callback(entry.error, entry.result));
} else {
callback(entry.error, entry.result);
}
inHandleResult--;
} else if (entry.callbacks === undefined) {
entry.callbacks = [callback];
} else {
entry.callbacks.push(callback);
}
return;
}
// 没有命中缓存
const newEntry = new AsyncQueueEntry(item, callback);
// ...
})
AsyncQueueEntry 类型 下面我们来看下上面提到的 下面我们来看下上面提到的
class AsyncQueueEntry {
constructor(item, callback) {
}
}
该类型的实例的属性及作用如下:
- item: 加入队列的队列成员,也是后面 processor 要处理的数据
- state: 当前队列的状态,是一个枚举值,一共有三个,分别表示当前成员的在队列中的状态:
- 2.1 QUEUED_STATE = 0,标识刚加入队列待处理;
- 2.2 PROCESSING_STATE = 1,标识该成员正在被 processor 函数处理;
- 2.3 DONE_STATE = 2,标识该成员已经被处理过,结果受理 callback 尚未被调用;
- callback:队列成员的结果受理 callback,被队列 processor 处理的结果会传递给该回调
- callbacks:队列受理回调数组,有多个 callback 时,会在拿到结果后逐个调用
- result:经由队列 processor 的处理返回的结果;
- error:收集到的错误信息;
- 开启队列消耗
this.hooks.beforeAdd.callAsync(item, err => {
// ...
if (this._stopped) {
this.hooks.added.call(item);
this._root._activeTasks++;
process.nextTick(() =>
this._handleResult(newEntry, new WebpackError("Queue was stopped"))
);
} else {
this._entries.set(key, newEntry);
this._queued.enqueue(newEntry);
const root = this._root;
root._needProcessing = true;
if (root._willEnsureProcessing === false) {
root._willEnsureProcessing = true;
setImmediate(root._ensureProcessing);
}
// ....
})
1)如果 _stopped 为 true 则直接触发 hooks.added 钩子然后调用抛出异常;否则,说明队列没有停止,此时设置 _entries 然后将 entry 加入队列中;
2)紧接着将获取 this._root,这里很重要,这是这个异步队列实现的亲代递推执行的一个关键点,如果在创建队列时没有传入 parent 属性,_root 就是自身,而且也不会成为某个队列的孩子队列。
3)得到 _root 后将 root._needProcess 置为 true,标识该队列需要被消耗。
4)判断 root._willEnsureProcess 是否为 false,如果为 false 则将 _root 队列的 _willEnsureProcessing 置为 true。如果 _willEnsureProcessing 为 true 说明正在递推的队列处理已经在运行了,不需要再次触发了。
5)通过 setImmediate() 在下个事件循环调用 root._enusreProcessing 方法开始消耗队列。
- 触发 hooks.added
最后一步出发 hooks.added 钩子,标识新成员已经加入到队列中。
this.hooks.beforeAdd.callAsync(item, err => {
// ...
this.hooks.added.call(item);
})
4.3 _ensureProcessing 方法
该方法是队列并发数量控制器,它负责调度并开启真正消耗队列的方法;
1)while 循环开启并发消耗当前自身的任务,逐个使队列成员出队列(dequeue); 2)维护 this._activeTasks 标识当前正在处理的任务数量累加; 3)设置 当前 entry 的state 为正在处理; 4)调用 this._startProcessing 处理当前队列成员; 5)当前队列处理完成后将 _willEnsurProcessing 置为 false; 6)判断,如果当前队列尚未清空则暂停执行,暂时不要处理当前队列的后台队列(children) 7)当前队列清空后且 children 属性存在的情况下遍历 children 数组,把后代队列(child)取出来,逐个清空队列的逻辑:entry 对象出队列——累加并发数——修改 entry 对象状态——调用处理方法处理队列成员; 8)this._willEnsureProcessing 如果为 false,则 this._needProcess 同样需要置为 false 标识队列已经处理完成;
_ensureProcessing() {
while (this._activeTasks < this._parallelism) {
const entry = this._queued.dequeue();
if (entry === undefined) break;
this._activeTasks++;
entry.state = PROCESSING_STATE;
this._startProcessing(entry);
}
this._willEnsureProcessing = false;
if (this._queued.length > 0) return;
if (this._children !== undefined) {
for (const child of this._children) {
while (this._activeTasks < this._parallelism) {
const entry = child._queued.dequeue();
if (entry === undefined) break;
this._activeTasks++;
entry.state = PROCESSING_STATE;
child._startProcessing(entry);
}
if (child._queued.length > 0) return;
}
}
if (!this._willEnsureProcessing) this._needProcessing = false;
}
4.4 _startProcessing
4.4.1 参数
该方法接收一个参数:entry,队列成员 entry 对象;
4.4.2 实现
该方法调用创建队列时传入的 processor 方法处理队列中的成员数据(entry.item),并且在得到结果后将数据借由 this._handleResult 方法传递给 item 对应的结果受理回调; 1)触发 this.hooks.beforeStart 并传入 entry.item(队列中待处理的数据成员) 2)调用 this._processor 并传入 entry.item 处理,在回掉中接收处理结果,并将结果传给 this._handleResult 方法; 3)触发 this.hooks.startd 钩子,传入 entry.item
_startProcessing(entry) {
this.hooks.beforeStart.callAsync(entry.item, err => {
if (err) {
this._handleResult(
entry,
makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeStart`)
);
return;
}
let inCallback = false;
try {
this._processor(entry.item, (e, r) => {
inCallback = true;
this._handleResult(entry, e, r);
});
} catch (err) {
if (inCallback) throw err;
this._handleResult(entry, err, null);
}
this.hooks.started.call(entry.item);
});
}
4.5 _handleResult 方法
4.5.1 参数
该方法接收三个参数:
- entry: 队列成员 entry 对象;
- err: processor 执行遇到的错误信息;
- result: processor 执行结束传递的结果数据;
4.5.2 实现
- 触发 hooks.result 钩子传入 entry.item 即队列成员数据,表示已经拿到该队列成员的处理结果;
- 在 hooks.result 的回调中:
- 2.1 hooks 遇到的错误,处理错误
- 2.2 暂存 entry 对象上的结果受理回调: callback/callbacks
- 2.3 更新该队列成员的 entry 对象信息:state/callback/callbacks/result/error
- 2.4 并发数量递减,释放并发量;
- 2.5 结合 _willEnsurProcessing 和 _needProcessing 决定是否再次开启递推队列消耗;之所以会有这个考虑是因为当队列中的数量超过并发的时候就会暂停处理。所以当有成员处理完之后会释放一个并发数量,此时应该恢复队列的消耗工作;
- 2.6 判断 inHandleResult的值是否超过3,这个标识是所有队列共享的,当超过3之后就需要将结果处理回调的处理结果放到下一个事件循环的开头;如果不超过3,则直接调用 callback 传入结果和错误信息;
- 2.7 如果 callbacks(多个回调),则遍历逐个调用;
- 2.8 为 inHandleResult 递减
_handleResult(entry, err, result) {
this.hooks.result.callAsync(entry.item, err, result, hookError => {
const error = hookError
? makeWebpackError(hookError, `AsyncQueue(${this._name}).hooks.result`)
: err;
const callback = entry.callback;
const callbacks = entry.callbacks;
entry.state = DONE_STATE;
entry.callback = undefined;
entry.callbacks = undefined;
entry.result = result;
entry.error = error;
const root = this._root;
root._activeTasks--;
if (root._willEnsureProcessing === false && root._needProcessing) {
root._willEnsureProcessing = true;
setImmediate(root._ensureProcessing);
}
if (inHandleResult++ > 3) {
process.nextTick(() => {
callback(error, result);
if (callbacks !== undefined) {
for (const callback of callbacks) {
callback(error, result);
}
}
});
} else {
callback(error, result);
if (callbacks !== undefined) {
for (const callback of callbacks) {
callback(error, result);
}
}
}
inHandleResult--;
});
}
5. 总结
webpack 通过 AsyncQueue 实现异步并发队列,整体核心有以下几点:
- 创建队列实例时通过 parent 构建亲代关系队列;
- 当向任一队列中添加成员时都会触发 _root._ensureProcessing 方法从祖先队列开始并发消耗队列;
- 启用消耗之后,各个队列的 _processor 方法就会处理队列成员数据;
- _processor 处理队列成员数据得到结果后传递给 _handleResult 方法,该方法把结果传给 callback,同时维护并发数量递减及恢复并发消耗队列的工作
转载自:https://juejin.cn/post/7343442927680651300