likes
comments
collection
share

单线程 JavaScript 如何实现并发数的控制?

作者站长头像
站长
· 阅读数 17

并发的概念

在介绍并发相关概念之前,说明一下什么是串行。

串行指的是任务是一个接着一个执行的,下一个任务需要等待上一个任务执行完毕才能开始执行。就拿我们日常开发中请求接口的场景来举例子:

// sleep 函数模拟接口的响应时间
const sleep = (time) =>
  new Promise((resolve, _) => {
    setTimeout(resolve, time);
  });

const api1 = async () => {
  await sleep(1000);
  console.log("发送 /api/task1 接口");
  return "data1";
};

const api2 = async () => {
  await sleep(1000);
  console.log("发送 /api/task2 接口");
  return "data2";
};
const api3 = async () => {
  await sleep(1000);
  console.log("发送 /api/task3 接口");
  return "data3";
};

(async function () {
  const res1 = await api1();
  console.log("返回:", res1);  // data1
  const res2 = await api2();
  console.log("返回:", res2);  // data2
  const res3 = await api3();
  console.log("返回:", res3);  // data3
})();

执行结果如下:

单线程 JavaScript 如何实现并发数的控制?

这是非常典型的串行执行方式,先请求第一个接口,过了一秒,成功返回数据后,再去请求第二个接口,再过一秒,成功返回数据后,再去请求第三个接口。

串行带来的问题是,当上一个任务执行时间很长时,下一个任务就需要一直等待它执行完成。

那能不能让多个任务或者多个接口请求同时发生呢?能!这就是并发所要做的事情。

并发指的是,多个任务同时发生,并且请求的结果依然是按照同步的顺序返回。我们将上述的立即执行函数进行改写:

// ...省略以上代码

(async function () {
  console.time("所有接口已请求完成,所需时间");
  Promise.all([api1(), api2(), api3()]).then((results) => {
    console.log(results);
    console.timeLog("所有接口已请求完成,所需时间");
  });
})();

使用 console.timeconsole.timeLog 对并发任务的执行时间进行记录,执行结果如下:

单线程 JavaScript 如何实现并发数的控制?

可以看到,在一秒的时候,同时发送了三个接口,并且按照发送接口顺序的返回了数据,执行时间变为一秒而不是三秒。

单线程的 JavaScript 能做到并发编程,完全要归功于它的异步执行机制,异步起初并不是为了并发,而是为了不阻塞主线程,就比如我们使用 addEventListener 对元素进行事件监听一样,JavaScript 主线程不用等到事件触发了,才去执行下面的代码。所以我们可以同时发起多个异步操作,来达到并发的效果,虽然最后的返回数据过程是同步的。

并发数的控制

在同时发送 10 个请求和同时发送 1000 个请求的比较下,前端获取并处理 10 个数据和 1000 个数据的性能消耗是差不多的,但对于服务器来说,可能承受不了这 1000 个请求的压力,所以并发数的控制显得尤为重要。

npm 包中,已经有较为成熟的解决方案 — p-limit ,使用方法如下:

import pLimit from 'p-limit';

const limit = pLimit(1);

const input = [
	limit(() => fetchSomething('foo')),
	limit(() => fetchSomething('bar')),
	limit(() => doSomething())
];

// Only one promise is run at once
const result = await Promise.all(input);
console.log(result);

首先向 pLimit 函数传入限制并发数,返回结果是一个添加并发任务的函数 — limit,并发任务作为参数传给 limit 函数,最后仍然是通过 Promise.all() 方法获取返回结果。

我们使用 p-limit 对之前的代码进行改造:

import pLimit from 'p-limit';

// ...省略代码

const limit = pLimit(2);

const inputs = [limit(() => api1()), limit(() => api2()), limit(() => api3())];

(async function () {
    console.time("所有接口已请求完成,所需时间");
    const res = await Promise.all(inputs);
    console.timeLog("所有接口已请求完成,所需时间");
    console.log(res);
})();

把并发数的限制设置为 2。执行结果如下:

单线程 JavaScript 如何实现并发数的控制?

一秒后,输出第一个和第二个接口,再过一秒,才输出第三个接口,总的执行时间是两秒。完美地实现了并发数的控制。

为了更好地了解如何实现并发数的控制,接下来,会对 p-limit 的源码进行深入的解析。

详解 p-limit 的源码

p-limit 内部使用了队列,而队列的实现依赖于 yocto-queue 包。

为什么使用的数据结构是队列而不是栈?因为队列是先进先出的特点,保持了并发任务加入 limit 函数的顺序,而栈是后进先出的特点,跟并发任务加入 limit 函数的顺序是相反的。

我们先来看看调用 pLimit 后返回一个添加并发任务的函数的逻辑:

import Queue from 'yocto-queue';

