likes
comments
collection
share

字节常考:请求并发数量限制

作者站长头像
站长
· 阅读数 7

前端异步请求并发控制是指在客户端(通常是在浏览器中)管理和控制同时发出的异步请求的数量的技术。在进行网络请求,如使用 AJAX 与服务器交互时,可能会有多个请求同时发送。如果不加以控制,大量并发的请求可能会导致服务器压力过大,或者在客户端造成性能问题。为了实现并发控制,你可以使用多种策略,其中一些策略包括:

  1. 队列: 创建一个请求队列,并且同时只处理一定数量的请求。一旦当前处理的请求完成,就从队列中取出下一个请求进行处理。
  2. 批处理: 将多个请求合并成一个批量请求,如果 API 支持的话。这减少了请求数量,但可能需要服务器端的支持。
  3. 节流: 在特定时间内限制请求的数量。例如,每秒只允许发出一个请求。
  4. 防抖: 如果有大量连续的请求,在一定的延迟后只执行最后一次请求,这通常用于搜索框等场景。
  5. Promise 控制: 使用 Promise.all 来管理多个请求,或者使用 Promise.race 来处理多个请求中最快返回的结果。

我们先来手动实现一个并发请求控制,你可以创建一个管理请求的队列,并通过设置一个最大并发数来控制同时进行的请求数量。当一个请求完成时,你可以从队列中取出下一个请求并执行它。以下是一个简单的 JavaScript 例子,它展示了如何使用 Promise 来管理并发请求。假设我们有一个 sendRequest 函数,这个函数接收一个 url,并返回一个 Promise。我们的目标是控制这些请求的并发数。

class RequestQueue {
  constructor(maxConcurrent) {
    this.maxConcurrent = maxConcurrent; // 设置最大并发数
    this.currentRunning = 0; // 当前正在运行的请求数
    this.queue = []; // 等待执行的请求队列
  }

  // 将请求封装成一个函数,推入队列,并尝试执行
  enqueue(url) {
    return new Promise((resolve, reject) => {
      const task = () => {
        // 当请求开始时,currentRunning 加 1
        this.currentRunning++;
        sendRequest(url).then(resolve).catch(reject).finally(() => {
          // 请求结束后,currentRunning 减 1,并尝试执行下一个请求
          this.currentRunning--;
          this.dequeue();
        });
      };
      this.queue.push(task);
      this.dequeue(); // 每次添加请求后尝试执行请求
    });
  }

  dequeue() {
    // 如果当前运行的请求小于最大并发数,并且队列中有待执行的请求
    if (this.currentRunning < this.maxConcurrent && this.queue.length) {
      // 从队列中取出一个请求并执行
      const task = this.queue.shift();
      task();
    }
  }
}

// 这个函数是模拟发送请求的,实际中你可能需要替换成真实的请求操作
function sendRequest(url) {
  console.log(`Sending request to ${url}`);
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log(`Response received from ${url}`);
      resolve(`Result from ${url}`);
    }, Math.random() * 2000); // 随机延时以模拟请求处理时间
  });
}

// 使用 RequestQueue
const requestQueue = new RequestQueue(3); // 假设我们限制最大并发数为3

// 模拟批量请求
const urls = ['url1', 'url2', 'url3', 'url4', 'url5', 'url6'];
urls.forEach(url => {
  requestQueue.enqueue(url).then(result => {
    console.log(result);
  });
});

在这个例子中,我们创建了一个 RequestQueue 类来管理请求。这个队列有三个关键的部分:

  1. enqueue 方法:添加请求到队列中,并尝试调用 dequeue 来执行请求。
  2. dequeue 方法:检查当前执行的请求数量是否小于最大并发数,如果是,就从队列中取出请求并执行。
  3. sendRequest 函数:这是一个模拟请求的函数,实际使用时需要替换为真实的 HTTP 请求操作。

通过这种方式,我们可以有效地控制请求的并发数量,即使在很多请求需要同时发出的情况下,也能保持系统的稳定性和响应能力。

p-limit

p-limit 库可以帮助开发人员控制同时执行的异步任务的数量,以避免过多的并发请求对服务器造成压力。下载量非常高:字节常考:请求并发数量限制这是它的基本源码:

