likes
comments
collection
share

还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行

作者站长头像
站长
· 阅读数 19

并发的相关知识

在计算机的基础知识里,有两个很重要的概念:并发和并行

并发: 在同一时刻,要准备完成(即待处理)两个及两个以上的任务时,就说有多个事件正在发生, 即并发;

并行: 仍然是在同一时刻,必须同时执行两个及两个以上的任务时, 就说有多个事件正在进行中,即并行。由于并行的前提是有多个并发事件,因此并行编程的前提是支持并发。

两者的区别如下图所示:

还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行

还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行

了解了什么是并发之后,再来聊聊为什么要限制并发的数量:

这个限制其实是在下游的,同时发送 10 个 请求和同时发送 100 个请求的成本差别并不大,都是“发送-等待”的节奏,但是下游的“供应商”是会受不了的,这时你需要限制并发数。

拿我们最常见的网络请求来说,如果同时发起大量的网络请求,会对所运行的环境(浏览器/服务器)造成很大的压力,甚至会导致服务器崩溃,所以要限制并发数,减轻环境的压力,同一时间的网络请求数量不能超过最大并发数

还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行

其实浏览器同域名请求的最大并发数是有做了限制的,不同的 HTTP 版本以及浏览器类型,最大并发数是不一样的,更多细节可以参考这篇文章:浏览器同域名请求的最大并发数限制

限制并发数的思路

思路其实很简单,假设现在有 5 个任务,最大并发数为 3,首先所有的任务依次进入队列,等待执行;我们有一个任务池,最多只能容纳 3个任务的执行,刚开始任务池为空。

按以下步骤进行:

  • 1、取前三个任务放进任务池中执行
  • 2、等待任务池中任务执行完毕
  • 3、移除任务池中最先完成的任务,取队列中的任务放入任务池中执行
  • 4、重复第 2 步,直到所有任务执行完毕

整个过程如下图所示:

还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行

Promise.all能否控制并发数

在实现批量请求的时候,经常会使用到 Promise.all,这个时候我们可能会产生疑惑:Promise.all 能否控制并发数?

Promise.all 本身并不提供直接的并发控制功能,并不限制同时进行的操作数量,它会立即启动所有传入的 Promise。

我们通过个例子来看一下:

还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行

使用 Promise.all 处理十个异步任务,结果如下,几乎是同时打印,也就是我们传入的所有异步任务,可以认为它会同时去执行:

还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行

因此 Promise.all 是没法限制并发数的,需要我们手动写代码去控制。

深入剖析 p-limit

在平时的学习中大家可以自行研究并实现大量并发请求控制,如果觉得自己实现的不够好,可以考虑使用一些第三方库,Github 中已经存在挺多优秀的库,我这里给大家推荐 p-limit,下面对 p-limit 进行深入的分析。

p-limit 的使用非常简单,首先要定义并发数量,然后通过 limit 方法传入一个异步任务即可:

import pLimit from 'p-limit';

const limit = pLimit(1);

const input = [
	limit(() => fetchFn('1')),
	limit(() => fetchFn('2')),
	limit(() => fetchFn())
];

await Promise.all(input);

1. 完整源码

p-limit 的源码只有 50 行左右,这里我们不关注 Queue 和 AsyncResource,它的队列结构采用了另一个库 yocto-queue,感兴趣的小伙伴可以去阅读其源码。

import Queue from 'yocto-queue';
import { AsyncResource } from '#async_hooks';

export default function pLimit(concurrency) {
    // 并发数的判断
	if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
		throw new TypeError('Expected `concurrency` to be a number from 1 and up');
	}
        
    // 创建一个队列
	const queue = new Queue();
    // 当前并发数
	let activeCount = 0;
    // 执行下一个任务
	const next = () => {
		activeCount--;
		if (queue.size > 0) {
			queue.dequeue()();
		}
	};

    // 执行当前任务
	const run = async (function_, resolve, arguments_) => {
		activeCount++;
		const result = (async () => function_(...arguments_))();
		resolve(result);
		try {
			await result;
		} catch {}
                
		next();
	};

  // 进队操作
	const enqueue = (function_, resolve, arguments_) => {
		queue.enqueue(
			AsyncResource.bind(run.bind(undefined, function_, resolve, arguments_)),
		);
		(async () => {
			await Promise.resolve();
			if (activeCount < concurrency && queue.size > 0) {
				queue.dequeue()();
			}
		})();
	};

	const generator = (function_, ...arguments_) => new Promise(resolve => {
		enqueue(function_, resolve, arguments_);
	});

   // 在generator身上定义几个辅助函数
	Object.defineProperties(generator, {
           // 当前执行任务的个数
		activeCount: {
			get: () => activeCount,
		},
           // 队列中任务的数量
		pendingCount: {
			get: () => queue.size,
		},
           // 清空队列
		clearQueue: {
			value() {
				queue.clear();
			},
		},
	});

	return generator;
}

