面试官:实现一个并发控制函数
这是一个朋友在一次技术面试中,面试官要求他实现的一个 JavaScript 函数,用于并发执行一组异步任务,同时能够控制最大并发数。
这样的能力是现代 web 开发中必不可少的技能,特别是在处理批量网络请求、文件操作等场景时,能够有效优化资源利用率、减轻服务器压力、保障用户体验。
接下来,掌门人将按照面试场景的要求,分步骤详细阐述如何实现这个并发控制函数。
一、理解需求与设计思路
面试官提出的需求主要包括以下几点:
-
函数接口:定义一个函数,接受三个参数:
tasks
:一个包含异步任务(返回 Promise 的函数)的数组。maxConcurrency
:允许的最大并发任务数。callback
(可选):一个回调函数,用于在所有任务完成(无论成功或失败)时接收结果或错误。
-
功能要求:
- 并发执行
tasks
中的异步任务,但同时运行的任务数量不超过maxConcurrency
。 - 保持任务完成顺序与
tasks
数组中的顺序一致。 - 如果提供
callback
,在所有任务完成(成功或失败)后调用,传递结果数组或第一个遇到的错误。 - 若不提供
callback
,函数应返回一个 Promise,该 Promise 在所有任务成功完成时解析为结果数组,或在任何任务失败时拒绝。
- 并发执行
设计思路
基于上述需求,我们可以设计如下实现步骤:
- 初始化状态:创建结果数组、待处理任务计数器和正在运行任务计数器。
- 定义并发执行器:编写一个内部函数,负责启动任务、处理结果和错误,以及控制并发数量。
- 启动任务:按最大并发数限制启动初始批次的任务。
- 封装返回值:使用 Promise 封装并发执行过程,根据是否提供回调函数,正确处理结果和错误传递。
二、实现并发控制函数
现在,我们按照设计思路,一步步实现 concurrentExecute
函数。
步骤 1:初始化状态
const results = [];
let pending = tasks?.length;
let running = 0;
results
数组用于存储每个任务完成后的结果,按任务在tasks
数组中的索引顺序填充。pending
计数器记录尚未开始或正在进行的任务数量,初始值等于tasks
的长度。running
计数器记录当前正在运行的任务数量,初始值为 0。
步骤 2:定义并发执行器
function execute(taskIndex) {
if (taskIndex >= tasks.length) return;
running++;
tasks[taskIndex]()
.then((result) => {
results[taskIndex] = result;
running--;
if (pending > 0) {
execute(taskIndex + 1);
}
})
.catch((error) => {
// 处理任务失败情况...
});
// 控制并发数量...
}
-
检查
taskIndex
是否超出任务范围,若超出则直接返回。 -
增加
running
计数器,表示开始执行一个任务。 -
调用任务函数并监听其结果:
- 成功时,将结果存入
results
数组,减小running
计数器,并检查是否有剩余任务待执行。若有,递归调用execute
执行下一个任务。 - 失败时,处理错误(例如,通过回调函数报告错误或抛出异常终止 Promise 链)。
- 成功时,将结果存入
在 execute
函数内,还需添加逻辑来控制并发数量:
if (running < maxConcurrency && taskIndex < tasks.length - 1) {
execute(taskIndex + 1);
}
当当前运行任务数小于最大并发数且还有未启动的任务时,立即启动下一个任务。
步骤 3:启动任务
for (let i = 0; i < Math.min(tasks.length, maxConcurrency); i++) {
execute(i);
}
初始化并发执行,根据 maxConcurrency
启动第一批任务。
步骤 4:封装返回值
return new Promise((resolve, reject) => {
// ... 定时检查任务完成情况和错误处理逻辑 ...
});
// 如果提供了回调函数,还要在 Promise 的 then/catch 链中调用回调
if (callback) {
Promise.all(tasks.map((task, index) => task().catch((err) => err)))
.then((resultsOrErrors) => {
// ... 处理结果 ...
callback(null, results);
})
.catch((err) => {
// ... 处理错误 ...
callback(err);
});
} else {
// ... 返回 Promise ...
}
三、完整代码示例
结合上述实现步骤,以下是完整的 concurrentExecute
函数代码:
function concurrentExecute(tasks, maxConcurrency, callback) {
const results = [];
let pending = tasks.length;
let running = 0;
function execute(taskIndex) {
if (taskIndex >= tasks.length) return;
running++;
tasks[taskIndex]()
.then((result) => {
results[taskIndex] = result;
running--;
if (pending > 0) {
execute(taskIndex + 1);
}
})
.catch((error) => {
if (callback) {
callback(error);
} else {
throw error;
}
});
if (running < maxConcurrency && taskIndex < tasks.length - 1) {
execute(taskIndex + 1);
}
}
for (let i = 0; i < Math.min(tasks.length, maxConcurrency); i++) {
execute(i);
}
return new Promise((resolve, reject) => {
const checkCompletion = () => {
if (pending === 0) {
resolve(results);
} else if (running < maxConcurrency) {
execute(running);
}
};
const intervalId = setInterval(checkCompletion, 0);
const done = (error) => {
clearInterval(intervalId);
if (error) {
reject(error);
}
};
if (callback) {
Promise.all(tasks.map((task, index) => task().catch((err) => err)))
.then((resultsOrErrors) => {
results.forEach((result, index) => {
if (typeof resultsOrErrors[index] === 'undefined') {
results[index] = resultsOrErrors[index];
}
});
done();
callback(null, results);
})
.catch((err) => {
done(err);
callback(err);
});
} else {
Promise.all(tasks)
.then((results) => {
done();
resolve(results);
})
.catch((err) => {
done(err);
reject(err);
});
}
});
}
当所有任务完成后,回调函数将接收到结果数组或第一个遇到的错误。
五、总结
通过上述的案例,我们已经成功实现了满足面试官要求的并发控制函数。这个函数能够有效地并发执行异步任务,同时限制最大并发数,保持任务完成顺序,并根据是否提供回调函数正确处理结果和错误。
这个案例不仅考察了面试者对 JavaScript 异步编程、Promise 和并发控制的理解,也能最大程度的体现出面试者在面对实际需求时的分析设计能力和编码实现技巧。
转载自:https://juejin.cn/post/7352342892784402483