likes
comments
collection
share

React中的Task的任务调度之scheduler

作者站长头像
站长
· 阅读数 52

前言

Event Loop实质上基于时间切片的任务(Task)调度,以实现单线程能够在不同的时间点执行不同的任务,如果一个任务Task是个Long Task,就会导致过长占用CPU(starved cpu),而不能执行其他任务,比如渲染绘制页面的任务(用户感知到的就是页面卡死,没有响应);

React Scheduler是在Task中执行的是更小颗粒度的任务调度机制。为了防止React Scheduler所在的宿主Task长时间占用CPU而饿死其他的任务,scheduler针对不同的renderer限制了其可以占用CPU的最长时间(5ms或10ms),一但超过这个时间阈值就必须将控制权让出给主线程(mian thead),让主线程有机会去执行其他任务,比如去更新页面。

Scheduer存在的意义

将Long Task拆解为多个短Task,以提供过给用户流畅的交互体验。

React中的Task的任务调度之scheduler (图片来源Optimize long tasks

Scheduler的调度机制

Scheduler 是可以独立于React 单独使用的,其与外界最主要的联系就是通过unstable_scheduleCallback函数进行的,通过此函数以不同的优先级调度回调函数:

注意: 代码注释中的Task指的是Eventloop中的Task,而task指的是React Scheduler调度的任务。在一个Task中可执行一个或多个task。

const callback = () => {
  console.log('A');
}
scheduleCallback(NormalPriority, callback);

上面的一段代码就是以优先级NormalPriority调度回调函数callback,一个任务的执行优先级越高其被安排的执行时间节点越靠前,所有这一切都依赖于时间是线性递增的。

function unstable_scheduleCallback(
  priorityLevel: PriorityLevel,
  callback: Callback,
  options?: {delay: number},
): Task {
  // 当前时间
  var currentTime = getCurrentTime();

  // 任务开始执行的时间,可以通过设置option.delay推迟任务开始执行时间
  var startTime;
  if (typeof options === 'object' && options !== null) {
    var delay = options.delay;
    if (typeof delay === 'number' && delay > 0) {
      startTime = currentTime + delay;
    } else {
      startTime = currentTime;
    }
  } else {
    startTime = currentTime;
  }
   
  // 根据任务的优先级设置任务超时时间
  var timeout;
  switch (priorityLevel) {
    case ImmediatePriority:
      // Times out immediately
      timeout = -1;
      break;
    case UserBlockingPriority:
      // Eventually times out,
      timeout = userBlockingPriorityTimeout;
      break;
    case IdlePriority:
      // Never times out
      timeout = maxSigned31BitInt;
      break;
    case LowPriority:
      // Eventually times out
      timeout = lowPriorityTimeout;
      break;
    case NormalPriority:
    default:
      // Eventually times out
      timeout = normalPriorityTimeout;
      break;
  }

  // 任务的过期时间,超过了这个时间节点代表着这个任务已经被严重推迟,这时候即使Scheduler所寄生的Task
  // 运行时间已经超过了最大阈值(5ms),依然不把控制权让渡给主线程(main thread)而选择继续执行该任务
  // 在workLoop函数中还将进一步解释,对应196行
  var expirationTime = startTime + timeout;

  // 此Task非EventLoop中的Task,可以将其视为EventLoop的Task的subtask
  var newTask: Task = {
    id: taskIdCounter++,
    callback, // 需要被调度的目标任务,在react中主要指的是函数performConcurrentWorkOnRoot
    priorityLevel, // 目标任务的优先级
    startTime, // 目标任务从被放入taskQueue的时间
    expirationTime, // 目标任务过期时间
    sortIndex: -1, // 排序字段,taskQueue中指expirationTime,timerQueue中指startTime
  };

  if (startTime > currentTime) {
    // option.delay > 0,推迟执行的任务,React中没有通过此分支逻辑调度任务
    newTask.sortIndex = startTime;
    // timerQueue一个数组,根据sortIndex(startTime)值进行排序,小的排在前面被优先处理
    // 此处排序使用的小根堆算法,感兴趣可以去看一下
    push(timerQueue, newTask);
    if (peek(taskQueue) === null && newTask === peek(timerQueue)) {
      // 所有的任务都是被推迟的,且新进的任务需要第一个被执行
      if (isHostTimeoutScheduled) {
        // 新进的Task需要被优先处理,取消现有的timer
        cancelHostTimeout();
      } else {
        isHostTimeoutScheduled = true;
      }
      // 为新进的Task重新设定一个timeout
      requestHostTimeout(handleTimeout, startTime - currentTime);
    }
  } else {
    // taskQueue一个数组,根据sortIndex(expirationTime)值进行排序,小的排在前面被优先处理
    // 在React中都是通过此分支调度Task
    newTask.sortIndex = expirationTime;
    push(taskQueue, newTask);
    // 任意时刻只需要有一个Task用于执行performWorkUntilDeadline,
    // isHostCallbackScheduled用于同步多次调用scheduleCallback只需要通过一个Task来处理
    // isPerformingWork 用来阻止taskQueue中的task在执行的过程中调度Task
    if (!isHostCallbackScheduled && !isPerformingWork) {
      isHostCallbackScheduled = true;
      requestHostCallback();
    }
  }

  return newTask;
}

function requestHostCallback() {
  // isMessageLoopRunning为true时,代表已经存在了一个Task,无需再额外调度Task,反之则需要调度一个Task
  if (!isMessageLoopRunning) {
    isMessageLoopRunning = true;
    schedulePerformWorkUntilDeadline();
  }
}

// 此函数始终是通过异步调度执行的,在主线程(mian thread)对应的就是一个Task
// 此Task占用主线程的时间上限设置为5ms,也就是time slice时间片为5ms,
// 从而确保任务Task不会变成Long Task,从而确保能够及时响应用户的操作
const performWorkUntilDeadline = () => {
  if (isMessageLoopRunning) {
    const currentTime = getCurrentTime();
    // 此任务开始的时间,用于计算阻塞多久,一旦超过5ms,就要将控制权交给主线程,让其有机会
    // 执行更高优先级的任务比如渲染刷新页面,执行click回调函数等
    startTime = currentTime;

    // 默认为true,以确保在处理taskQueue中的task而抛出异常时能够通过调用新的Task被继续处理
    let hasMoreWork = true;
    try {
      // 真正的去处理通过scheduler调度的任务
      hasMoreWork = flushWork(currentTime);
    } finally {
      if (hasMoreWork) {
         // 此处为调用Task的第二处
        // 进入此分支有以下几种情况
        // 1.还有未被处理的task且这些任务能够接受的的最晚执行时间还没来到,但由于占用主线程时间超过了5ms,此时将控制权让渡给主线程
        // 2. 在执行task时,返回了一个函数,复用当前task,并将task的callback更新微返回的函数,用于提前将控制权交给主线程,即使Task执行时长小于5ms
        // 3. 在执行task的callback时抛出了异常
        schedulePerformWorkUntilDeadline();
      } else {
        isMessageLoopRunning = false;
      }
    }
  }
};

function flushWork(initialTime: number) {

  // We'll need a host callback the next time work is scheduled.
  isHostCallbackScheduled = false;
  if (isHostTimeoutScheduled) {
    // We scheduled a timeout but it's no longer needed. Cancel it.
    isHostTimeoutScheduled = false;
    cancelHostTimeout();
  }
  // isPerformingWork阻止Task执行过程中调度Task
  isPerformingWork = true;
  const previousPriorityLevel = currentPriorityLevel;
  try {
    return workLoop(initialTime);
  } finally {
    currentTask = null;
    currentPriorityLevel = previousPriorityLevel;
    isPerformingWork = false;
  }
}

function shouldYieldToHost() {
  const timeElapsed = getCurrentTime() - startTime;
  if (timeElapsed < frameInterval) {
    // The main thread has only been blocked for a really short amount of time;
    // smaller than a single frame. Don't yield yet.
    return false;
  }
  // Yield now.
  return true;
}

function workLoop(initialTime: number) {
  let currentTime = initialTime;
  // 将timerQueue中超时的task放入taskQueue中
  advanceTimers(currentTime);
  currentTask = peek(taskQueue);
  while (
    currentTask !== null &&
    !(enableSchedulerDebugging && isSchedulerPaused)
  ) {
    if (currentTask.expirationTime > currentTime && shouldYieldToHost()) {
      // 当前任务失效时间还没有来到,且Task占用主线程的时间超过了5ms,需要将控制权交给主线程
      break;
    }
    // 执行到此处分一下两种情况
    // 1.Task占用主线程的时间没有超过5ms,可以继续处理taskQueue中的task
    // 2.Task占用主线程时间即使超过了5ms,但当前任务已经失效,必须马上执行
    const callback = currentTask.callback;
    if (typeof callback === 'function') {
      // 通过设置为null, 在下一轮循环中将其从taskQueue中移除
      currentTask.callback = null;
      currentPriorityLevel = currentTask.priorityLevel;
      // 任务需要被执行的最终期限已超过
      const didUserCallbackTimeout = currentTask.expirationTime <= currentTime;
      // 此处的callback指的是React中的performConcurrentWorkOnRoot,如果任务被严重推迟执行,
      // 则不再进行并发(renderRootConcurrent)的渲染而采用同步渲染(renderRootSync),一次性
      // 完胜余下的所有任务,而不再拆分成多个Task异步的完成
      const continuationCallback = callback(didUserCallbackTimeout);
      currentTime = getCurrentTime();
      if (typeof continuationCallback === 'function') {
        // 当返回的是一个函数时,结束当前Task执行,把控制权让给主线程,提前结束
        // 通过此约定实现占用主线程时间没有超过5ms时,也可提前退出,从而让主线程去执行更紧急地任务
        // 此处更新了callback复用了当前任务其他属性
        currentTask.callback = continuationCallback;
        // 处理timerQueue中超时的任务,将其更新到taskQueue中
        advanceTimers(currentTime);
        return true;
      } else {
        if (currentTask === peek(taskQueue)) {
          // 任务已经被处理过,从taskQueue中移除
          pop(taskQueue);
        }
        // 处理timerQueue中超时的任务,将其更新到taskQueue中
        advanceTimers(currentTime);
      }
    } else {
      pop(taskQueue);
    }
    // 进入下一个task
    currentTask = peek(taskQueue);
  }
  // Task占用主线程的时间已经超过了5ms,需要让出控制权
  if (currentTask !== null) {
    // 还有未处理的task, 需要异步分配一个Task继续处理
    return true;
  } else {
    // taskQueue中的任务已处理完,处理timerQueue中的timer
    // 在React没有通过timerQueue调度任务
    const firstTimer = peek(timerQueue);
    if (firstTimer !== null) {
      requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime);
    }
    return false;
  }
}