2. 实现细节

看下整个源码的实现思路,一图胜千言~

还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行

(1)pLimit: 内部初始化了一个队列,调用 pLimit 后返回一个 generator 函数,每个 generator 函数的执行都会将一个异步函数压入队列中,多次调用 generator 之后便可以将所有需要执行的异步函数均放入队列当中。

还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行

(2)enqueue: 用于处理异步函数的入队操作,这里对异步函数进行包装一下,得到一个任务 并入队

大家可能会对 await Promise.resolve() 这行代码比较疑惑,源码中对其进行了解释,按我的理解是:activeCount是异步更新的,在 run 方法里面更新该值,run 方法是需要在出队的时候才执行,现在是需要在进队完成之后便比较 activeCount 和 concurrency,这里涉及到执行的先后顺序问题,因此这里要等到下一个微任务才进行比较,以获取到最新的 activeCount 值。

还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行

(3)run: 出队后执行该方法,内部更新 activeCount 的值,然后执行传入的异步函数,并拿到结果

还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行

这里的 function_ 就是外部传入的异步函数,假设该异步函数为上面例子 Promise.all 中的 fetchFun,执行完 fetchFun 后的结果 result 是一个 Promise,所以需要 await result,从而保证 next 的执行顺序:

还不知道如何限制并发数?那就试试 p-limit 吧并发的相关知识 在计算机的基础知识里,有两个很重要的概念:并发和并行

3. 模仿实现

学习完 p-limit 的源码之后,你或许会想着按照它的设计思路自己去实现一遍,以加深理解

可以考虑写一个类 PLimit | TS 写法 来实现 p-limit 的功能,队列的实现就不用再引入一个第三方库了,其实数组便可以满足我们的需求,再把一些无关紧要的判断/功能去掉的话,就是下面这个样子了:

// p-limit.ts

type Fn<T> = (...args: any[]) => Promise<T>;
type Reslove<T> = (value: T | PromiseLike<T>) => void;

class PLimit {
  private concurrency: number;
  private activeCount: number;
  private queue: Array<() => void>;

  constructor(concurrency: number) {
    this.concurrency = concurrency;
    this.activeCount = 0;
    this.queue = [];
  }

  createTask<T>(fn: (...args: any[]) => Promise<T>, ...args: any): Promise<T> {
    return new Promise<T>(resolve => {
      this.enqueue(fn, resolve, args);
    });
  }

  enqueue<T>(fn: Fn<T>, resolve: Reslove<T>, args: any) {
    this.queue.push(this.run.bind(this, fn, resolve, args));
    (async () => {
      await Promise.resolve();
      if (this.activeCount < this.concurrency && this.queue.length > 0) {
        const task = this.queue.shift();
        task && task();
      }
    })();
  }

  next() {
    this.activeCount--;
    if (this.queue.length > 0) {
      const task = this.queue.shift();
      task && task();
    }
  }

  async run<T>(fn: Fn<T>, resolve: Reslove<T>, args: any) {
    this.activeCount++;
    const result = (async () => fn(...args))();
    resolve(result);
    try {
      await result;
    } catch {}
    this.next();
  }
}

整体的结构是和 p-limit 源码差不多的,只是改造成 class 的形式,再加上 TS,里面的 createTask 就相当于 p-limit 中的 generator 方法。

写在最后

如果有讲得不好的地方,大家可以批评指正,一起交流进步🥳🥳

期待大家的点赞 + 关注,多多支持,你的支持是我前进的动力!🤗🤗

转载自:https://juejin.cn/post/7376575006534860837
评论
请登录