来看看大神如何对多个异步操作并发控制
- 本文参加了由公众号@若川视野 发起的每周源码共读活动, 点击了解详情一起参与。
- 这是源码共读的第31期,链接:juejin.cn/post/708759…
前言
日常业务中,我们经常会遇到多个请求同时并发的场景,比如页面的初始化请求。我们使用Promise.all
等API就可以轻松实现。
但这些文章都是仅仅围绕请求并发控制,如果再想深一层,可不可以针对所有异步操作的并发控制呢?比如异步请求、延时、文件操作(nodejs)这些异步操作。答案是有的,针对这样的场景,sindresorhus大神早已写了一个库p-limit。
那具体如何实现呢?我们一起来看一下。
基本用法
在分析源码前,我们来看看这个库的用法,这可以让我们分析源码更加容易理解。
import pLimit from 'p-limit';
// 1.生成限流函数(限流为1)
const limit = pLimit(1);
// 2.将需要的异步操作通过箭头函数包裹一层,让限流函数接管执行
const input = [
limit(() => fetchSomething('foo')),
limit(() => fetchSomething('bar')),
limit(() => doSomething())
];
// 3.限流函数返回的也是promise,我们同样可以通过Promise.all接受结果;
// 而且同一时间只会执行一个请求(异步操作)
const result = await Promise.all(input);
console.log(result);
使用起来还是很简洁的,只是多了生成限流函数和让限流函数接管执行两个步骤,就可以做到并发控制。
源码分析
知道了用法,我们直接来看看p-limit主要源码,第一步先来看主体结构:
import Queue from 'yocto-queue';
export default function pLimit (concurrency) {
// 检测是否传入并发限制参数
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
// 两个重要的变量
const queue = new Queue(); // 待执行的队列
let activeCount = 0; // 正在执行的数量
// ...省略了部分代码
// 限流函数,返回的是Promise
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, args);
});
// ...省略了部分代码
// 返回限流函数generator
return generator;
}
解释:
pLimit
本质就是一个函数,利用闭包存储了三个状态concurrency
、queue
和activeCount
,返回一个新的函数,这个新函数就是我们用法当中说的限流函数。- 限流函数generator,函数执行时,会实例化一个
Promise
实例并返回。实例过程调用了enqueue
(下面细说这个函数),并且将限流函数传入的fn
及其参数args
和实例的resolve
函数作为参数传入。有了这些参数其实就相当于限流函数的内部就可以接管异步操作fn的执行了。
yocto-queue是一个通过链表实现队列的库,性能很好,时间复杂度入列和出列都是O(1),下面会用到:
- enqueue:入列
- dequeue:出列
enqueue
接着我们深入enqueue
做了什么:
const enqueue = (fn, resolve, args) => {
// 1.入列:将函数fn的执行让run函数接管,并通过bind函数包裹一层,放入到queue队列中
// run函数下一小节介绍,只需要知道它接管了fn函数执行
// bind的作用就是让函数run先不要执行,只是入列,在出列后再执行
queue.enqueue(run.bind(undefined, fn, resolve, args));
(async () => {
// 2.延时:这个延时是通过Promise来实现微任务延时,让下面的出列操作单次循环的微任务之后,
// 作用就是让你可以一次性多次执行限流函数入列,会等待你全部同步入列后才会执行下面的出列操作
await Promise.resolve();
// 3.出列:判断正在执行的数量没有超出限制就出列run函数并执行
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
})();
};
run
const run = async (fn, resolve, args) => {
// 标记正在执行的数量加1
activeCount++;
// 执行fn
// 利用async包裹执行,让函数报错了只会报Uncaught (in promise) Error,但不会影响其他代码执行
const result = (async () => fn(...args))();
// 将限流函数返回的promise状态改为已完成且结果是fn的执行结果
// 这里就是使用时,limit包裹一层依然能得到相同的promise结果的关键
resolve(result);
// 捕捉上面所说的可能发生会报的Uncaught (in promise) Error,让执行彻底不会抛出异常
// await让result执行完后,再执行next函数
try {
await result;
} catch {}
// 下一小节介绍
next();
};
next
const next = () => {
// 标记正在执行的数量减1
activeCount--;
// 只要队列还有需要执行的函数,就出列run函数执行
if (queue.size > 0) {
queue.dequeue()();
}
};
设置外部可调用属性和方法
// 通过Object.defineProperties给限流函数设置了在执行的数量、等待执行的数量和清理对队列的方法
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount,
},
pendingCount: {
get: () => queue.size,
},
clearQueue: {
value: () => {
queue.clear();
},
},
});
流程图
解释了一遍源码,可能还不够直观看明白整个流程。我们不妨写一个流程图抽离最基本的操作来看看整个控制的过程:
假设我们通过pLimit(1)
生成了一个限流函数limit
:

总结
p-limit
异步操作的并发控制原理就是通过队列控制多个异步操作并发,入列的时机是在限流函数的执行,出列的时机则有两种可能,优先限流函数的入列后就执行,但当达到限流条件时就会在每次执行异步操作后也会释放一次出列的机会。
回到代码层面,p-limit
虽然实现代码只有68行,但内部有一些技巧很值得我们学习:
promise
的resolve
传入内部和闭包的结合来接管外部函数的执行- 闭包设置内部私有内部属性,
Object.defineProperties
设置外部可访问属性 - 前置一个微任务实现异步操作的并发控制
参考
转载自:https://juejin.cn/post/7129339065828114462