使用案例

说明:通过scheduleCallback往taskQueue中push任务时,会log('Post Message'); runtime.fireMessageEventlog('Message Event')并触发performWorkUntilDeadline处理taskQueue中的task;

此处所有案例均来自React项目中的Scheduler.test.js.

  1. 在时间切片内结束的任务
it('task that finishes before deadline', () => {
    scheduleCallback(NormalPriority, () => {
      runtime.log('Task');
    });
    runtime.assertLog(['Post Message']);
    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event', 'Task']);
});
  1. task返回一个函数,让出控制权
it('task with continuation', () => {
    // scheduleCallback的callback在一个Task中执行,而callback返回的函数则是在另一Task中执行
    scheduleCallback(NormalPriority, () => {
      runtime.log('Task');
      // 持续执行,知道运行时间超过了时间切片(10ms)
      while (!Scheduler.unstable_shouldYield()) {
        runtime.advanceTime(1);
      }
      runtime.log(`Yield at ${performance.now()}ms`);
      
      // 返回了一个函数,直接让出主线程的控制权,同时调度一个新的Task用于处理Continuation
      return () => {
        runtime.log('Continuation');
      };
    });
    // 断言调度了一个Task
    runtime.assertLog(['Post Message']);
    
    // 触发Task(performWorkUntilDeadline)执行
    runtime.fireMessageEvent();
    runtime.assertLog([
      'Message Event',
      'Task',
      'Yield at 10ms',
      'Post Message',// 调度一个新的Task处理Continuation
    ]);
    // 触发用于执行Continuation的Task
    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event', 'Continuation']);
  });
  1. 同步push多个task,Task与task是一对多的关系,而不是一对一的关系,在任意时刻有且仅有一个Task用于处理taskQueue中的task。push是同步执行的,而Task被延后执行的,从而实现了一对多的批量操作。
