还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行
并发的相关知识
在计算机的基础知识里,有两个很重要的概念:并发和并行
并发: 在同一时刻,要准备完成(即待处理)两个及两个以上的任务时,就说有多个事件正在发生, 即并发;
并行: 仍然是在同一时刻,必须同时执行两个及两个以上的任务时, 就说有多个事件正在进行中,即并行。由于并行的前提是有多个并发事件,因此并行编程的前提是支持并发。
两者的区别如下图所示:
了解了什么是并发之后,再来聊聊为什么要限制并发的数量:
这个限制其实是在下游的,同时发送 10 个 请求和同时发送 100 个请求的成本差别并不大,都是“发送-等待”的节奏,但是下游的“供应商”是会受不了的,这时你需要限制并发数。
拿我们最常见的网络请求来说,如果同时发起大量的网络请求,会对所运行的环境(浏览器/服务器)造成很大的压力,甚至会导致服务器崩溃,所以要限制并发数,减轻环境的压力,同一时间的网络请求数量不能超过最大并发数
。
其实浏览器同域名请求的最大并发数是有做了限制的,不同的 HTTP 版本以及浏览器类型,最大并发数是不一样的,更多细节可以参考这篇文章:浏览器同域名请求的最大并发数限制
限制并发数的思路
思路其实很简单,假设现在有 5 个任务,最大并发数为 3,首先所有的任务依次进入队列,等待执行;我们有一个任务池,最多只能容纳 3个任务的执行,刚开始任务池为空。
按以下步骤进行:
- 1、取前三个任务放进任务池中执行
- 2、等待任务池中任务执行完毕
- 3、移除任务池中最先完成的任务,取队列中的任务放入任务池中执行
- 4、重复第 2 步,直到所有任务执行完毕
整个过程如下图所示:
Promise.all能否控制并发数
在实现批量请求的时候,经常会使用到 Promise.all,这个时候我们可能会产生疑惑:Promise.all 能否控制并发数?
Promise.all
本身并不提供直接的并发控制功能,并不限制同时进行的操作数量,它会立即启动所有传入的 Promise。
我们通过个例子来看一下:
使用 Promise.all 处理十个异步任务,结果如下,几乎是同时打印,也就是我们传入的所有异步任务,可以认为它会同时去执行:
因此 Promise.all 是没法限制并发数的,需要我们手动写代码去控制。
深入剖析 p-limit
在平时的学习中大家可以自行研究并实现大量并发请求控制,如果觉得自己实现的不够好,可以考虑使用一些第三方库,Github 中已经存在挺多优秀的库,我这里给大家推荐 p-limit,下面对 p-limit 进行深入的分析。
p-limit 的使用非常简单,首先要定义并发数量,然后通过 limit 方法传入一个异步任务即可:
import pLimit from 'p-limit';
const limit = pLimit(1);
const input = [
limit(() => fetchFn('1')),
limit(() => fetchFn('2')),
limit(() => fetchFn())
];
await Promise.all(input);
1. 完整源码
p-limit 的源码只有 50 行左右,这里我们不关注 Queue 和 AsyncResource,它的队列结构采用了另一个库 yocto-queue,感兴趣的小伙伴可以去阅读其源码。
import Queue from 'yocto-queue';
import { AsyncResource } from '#async_hooks';
export default function pLimit(concurrency) {
// 并发数的判断
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
// 创建一个队列
const queue = new Queue();
// 当前并发数
let activeCount = 0;
// 执行下一个任务
const next = () => {
activeCount--;
if (queue.size > 0) {
queue.dequeue()();
}
};
// 执行当前任务
const run = async (function_, resolve, arguments_) => {
activeCount++;
const result = (async () => function_(...arguments_))();
resolve(result);
try {
await result;
} catch {}
next();
};
// 进队操作
const enqueue = (function_, resolve, arguments_) => {
queue.enqueue(
AsyncResource.bind(run.bind(undefined, function_, resolve, arguments_)),
);
(async () => {
await Promise.resolve();
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
})();
};
const generator = (function_, ...arguments_) => new Promise(resolve => {
enqueue(function_, resolve, arguments_);
});
// 在generator身上定义几个辅助函数
Object.defineProperties(generator, {
// 当前执行任务的个数
activeCount: {
get: () => activeCount,
},
// 队列中任务的数量
pendingCount: {
get: () => queue.size,
},
// 清空队列
clearQueue: {
value() {
queue.clear();
},
},
});
return generator;
}
2. 实现细节
看下整个源码的实现思路,一图胜千言~
(1)pLimit: 内部初始化了一个队列,调用 pLimit 后返回一个 generator 函数,每个 generator
函数的执行都会将一个异步函数压入队列中,多次调用 generator 之后便可以将所有需要执行的异步函数均放入队列当中。
(2)enqueue: 用于处理异步函数的入队操作,这里对异步函数进行包装一下,得到一个任务
并入队
大家可能会对 await Promise.resolve()
这行代码比较疑惑,源码中对其进行了解释,按我的理解是:activeCount
是异步更新的,在 run 方法里面更新该值,run 方法是需要在出队的时候才执行,现在是需要在进队完成之后便比较 activeCount 和 concurrency,这里涉及到执行的先后顺序问题,因此这里要等到下一个微任务才进行比较,以获取到最新的 activeCount 值。
(3)run: 出队后执行该方法,内部更新 activeCount 的值,然后执行传入的异步函数,并拿到结果
这里的 function_ 就是外部传入的异步函数,假设该异步函数为上面例子 Promise.all 中的 fetchFun,执行完 fetchFun 后的结果 result 是一个 Promise,所以需要 await result
,从而保证 next 的执行顺序:
3. 模仿实现
学习完 p-limit 的源码之后,你或许会想着按照它的设计思路自己去实现一遍,以加深理解
可以考虑写一个类 PLimit
| TS
写法 来实现 p-limit 的功能,队列的实现就不用再引入一个第三方库了,其实数组便可以满足我们的需求,再把一些无关紧要的判断/功能去掉的话,就是下面这个样子了:
// p-limit.ts
type Fn<T> = (...args: any[]) => Promise<T>;
type Reslove<T> = (value: T | PromiseLike<T>) => void;
class PLimit {
private concurrency: number;
private activeCount: number;
private queue: Array<() => void>;
constructor(concurrency: number) {
this.concurrency = concurrency;
this.activeCount = 0;
this.queue = [];
}
createTask<T>(fn: (...args: any[]) => Promise<T>, ...args: any): Promise<T> {
return new Promise<T>(resolve => {
this.enqueue(fn, resolve, args);
});
}
enqueue<T>(fn: Fn<T>, resolve: Reslove<T>, args: any) {
this.queue.push(this.run.bind(this, fn, resolve, args));
(async () => {
await Promise.resolve();
if (this.activeCount < this.concurrency && this.queue.length > 0) {
const task = this.queue.shift();
task && task();
}
})();
}
next() {
this.activeCount--;
if (this.queue.length > 0) {
const task = this.queue.shift();
task && task();
}
}
async run<T>(fn: Fn<T>, resolve: Reslove<T>, args: any) {
this.activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
this.next();
}
}
整体的结构是和 p-limit 源码差不多的,只是改造成 class
的形式,再加上 TS
,里面的 createTask
就相当于 p-limit 中的 generator 方法。
写在最后
如果有讲得不好的地方,大家可以批评指正,一起交流进步🥳🥳
期待大家的点赞 + 关注,多多支持,你的支持是我前进的动力!🤗🤗
转载自:https://juejin.cn/post/7376575006534860837