单线程 JavaScript 如何实现并发数的控制?
并发的概念
在介绍并发相关概念之前,说明一下什么是串行。
串行指的是任务是一个接着一个执行的,下一个任务需要等待上一个任务执行完毕才能开始执行。就拿我们日常开发中请求接口的场景来举例子:
// sleep 函数模拟接口的响应时间
const sleep = (time) =>
new Promise((resolve, _) => {
setTimeout(resolve, time);
});
const api1 = async () => {
await sleep(1000);
console.log("发送 /api/task1 接口");
return "data1";
};
const api2 = async () => {
await sleep(1000);
console.log("发送 /api/task2 接口");
return "data2";
};
const api3 = async () => {
await sleep(1000);
console.log("发送 /api/task3 接口");
return "data3";
};
(async function () {
const res1 = await api1();
console.log("返回:", res1); // data1
const res2 = await api2();
console.log("返回:", res2); // data2
const res3 = await api3();
console.log("返回:", res3); // data3
})();
执行结果如下:

这是非常典型的串行执行方式,先请求第一个接口,过了一秒,成功返回数据后,再去请求第二个接口,再过一秒,成功返回数据后,再去请求第三个接口。
串行带来的问题是,当上一个任务执行时间很长时,下一个任务就需要一直等待它执行完成。
那能不能让多个任务或者多个接口请求同时发生呢?能!这就是并发所要做的事情。
并发指的是,多个任务同时发生,并且请求的结果依然是按照同步的顺序返回。我们将上述的立即执行函数进行改写:
// ...省略以上代码
(async function () {
console.time("所有接口已请求完成,所需时间");
Promise.all([api1(), api2(), api3()]).then((results) => {
console.log(results);
console.timeLog("所有接口已请求完成,所需时间");
});
})();
使用 console.time
和 console.timeLog
对并发任务的执行时间进行记录,执行结果如下:

可以看到,在一秒的时候,同时发送了三个接口,并且按照发送接口顺序的返回了数据,执行时间变为一秒而不是三秒。
单线程的 JavaScript
能做到并发编程,完全要归功于它的异步执行机制,异步起初并不是为了并发,而是为了不阻塞主线程,就比如我们使用 addEventListener
对元素进行事件监听一样,JavaScript
主线程不用等到事件触发了,才去执行下面的代码。所以我们可以同时发起多个异步操作,来达到并发的效果,虽然最后的返回数据过程是同步的。
并发数的控制
在同时发送 10 个请求和同时发送 1000 个请求的比较下,前端获取并处理 10 个数据和 1000 个数据的性能消耗是差不多的,但对于服务器来说,可能承受不了这 1000 个请求的压力,所以并发数的控制显得尤为重要。
npm
包中,已经有较为成熟的解决方案 — p-limit ,使用方法如下:
import pLimit from 'p-limit';
const limit = pLimit(1);
const input = [
limit(() => fetchSomething('foo')),
limit(() => fetchSomething('bar')),
limit(() => doSomething())
];
// Only one promise is run at once
const result = await Promise.all(input);
console.log(result);
首先向 pLimit
函数传入限制并发数,返回结果是一个添加并发任务的函数 — limit
,并发任务作为参数传给 limit
函数,最后仍然是通过 Promise.all()
方法获取返回结果。
我们使用 p-limit
对之前的代码进行改造:
import pLimit from 'p-limit';
// ...省略代码
const limit = pLimit(2);
const inputs = [limit(() => api1()), limit(() => api2()), limit(() => api3())];
(async function () {
console.time("所有接口已请求完成,所需时间");
const res = await Promise.all(inputs);
console.timeLog("所有接口已请求完成,所需时间");
console.log(res);
})();
把并发数的限制设置为 2。执行结果如下:

