【源码共读】大并发量如何控制并发数
在常见的网络请求的过程中,我们都会遇到并发的情况,如果一次性发起过多的请求,会导致服务器压力过大,甚至会导致服务器崩溃,所以我们需要控制并发的数量,这样才能保证服务器的正常运行。
今天带来的就是并发控制的库:p-limit
使用
根据README
的介绍,我们可以通过p-limit
来创建一个限制并发的函数,然后通过这个函数来执行我们的异步任务。
import pLimit from 'p-limit';
const limit = pLimit(1);
const input = [
limit(() => fetchSomething('foo')),
limit(() => fetchSomething('bar')),
limit(() => doSomething())
];
// Only one promise is run at once
const result = await Promise.all(input);
console.log(result);
源码分析
我们先来看一下p-limit
的源码:
import Queue from 'yocto-queue';
export default function pLimit(concurrency) {
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
const queue = new Queue();
let activeCount = 0;
const next = () => {
activeCount--;
if (queue.size > 0) {
queue.dequeue()();
}
};
const run = async (fn, resolve, args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
const enqueue = (fn, resolve, args) => {
queue.enqueue(run.bind(undefined, fn, resolve, args));
(async () => {
// This function needs to wait until the next microtask before comparing
// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
// when the run function is dequeued and called. The comparison in the if-statement
// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
await Promise.resolve();
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
})();
};
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, args);
});
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount,
},
pendingCount: {
get: () => queue.size,
},
clearQueue: {
value: () => {
queue.clear();
},
},
});
return generator;
}
加上注释和换行只有68行
代码,非常简单,我们来一行一行的分析:
可以看到最开始就导入了yocto-queue
这个库,这个库之前有分析过:【源码共读】yocto-queue 一个微型队列数据结构
这个库就是一个队列的数据结构,不懂的可以直接将这个理解为数组就好;
跟着使用的代码来看,最开始就是通过pLimit
来创建一个限制并发的函数,这个函数接收一个参数concurrency
,然后返回一个函数,来看看这一步的代码:
function pLimit(concurrency) {
if (
!((Number.isInteger(concurrency)
|| concurrency === Number.POSITIVE_INFINITY)
&& concurrency > 0)
) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, args);
});
return generator;
}
首先这个函数接收一个参数concurrency
,然后判断这个参数是否是一个大于0的整数,如果不是就抛出一个错误;
返回的函数很简单,就是接收一个函数fn
和参数args
,然后返回一个Promise
;
然后调用返回的generator
函数就会执行enqueue
函数,对应的代码如下:
const enqueue = (fn, resolve, args) => {
queue.enqueue(run.bind(undefined, fn, resolve, args));
(async () => {
// This function needs to wait until the next microtask before comparing
// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
// when the run function is dequeued and called. The comparison in the if-statement
// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
await Promise.resolve();
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
})();
};
这个函数接收三个参数,fn
、resolve
、args
,然后将run
函数放入队列中;
这里run
使用bind
是因为并不需要立即执行,参考:function.prototype.bind()
然后立即执行一个异步函数,这个里面首先会等待下一个微任务,注释解释了这个原因,因为activeCount
是异步更新的,所以需要等待下一个微任务才能获取到最新的值;
然后判断activeCount
是否小于concurrency
,并且队列中有任务,如果满足条件就会将队列中的任务取出来执行,这一步就是并发的核心了;
这里的
queue.dequeue()()
执行的是run
函数,这里容易理解错误,所以框起来。
接下来看看run
函数:
const run = async (fn, resolve, args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {
}
next();
};
run
函数就是用来执行异步并发任务的;
首先activeCount
加1,表示当前并发数加1;
然后执行fn
函数,这个函数就是我们传入的异步函数,然后将结果赋值给result
,注意现在的result
是一个处在pending
状态的Promise
;
然后将result
传入resolve
函数,这个resolve
函数就是enqueue
函数中返回的Promise
的resolve
函数;
然后等待result
的状态发生改变,这里使用了try...catch
,因为result
可能会出现异常,所以需要捕获异常;
最后执行next
函数,这个函数就是用来处理并发数的,对应的代码如下:
const next = () => {
activeCount--;
if (queue.size > 0) {
queue.dequeue()();
}
};
首先activeCount
减1,表示当前并发数减1;
然后判断队列中是否还有任务,如果有就取出来执行;
queue.dequeue()
可以理解为[].shift()
,取出队列中的第一个任务,由于确定里面是一个函数,所以直接执行就可以了;
最后面还看到了使用Object.defineProperties
为generator
函数添加了几个属性,来看看:
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount,
},
pendingCount: {
get: () => queue.size,
},
clearQueue: {
value: () => {
queue.clear();
},
},
});
activeCount
:当前并发数pendingCount
:队列中的任务数clearQueue
:清空队列
这些属性都是只读的,可以让我们在外部知道当前的并发数和队列中的任务数,并且手动清空队列;
动手实现
接下来我们来动手实现一个,抛开队列直接使用数组
+ class
实现一个简易版:
class PLimit {
constructor(concurrency) {
this.concurrency = concurrency;
this.activeCount = 0;
this.queue = [];
return (fn, ...args) => {
return new Promise(resolve => {
this.enqueue(fn, resolve, args);
});
}
}
enqueue(fn, resolve, args) {
this.queue.push(this.run.bind(this, fn, resolve, args));
(async () => {
await Promise.resolve();
if (this.activeCount < this.concurrency && this.queue.length > 0) {
this.queue.shift()();
}
})();
}
async run(fn, resolve, args) {
this.activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {
}
this.next();
}
next() {
this.activeCount--;
if (this.queue.length > 0) {
this.queue.shift()();
}
}
}
一共十个并发的任务,每个任务花费 2秒,控制并发数为 2 时,一共花费 10秒。
总结
这篇文章主要介绍了Promise
的并发控制,主要是通过队列来实现的。
并发控制的核心就是控制并发数,所以我们需要一个队列来存储任务,然后控制并发数,当并发数小于最大并发数时,就从队列中取出任务执行,这样就可以控制并发数了。
等待上一个任务的执行通过await
来实现,这样就可以保证每次都只有可控的并发数在执行。
代码量并不多,但是内部的操作还有细节处理都是知识点。
转载自:https://juejin.cn/post/7179220832575717435