// 定义一个函数 pLimit,它接受一个参数 `concurrency` 表示并发限制的数量
const pLimit = (concurrency) => {
  // 检查 `concurrency` 是否为整数或无穷大,并且大于0,否则抛出类型错误
  if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
    throw new TypeError('Expected `concurrency` to be a number from 1 and up');
  }

  // 初始化队列和活跃任务计数
  const queue = [];
  let activeCount = 0;

  // 定义函数 `next`,用来在任务完成后减少活跃任务计数,并从队列中取出下一个任务执行
  const next = () => {
    activeCount--;

    if (queue.length > 0) {
      queue.shift()();
    }
  };

  // 定义函数 `run`,用来运行任务函数 `fn`,并在执行完毕后调用 `next`
  const run = async (fn, resolve, ...args) => {
    activeCount++;

    const result = (async () => fn(...args))();

    resolve(result);

    try {
      await result;
    } catch {}

    next();
  };

  // 定义函数 `enqueue`,用来将任务加入队列
  const enqueue = (fn, resolve, ...args) => {
    queue.push(run.bind(null, fn, resolve, ...args));

    (async () => {
      await Promise.resolve();

      if (activeCount < concurrency && queue.length > 0) {
        queue.shift()();
      }
    })();
  };

  // 定义函数 `generator`,它返回一个新的 Promise,并将任务函数加入队列
  const generator = (fn, ...args) =>
    new Promise((resolve) => {
      enqueue(fn, resolve, ...args);
    });

  // 通过 `Object.defineProperties` 给 `generator` 添加属性
  Object.defineProperties(generator, {
    activeCount: {
      get: () => activeCount // 返回当前活跃任务的数量
    },
    pendingCount: {
      get: () => queue.length // 返回队列中等待的任务数量
    },
    clearQueue: {
      value: () => {
        queue.length = 0; // 清空队列
      }
    }
  });

  // 返回 `generator` 函数
  return generator;
};

// 使用 pLimit 创建一个新的限制器,限制并发数为2
const limit = pLimit(2);

// 定义一个异步函数 `asyncFun`,它模拟异步操作并在延迟后解析
function asyncFun(value, delay) {
  return new Promise((resolve) => {
    console.log('value ' + value);
    setTimeout(() => resolve(value), delay);
  });
}

// 立即执行的异步函数,用来并发执行多个异步任务
(async function () {
  // 使用 `limit` 包装异步任务,确保任务的并发数不超过2
  const arr = [
    limit(() => asyncFun('aaa', 2000)),
    limit(() => asyncFun('bbb', 1000)),
    limit(() => asyncFun('ccc', 1000)),
    limit(() => asyncFun('ccc', 1000)),
    limit(() => asyncFun('ccc', 1000))
  ];

  // 等待所有包装过的异步任务完成
  const result = await Promise.all(arr);
  // 打印所有异步任务的结果
  console.log(result);
})();

这样的话,初始化推入 5 个函数到队列中,然后由于我们限制了 limit 为 2,所以 enqueue 里面的异步立即执行函数会执行两次后,达到队列限制 shift 执行 run 函数,此后每个异步函数执行完后,再判断队列的 length 是否大于 0 后,shift 队列再次执行后续的 run 函数。所以上面代码的执行结果是:先打印:字节常考:请求并发数量限制过了 1s:字节常考:请求并发数量限制再过 1s:字节常考:请求并发数量限制针对上面有两个代码处大家可能有疑惑:

  1. 为什么要 await result;
  2. 为什么要 await Promise.resolve();

在 run 函数中,await result; 的作用是确保异步函数 fn(通过 fn(...args) 执行)的 Promise 完全解决或拒绝之前,不会执行 next() 函数。这是为了保证在当前任务完成之前,不会从队列中取出新的任务来执行。这样可以确保在任何给定时间内,执行的任务数量不会超过设定的并发限制 concurrency。在 enqueue 函数中,await Promise.resolve(); 的作用是将新的任务的启动推迟到下一个事件循环迭代。这样做允许当前正在执行的代码(例如,正在添加任务到队列中的代码)完成执行,然后才开始执行队列中的任务。这种技术称为“微任务排队”,它确保了任务的启动不会立即发生,从而允许同步代码先执行完毕,例如,允许当前的 enqueue 调用完成并将任务添加到队列中。使用 Promise.resolve() 创建一个微任务,是确保 activeCount < concurrency 的检查和可能的任务启动发生在当前执行栈清空之后的一种方式。