it('multiple tasks', () => {
    // 异步的调度一个Task,用于处理taskQueue中的task
    scheduleCallback(NormalPriority, () => {
      runtime.log('A');
    });
    // 已经调度了一个Task,isHostCallbackScheduled防止多个Task被调度
    scheduleCallback(NormalPriority, () => {
      runtime.log('B');
    });
    // Post Message只出现了一次,说用当前只存在一个Task
    runtime.assertLog(['Post Message']);
    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event', 'A', 'B']);
});
  1. 同步push多个任务,但第一个task执行时间超过了时间切片(10ms),需要将控制权让出给主线程,而剩余的任务则需要被安排到新的Task中去执行,以实现将Long Task拆解成短task目的。
it('multiple tasks with a yield in between', () => {
    // A 执行时间为4999ms,超过了所设置的时间切片10ms,结束当前Task,重新调度一个新的Task处理B
    scheduleCallback(NormalPriority, () => {
      runtime.log('A');
      runtime.advanceTime(4999);
    });
    scheduleCallback(NormalPriority, () => {
      runtime.log('B');
    });
    runtime.assertLog(['Post Message']);
    runtime.fireMessageEvent();
    runtime.assertLog([
      'Message Event',
      'A',
      // A运行时间过长,让出控制权,同时调度一个新的Task处理 B, shouleYieldToHots()返回true
      'Post Message',
    ]);
    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event', 'B']);
});
  1. 取消taskQueue中的task