一秒后,输出第一个和第二个接口,再过一秒,才输出第三个接口,总的执行时间是两秒。完美地实现了并发数的控制。
为了更好地了解如何实现并发数的控制,接下来,会对 p-limit
的源码进行深入的解析。
详解 p-limit 的源码
p-limit
内部使用了队列,而队列的实现依赖于 yocto-queue
包。
为什么使用的数据结构是队列而不是栈?因为队列是先进先出的特点,保持了并发任务加入 limit
函数的顺序,而栈是后进先出的特点,跟并发任务加入 limit
函数的顺序是相反的。
我们先来看看调用 pLimit
后返回一个添加并发任务的函数的逻辑:
import Queue from 'yocto-queue';
export default function pLimit(concurrency) {
// ...省略代码
const queue = new Queue(); // 用来存放并发任务的队列
const run = async (fn, resolve, args) => {
// ...省略代码
const result = (async () => fn(...args))();
// ...省略代码
};
const enqueue = (fn, resolve, args) => {
// 函数入队
queue.enqueue(run.bind(undefined, fn, resolve, args));
// ...省略代码
};
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, args);
});
// ...省略代码
return generator;
}
- 返回的是
generator
函数,该函数的显式参数fn
是并发任务,返回值是Promise
对象,所以可以被Promise.all()
方法接收。 - 每次调用
generator
函数,都会将并发任务加入到常量queue
队列当中,而不是立即执行并发任务。
接下来要思考的问题是,如果有三个并发任务,并发限制数为 2,怎么在最开始的时候,只执行两个并发任务呢?主要通过一个变量来控制。
import Queue from 'yocto-queue';
export default function pLimit(concurrency) {
// ...省略代码
const queue = new Queue(); // 用来存放并发任务的队列
let activeCount = 0;
// ...省略代码
const run = async (fn, resolve, args) => {
activeCount++;
const result = (async () => fn(...args))();
// ...省略代码
};
const enqueue = (fn, resolve, args) => {
// 函数入队
queue.enqueue(run.bind(undefined, fn, resolve, args));
// 函数出队,执行并发任务
(async () => {
await Promise.resolve();
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
})();
};
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, args);
});
// ...省略代码
return generator;
}
变量 activeCount
表示当前正在执行的并发任务数量,主要用于和限制并发数量 concurrency
进行比较,来控制同时发生的并行任务的个数。
控制开始执行并发任务的数量,关键在于 enqueue()
函数里面的 async
立即执行函数,如果当前正在执行的并发任务数量小于限制并发数量,并且队列中还有并发任务的情况下,才会去执行并发任务。所以当第三个并发任务进行判断时,activeCount
值为 2,等于限制并发数量,不会在一开始的时候就执行。
至于为什么 async
立即执行函数里需要用到 await Promise.resolve()
语句,也就是为什么在最开始的时候需要异步执行并发任务,源代码的解释是 activeCount
是异步更新的,因此 activeCount
和 concurrency
的比较之前需要在异步中执行,才能获取最新的 activeCount
值。
我并没有理解这句话的意思,因为我尝试着将 await Promise.resolve()
这行代码注释之后,async
立即执行函数就改为了同步执行,并发限制功能依然存在,逻辑上也能通顺,并且最后用 Promise.all()
方法返回的结果顺序也是正确的。
所以我认为这里异步执行的原因应该是属于性能上的提升,假如 async
立即执行函数是同步的,并且其中一个并发任务是同步函数,没有任何异步操作,该函数体代码的时间复杂度较高,比如是 O(n^2)
,那么这个并发任务就有可能阻塞主线程很长一段时间。所以如果改为异步执行,那么就不会阻塞主线程的代码执行。
好了,最后要考虑的两个问题是:
- 当前并发任务执行结束后,如何调度下一个并发任务的执行?
- 如何将并发任务的执行结果按顺序逐一返回给
Promise.all()
方法?
import Queue from 'yocto-queue';
export default function pLimit(concurrency) {
// ...省略代码
const queue = new Queue(); // 用来存放并发任务的队列
let activeCount = 0;
const next = () => {
activeCount--;
if (queue.size > 0) {
queue.dequeue()();
}
};
const run = async (fn, resolve, args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
const enqueue = (fn, resolve, args) => {
// 函数入队
queue.enqueue(run.bind(undefined, fn, resolve, args));
// 函数出队,执行并发任务
(async () => {
await Promise.resolve();
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
})();
};
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, args);
});
// ...省略代码
return generator;
}
- 通过
await result
等待当前并发任务执行结束后,调用next()
函数来调度下一个并发任务的执行。 - 通过
resolve(result)
将每个并发任务的执行结果返回给Promise.all()
方法。
至此,p-limit
的核心代码已经分析完了,相信朋友们已经对并发数的控制有更深的理解啦!
总结
- 串行指的是任务是一个接着一个执行的,下一个任务需要等待上一个任务执行完毕才能开始执行。并发指的是多个任务同时发生,计算过程依然是同步执行。
p-limit
中主要通过队列、异步和变量activeCount
来实现限制并发数。
转载自:https://juejin.cn/post/7232838211029794873