likes
comments
collection
share

js实现promise 的并发控制

作者站长头像
站长
· 阅读数 12

背景

公司项目需要写个 nodejs 脚本,批量导入 xml 中的文本,表格和图片

xml 中有大量的文本,表格和图片,需要调后台接口上传数据。

后台接口没有批量操作,调一次接口只能创建一条数据。

后台服务是单机部署的,并发量有限。

脚本中会产生大量的接口调用,瞬时并发量会很大,会触发服务端的流控或者超时。

要解决的问题

  1. 函数接收什么参数?返回什么?

    函数参数:

    1. 数组 array,存放原始数据
    2. 函数 handler,返回 promise, 遍历数组使用
    3. 最大并发数 max

    返回值:promise 数组,每项和函数参数的数组一一对应

  2. promise 能一次性创建完吗?

由于 promise 实例 new 出来后,就已经是 pending 状态了,如果是通过 axios 或者 fetch 创建的 promise,就意味着请求已经发出了。

如果一次性创建完,则无法实现请求的并发控制。

所以,promsie 只能在需要用到的时候,再创建。实现起来,可以通过调用函数来创建 promise

  1. 用什么数据结构保存所有 promise

    数组

  2. 用什么数据结构保存正在进行的 promise?(e.g. 正在请求的接口)

    队列

  3. 当队列满时,如何等待?

    我们要等到当前队列中第一个 promise 结束,获取第一个结束的 promise

    使用:Promise.race(queue); 等到这个 promise 结束,用:await Promise.race(queue);

    注意:Promise.race 函数是基于现有的 promise 数组,返回一个新的 promise;并不会重复创建 promise,意味着不会重复发送请求。

  4. 如何判断是否能开始下一个 promise

    当前队列中,有一个 promise resolve 或者 reject 时,Promise.race 自动就 resolve 或者 reject 了;就会自动执行 await Promise.race 后面的代码

  5. 如何将已完成的 promise 移出队列?

    queue 中使用引用比较的方式,找到该 promise;用 splice(index,1) 来移除

  6. 特殊场景

    max 为 0 时,如何处理?

    -- 直接返回 Promise.allSettled

思路

  1. 用一个大小相同的数组 result 来存放原始数组每项对应的 promise
  2. 用一个队列 queue 来存放正在进行中的 promise
  3. 遍历函数参数的数组,对每项调用传入的函数,生成 promise
  4. 对这个 promise 绑定结束后的操作:从 queue 里面移除。
  5. promise 放到 result 中对应的位置
  6. 如果队列 queue 满了,就等待,直到有 promise 结束

代码

async function semaphore(max, list, handler) {
  if (max === 0) {
    return Promise.allSettled(list.map(handler));
  }
  // 1
  const result = [];
  // 2
  const queue = [];
  for (let i = 0; i < list.length; i++) {
    // 3.
    const promise = handler(list[i], list);
    // 4
    const newPromise = promise.finally(() => {
      const index = queue.indexOf(newPromise);
      return queue.splice(index, 1);
    });
    // 5
    result.push(promise);
    queue.push(promise);
    // 6
    if (queue.length >= max) {
      await Promise.race(queue);
    }
  }
  return Promise.allSettled(result);
}

验证

测试代码

const results = await semaphore(3, array, getRequest);
for (const { status, value } of results) {
  const { result } = await value.json();
  console.log(status, result);
}

效果

js实现promise 的并发控制

js实现promise 的并发控制

开源社区的实现方式

tiny-async-pool

github.com/rxaviers/as…

看了下源码,思路大同小异。

备注

promise 的并发控制,使用场景较少:一般只用在 nodejs 端,浏览器端一般用不上。

浏览器端应该尽量少发请求,避免短时间内发送大量请求;而且 HTTP1.1 下,一个域下最多同时发送 6 个请求,超出的浏览器会自动排队(代码无感知)。