likes
comments
collection
share

"排队执行:如何优雅地管理JavaScript异步任务"

作者站长头像
站长
· 阅读数 41

背景

在公司做IM系统,其中有个场景是重新登录时会收到一堆离线的消息(包括普通消息,群聊消息,命令消息),消息是一条一条处理的,而且处理单条消息是一个异步的过程,直接执行的话可能处理某条消息是要依赖其他消息的处理操作,例如先说到某条消息再收到该消息撤回命令,可能收到消息过后本地缓存的动作还没执行完,就执行了撤回的操作,导致该消息等于是先撤回再本地缓存,就有问题啦。 此时就需要一个队列,保证先进先出的原则,一条一条执行消息。

解析

队列是一种数据结构,它遵循先进先出(FIFO)原则 - 添加到队列中的第一个元素是最先被处理或从队列中删除的元素。在软件开发中,队列通常用于管理异步任务,例如网络请求或数据库操作。通过使用队列,我们可以确保这些任务按照它们接收到的顺序依次执行。

但是,在某些情况下,我们可能希望限制同一时间内并发执行任务的数量。这就是有限并发队列非常有用的地方。它允许我们控制任务执行的速率,并通过限制可以并发执行的任务数来防止资源耗尽。

下面我们逐个解析代码,以了解它是如何工作的。

class Queue {
  private _count: number = 0
  private _taskList: Array<Function> = [];
  private _max: number = 1;
  constructor(max: number = 1) {
    this._max = max;
    this._count = 0;
    this._taskList = [];
  }
}

这段代码定义了一个名为 Queue 的类,具有三个私有属性:_count_taskList_max_count 属性用于跟踪当前正在执行的任务数。_taskList 属性存储所有等待执行的排队任务。_max 属性指定可以同时执行的最大并发任务数。

构造函数使用默认值初始化这些属性。它还接受一个可选的 max 参数,允许在创建新队列实例时设置最大并发任务数。

trigger(fn: Function, params:any) {
    return new Promise((res, req) => {
      const task = this._execute(fn, params, res, req);
      if (this._count < this._max) {
        task();
      } else {
        this._taskList.push(task);
      }
    })
  }

trigger 方法用于向队列中添加新任务。它接受两个参数,一个表示要执行的任务的函数 fn 和任何应传递给该函数的参数。

方法返回一个 Promise,在任务完成时解析或在出现错误时拒绝。

trigger 方法内部,我们首先使用 _execute 方法创建一个新的 task,该方法返回一个函数,该函数执行实际任务。

然后,我们检查当前正在运行的任务数(_count)是否小于允许的最大并发任务数(_max)。如果是,则立即通过调用 task() 执行任务。否则,我们将任务推入 _taskList 数组中,表示该任务正在等待稍后执行。

_execute(fn:Function, params:any, res: any, req:any) {
    return () => {
      this._count++;
      let executeFlag = true;
      let timer = setTimeout(() => {
        console.error('queue-warning: 任务执行超时');
        clearTimeout(timer);
        this._count>0 && this._count--;
        executeFlag = false;
        this._next();
      },1000 * 10)
      fn(params).then(res).catch(req).finally(() => {
        this._count>0 && this._count--;
        clearTimeout(timer);
        if (executeFlag) {
          this._next();
        }
      })
    }
  }

_execute 方法负责创建实际添加到队列中的任务函数。它接受四个参数 - 原始的fn任务函数,任何传递给该函数的params参数以及两个回调函数resreq

这个方法返回一个新的函数,该函数执行原始任务函数以提供的参数。首先,它将_count属性增加1以表示已经开始执行新任务。然后,它记录消息以指示当前正在执行哪个任务。

接下来,它创建一个计时器,用于检测任务是否超时。如果任务执行时间超过10秒,则会触发超时错误并结束任务。然后,它减少_count属性,表示已完成一个任务,并将任务标记为无法执行,以便在下一次 _next() 被调用时处理队列中的下一个任务。

最后,它返回一个 Promise,当任务完成时解析或在出现错误时拒绝。在这个 Promise 的 finally 块中,我们再次减少_count属性,表示任务已完成。同时,我们清除计时器并检查 executeFlag 标志。如果它已被设置为 false,则说明任务已超时,不需要再次调用 _next() 来处理队列中的下一个任务。否则,我们调用 _next() 方法来处理队列中的下一个任务。

_next() {
    if (this._taskList.length > 0) {
      const task = this._taskList.shift();
      task && task();
    }
  }

最后,我们有 _next 方法,该方法处理队列中的下一个任务。如果队列_taskList中有等待执行的任务,则从数组中获取第一个任务并执行它。

通过使用这个队列,您可以控制任务执行的速率,并防止资源耗尽或应用程序崩溃。希望对你有帮助!

    注意:队列中的异步任务fn必须保证有 resolve 或 reject 返回值,否则队列会处于 pendding 中,会等待10秒超时继续执行下一个任务
    

代码

class Queue {
  private _count: number = 0
  private _taskList: Array<Function> = [];
  private _max: number = 1;
  constructor(max: number = 1) {
    this._max = max;
    this._count = 0;
    this._taskList = [];
  }
  trigger(fn: Function, params:any) {
    return new Promise((res, req) => {
      const task = this._execute(fn, params, res, req);
      if (this._count < this._max) {
        task();
      } else {
        console.log('=-------------->存在队列', this._taskList.length)
      }
    })
  }
  _execute(fn:Function, params:any, res: any, req:any) {
    return () => {
      this._count++;
      let executeFlag = true;
      let timer = setTimeout(() => {
        console.error('queue-warning: 任务执行超时');
        clearTimeout(timer);
        this._count>0 && this._count--;
        executeFlag = false;
        this._next();
      },1000 * 10)
      fn(params).then(res).catch(req).finally(() => {
        this._count>0 && this._count--;
        clearTimeout(timer);
        if (executeFlag) {
          this._next();
        }
      })
    }
  }
  _next() {
    if (this._taskList.length > 0) {
      const task = this._taskList.shift();
      task && task();
    }
  }
}

const myQueue = new Queue()
myQueue.trigger(fn,params)
转载自:https://juejin.cn/post/7237425413873860665
评论
请登录