export default function pLimit(concurrency) {
    // ...省略代码
    
    const queue = new Queue(); // 用来存放并发任务的队列
    
    const run = async (fn, resolve, args) => {
        // ...省略代码

        const result = (async () => fn(...args))();

        // ...省略代码
    };

    const enqueue = (fn, resolve, args) => {
        // 函数入队
        queue.enqueue(run.bind(undefined, fn, resolve, args));
 
        // ...省略代码
    };

    const generator = (fn, ...args) => new Promise(resolve => {
        enqueue(fn, resolve, args);
    });

    // ...省略代码

    return generator;
}
  1. 返回的是 generator 函数,该函数的显式参数 fn 是并发任务,返回值是 Promise 对象,所以可以被 Promise.all() 方法接收。
  2. 每次调用 generator 函数,都会将并发任务加入到常量 queue 队列当中,而不是立即执行并发任务。

接下来要思考的问题是,如果有三个并发任务,并发限制数为 2,怎么在最开始的时候,只执行两个并发任务呢?主要通过一个变量来控制。

import Queue from 'yocto-queue';

export default function pLimit(concurrency) {
    // ...省略代码
    
    const queue = new Queue(); // 用来存放并发任务的队列
    let activeCount = 0;
    
    // ...省略代码
    
    const run = async (fn, resolve, args) => {
        activeCount++;

        const result = (async () => fn(...args))();
    
        // ...省略代码
    };

    const enqueue = (fn, resolve, args) => {
        // 函数入队
        queue.enqueue(run.bind(undefined, fn, resolve, args));
        // 函数出队,执行并发任务
        (async () => {
            await Promise.resolve();

            if (activeCount < concurrency && queue.size > 0) {
                queue.dequeue()();
            }
        })();
    };

    const generator = (fn, ...args) => new Promise(resolve => {
        enqueue(fn, resolve, args);
    });

    // ...省略代码

    return generator;
}

变量 activeCount 表示当前正在执行的并发任务数量,主要用于和限制并发数量 concurrency 进行比较,来控制同时发生的并行任务的个数。

控制开始执行并发任务的数量,关键在于 enqueue() 函数里面的 async 立即执行函数,如果当前正在执行的并发任务数量小于限制并发数量,并且队列中还有并发任务的情况下,才会去执行并发任务。所以当第三个并发任务进行判断时,activeCount 值为 2,等于限制并发数量,不会在一开始的时候就执行。

至于为什么 async 立即执行函数里需要用到 await Promise.resolve() 语句,也就是为什么在最开始的时候需要异步执行并发任务,源代码的解释是 activeCount 是异步更新的,因此 activeCountconcurrency 的比较之前需要在异步中执行,才能获取最新的 activeCount 值。

我并没有理解这句话的意思,因为我尝试着将 await Promise.resolve() 这行代码注释之后,async 立即执行函数就改为了同步执行,并发限制功能依然存在,逻辑上也能通顺,并且最后用 Promise.all() 方法返回的结果顺序也是正确的。

所以我认为这里异步执行的原因应该是属于性能上的提升,假如 async 立即执行函数是同步的,并且其中一个并发任务是同步函数,没有任何异步操作,该函数体代码的时间复杂度较高,比如是 O(n^2),那么这个并发任务就有可能阻塞主线程很长一段时间。所以如果改为异步执行,那么就不会阻塞主线程的代码执行。

好了,最后要考虑的两个问题是:

  1. 当前并发任务执行结束后,如何调度下一个并发任务的执行?
  2. 如何将并发任务的执行结果按顺序逐一返回给 Promise.all() 方法?
import Queue from 'yocto-queue';

export default function pLimit(concurrency) {
    // ...省略代码
    
    const queue = new Queue(); // 用来存放并发任务的队列
    let activeCount = 0;
    
    const next = () => {
        activeCount--;

        if (queue.size > 0) {
            queue.dequeue()();
        }
    };
    
    const run = async (fn, resolve, args) => {
        activeCount++;

        const result = (async () => fn(...args))();
    
        resolve(result);

        try {
            await result;
        } catch {}

        next();
    };

    const enqueue = (fn, resolve, args) => {
        // 函数入队
        queue.enqueue(run.bind(undefined, fn, resolve, args));
        // 函数出队,执行并发任务
        (async () => {
            await Promise.resolve();

            if (activeCount < concurrency && queue.size > 0) {
                queue.dequeue()();
            }
        })();
    };

    const generator = (fn, ...args) => new Promise(resolve => {
        enqueue(fn, resolve, args);
    });

    // ...省略代码

    return generator;
}
  1. 通过 await result 等待当前并发任务执行结束后,调用 next() 函数来调度下一个并发任务的执行。
  2. 通过 resolve(result) 将每个并发任务的执行结果返回给 Promise.all() 方法。

至此,p-limit 的核心代码已经分析完了,相信朋友们已经对并发数的控制有更深的理解啦!

总结

  1. 串行指的是任务是一个接着一个执行的,下一个任务需要等待上一个任务执行完毕才能开始执行。并发指的是多个任务同时发生,计算过程依然是同步执行。
  2. p-limit 中主要通过队列、异步和变量 activeCount 来实现限制并发数。
转载自:https://juejin.cn/post/7232838211029794873
评论
请登录