it('cancels tasks', () => {
    const task = scheduleCallback(NormalPriority, () => {
      runtime.log('Task');
    });
    runtime.assertLog(['Post Message']);
    // 将task中的callback重置为null
    cancelCallback(task);
    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event']);
 });
  1. 当taskQueue中的task的callback抛出异常时,其后面的任务依然能够被执行
it('throws when a task errors then continues in a new event', () => {
    // 抛出异常
    scheduleCallback(NormalPriority, () => {
      runtime.log('Oops!');
      throw Error('Oops!');
    });
    // 我前面任务虽然抛出异常,但不影响我的执行,将另外调度一个Task继续处理后面的任务
    scheduleCallback(NormalPriority, () => {
      runtime.log('Yay');
    });
    runtime.assertLog(['Post Message']);

    expect(() => runtime.fireMessageEvent()).toThrow('Oops!');
    // 跑出异常后,调度新的Task处理后面任务,这就是hasMore初始值设为true目的所在
    runtime.assertLog(['Message Event', 'Oops!', 'Post Message']);

    runtime.fireMessageEvent();
    // 异常任务后面的任务成功执行
    runtime.assertLog(['Message Event', 'Yay']);
});
  1. 在taskQueue为空后重新调度一个Task
it('schedule new task after queue has emptied', () => {
    scheduleCallback(NormalPriority, () => {
      runtime.log('A');
    });

    runtime.assertLog(['Post Message']);
    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event', 'A']);
    
    // isHostCallbackScheduled,isPerformingWork和isMessageLoopRunning值均为为false;
    scheduleCallback(NormalPriority, () => {
      runtime.log('B');
    });
    runtime.assertLog(['Post Message']);
    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event', 'B']);
});
  1. 在取消taskQueue中一个任务后,重新调度一个task
it('schedule new task after a cancellation', () => {
    const handle = scheduleCallback(NormalPriority, () => {
      runtime.log('A');
    });

    runtime.assertLog(['Post Message']);
    cancelCallback(handle);

    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event']);

    scheduleCallback(NormalPriority, () => {
      runtime.log('B');
    });
    runtime.assertLog(['Post Message']);
    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event', 'B']);
});
  1. 当task返回一个函数时立即让出控制权,比如task1阻塞主线程时间2ms,时间切片为10ms,也就是说这时候还有8ms可用于执行taskQueue中的其他任务,但由于task的callback返回了一个函数,所以需要立即结束当前任务,让出控制权,同时调度一个新的Task用于处理后续的任务
it('yielding continues in a new task regardless of how much time is remaining', () => {
    scheduleCallback(NormalPriority, () => {
      runtime.log('Original Task');
      runtime.log('shouldYield: ' + shouldYield());
      runtime.log('Return a continuation');
      return () => {
        runtime.log('Continuation Task');
      };
    });
    runtime.assertLog(['Post Message']);

    runtime.fireMessageEvent();
    runtime.assertLog([
      'Message Event',
      'Original Task',
      // 该任务耗时还没超过时间切片10ms
      'shouldYield: false',
      'Return a continuation',

      // 调度一个新的Task(macrotask)用于处理返回函数(Continuation Task),而不管当前时间切片还剩下多少时间
      'Post Message',
    ]);

    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event', 'Continuation Task']);
});

以上测试用例均为官方提供的,以下则为非官方的自己提供测试用例,分别用于解释isPerformingWork和isMessageLoopRunning,其实这里是有优化空间的,scheduler为了实现任意时刻至多一个Task用于处理调度任务,其用到了三个变量分别是isHostCallbackScheduled,isPerformingWork和isMessageLoopRunning,其实是只需要一个变量便可实现的,试想一下如何通过一个变量是如何实现?

  1. 当task的callback调用scheduleCallback,直接将task push到taskQueue中,由于isPerformingWork值为true,也就是当前已经存在了一个Task正在处理Task无需再调度一个Task
it('prevent schedule new Task when taskQueue is being processed', () => {
    scheduleCallback(NormalPriority, () => {
      runtime.log('Task A');
      scheduleCallback(NormalPriority, () => {
          runtime.log('Task B');
      });
    });
    runtime.assertLog(['Post Message']);
    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event', 'Task A', 'Task B']);
  });
  1. 当task的callback返回了一个函数(continue), 此时调度新的task将阻止其调度新的Task,因为continue已经调度了一个Task等待执行
it('prevent schedule new Task when a Task has been scheduled for Continuation', () => {
    scheduleCallback(NormalPriority, () => {
      runtime.log('Task A');
      return () => {
          runtime.log('Continuation Task');
      }
    });
    runtime.assertLog(['Post Message']);
    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event', 'Task A', 'Post Message']);
    // 此时isMessageLoopRunning为true,isHostCallbackScheduled,isPerformingWork值false;
    scheduleCallback(NormalPriority, () => {
       runtime.log('Task B');
    });
    runtime.fireMessageEvent();
    runtime.assertLog(['Message Event', 'Continuation Task', 'Task B']);
});

总结

React Scheduler设置其占用主线程的时间阈值为10ms,在这个时间片内处理taskQueue中的task,超出这个时间而没有被处理的task则调度另一个Task继续处理,这里的task主要指是React中performConcurrentWorkOnRoot函数,该函数执行render+commit,其中render阶段可并发执行(即render中断恢复机制,执行时间超过10ms,中断,在下一个Task中恢复继续,从而实现将一个Long Task变成多个短Task),commit阶段始终是同步执行的。

在React中通过scheduleCallback调度任务;通过cancelCallback取消已调度的任务;在renderRootConcurrent中通过shouldYield判定render执行时间是否超过阈值(10ms),若超出了则需要将控制权让出;另一个耦合处的则是Scheduler通过向函数(performConcurrentWorkOnRoot)传递参数(didTimeout),决定render是并发执行还是同步执行。两者关系就是Scheduler将React的整个渲染更新机制作为一个task在调度。

参考资料

转载自:https://juejin.cn/post/7364607002825228328
评论
请登录