React中的Task的任务调度之scheduler
前言
Event Loop实质上基于时间切片的任务(Task)调度,以实现单线程能够在不同的时间点执行不同的任务,如果一个任务Task是个Long Task,就会导致过长占用CPU(starved cpu),而不能执行其他任务,比如渲染绘制页面的任务(用户感知到的就是页面卡死,没有响应);
React Scheduler是在Task中执行的是更小颗粒度的任务调度机制。为了防止React Scheduler所在的宿主Task长时间占用CPU而饿死其他的任务,scheduler针对不同的renderer限制了其可以占用CPU的最长时间(5ms或10ms),一但超过这个时间阈值就必须将控制权让出给主线程(mian thead),让主线程有机会去执行其他任务,比如去更新页面。
Scheduer存在的意义
将Long Task拆解为多个短Task,以提供过给用户流畅的交互体验。
(图片来源Optimize long tasks)
Scheduler的调度机制
Scheduler 是可以独立于React 单独使用的,其与外界最主要的联系就是通过unstable_scheduleCallback函数进行的,通过此函数以不同的优先级调度回调函数:
注意: 代码注释中的Task指的是Eventloop中的Task,而task指的是React Scheduler调度的任务。在一个Task中可执行一个或多个task。
const callback = () => {
console.log('A');
}
scheduleCallback(NormalPriority, callback);
上面的一段代码就是以优先级NormalPriority
调度回调函数callback
,一个任务的执行优先级越高其被安排的执行时间节点越靠前,所有这一切都依赖于时间是线性递增的。
function unstable_scheduleCallback(
priorityLevel: PriorityLevel,
callback: Callback,
options?: {delay: number},
): Task {
// 当前时间
var currentTime = getCurrentTime();
// 任务开始执行的时间,可以通过设置option.delay推迟任务开始执行时间
var startTime;
if (typeof options === 'object' && options !== null) {
var delay = options.delay;
if (typeof delay === 'number' && delay > 0) {
startTime = currentTime + delay;
} else {
startTime = currentTime;
}
} else {
startTime = currentTime;
}
// 根据任务的优先级设置任务超时时间
var timeout;
switch (priorityLevel) {
case ImmediatePriority:
// Times out immediately
timeout = -1;
break;
case UserBlockingPriority:
// Eventually times out,
timeout = userBlockingPriorityTimeout;
break;
case IdlePriority:
// Never times out
timeout = maxSigned31BitInt;
break;
case LowPriority:
// Eventually times out
timeout = lowPriorityTimeout;
break;
case NormalPriority:
default:
// Eventually times out
timeout = normalPriorityTimeout;
break;
}
// 任务的过期时间,超过了这个时间节点代表着这个任务已经被严重推迟,这时候即使Scheduler所寄生的Task
// 运行时间已经超过了最大阈值(5ms),依然不把控制权让渡给主线程(main thread)而选择继续执行该任务
// 在workLoop函数中还将进一步解释,对应196行
var expirationTime = startTime + timeout;
// 此Task非EventLoop中的Task,可以将其视为EventLoop的Task的subtask
var newTask: Task = {
id: taskIdCounter++,
callback, // 需要被调度的目标任务,在react中主要指的是函数performConcurrentWorkOnRoot
priorityLevel, // 目标任务的优先级
startTime, // 目标任务从被放入taskQueue的时间
expirationTime, // 目标任务过期时间
sortIndex: -1, // 排序字段,taskQueue中指expirationTime,timerQueue中指startTime
};
if (startTime > currentTime) {
// option.delay > 0,推迟执行的任务,React中没有通过此分支逻辑调度任务
newTask.sortIndex = startTime;
// timerQueue一个数组,根据sortIndex(startTime)值进行排序,小的排在前面被优先处理
// 此处排序使用的小根堆算法,感兴趣可以去看一下
push(timerQueue, newTask);
if (peek(taskQueue) === null && newTask === peek(timerQueue)) {
// 所有的任务都是被推迟的,且新进的任务需要第一个被执行
if (isHostTimeoutScheduled) {
// 新进的Task需要被优先处理,取消现有的timer
cancelHostTimeout();
} else {
isHostTimeoutScheduled = true;
}
// 为新进的Task重新设定一个timeout
requestHostTimeout(handleTimeout, startTime - currentTime);
}
} else {
// taskQueue一个数组,根据sortIndex(expirationTime)值进行排序,小的排在前面被优先处理
// 在React中都是通过此分支调度Task
newTask.sortIndex = expirationTime;
push(taskQueue, newTask);
// 任意时刻只需要有一个Task用于执行performWorkUntilDeadline,
// isHostCallbackScheduled用于同步多次调用scheduleCallback只需要通过一个Task来处理
// isPerformingWork 用来阻止taskQueue中的task在执行的过程中调度Task
if (!isHostCallbackScheduled && !isPerformingWork) {
isHostCallbackScheduled = true;
requestHostCallback();
}
}
return newTask;
}
function requestHostCallback() {
// isMessageLoopRunning为true时,代表已经存在了一个Task,无需再额外调度Task,反之则需要调度一个Task
if (!isMessageLoopRunning) {
isMessageLoopRunning = true;
schedulePerformWorkUntilDeadline();
}
}
// 此函数始终是通过异步调度执行的,在主线程(mian thread)对应的就是一个Task
// 此Task占用主线程的时间上限设置为5ms,也就是time slice时间片为5ms,
// 从而确保任务Task不会变成Long Task,从而确保能够及时响应用户的操作
const performWorkUntilDeadline = () => {
if (isMessageLoopRunning) {
const currentTime = getCurrentTime();
// 此任务开始的时间,用于计算阻塞多久,一旦超过5ms,就要将控制权交给主线程,让其有机会
// 执行更高优先级的任务比如渲染刷新页面,执行click回调函数等
startTime = currentTime;
// 默认为true,以确保在处理taskQueue中的task而抛出异常时能够通过调用新的Task被继续处理
let hasMoreWork = true;
try {
// 真正的去处理通过scheduler调度的任务
hasMoreWork = flushWork(currentTime);
} finally {
if (hasMoreWork) {
// 此处为调用Task的第二处
// 进入此分支有以下几种情况
// 1.还有未被处理的task且这些任务能够接受的的最晚执行时间还没来到,但由于占用主线程时间超过了5ms,此时将控制权让渡给主线程
// 2. 在执行task时,返回了一个函数,复用当前task,并将task的callback更新微返回的函数,用于提前将控制权交给主线程,即使Task执行时长小于5ms
// 3. 在执行task的callback时抛出了异常
schedulePerformWorkUntilDeadline();
} else {
isMessageLoopRunning = false;
}
}
}
};
function flushWork(initialTime: number) {
// We'll need a host callback the next time work is scheduled.
isHostCallbackScheduled = false;
if (isHostTimeoutScheduled) {
// We scheduled a timeout but it's no longer needed. Cancel it.
isHostTimeoutScheduled = false;
cancelHostTimeout();
}
// isPerformingWork阻止Task执行过程中调度Task
isPerformingWork = true;
const previousPriorityLevel = currentPriorityLevel;
try {
return workLoop(initialTime);
} finally {
currentTask = null;
currentPriorityLevel = previousPriorityLevel;
isPerformingWork = false;
}
}
function shouldYieldToHost() {
const timeElapsed = getCurrentTime() - startTime;
if (timeElapsed < frameInterval) {
// The main thread has only been blocked for a really short amount of time;
// smaller than a single frame. Don't yield yet.
return false;
}
// Yield now.
return true;
}
function workLoop(initialTime: number) {
let currentTime = initialTime;
// 将timerQueue中超时的task放入taskQueue中
advanceTimers(currentTime);
currentTask = peek(taskQueue);
while (
currentTask !== null &&
!(enableSchedulerDebugging && isSchedulerPaused)
) {
if (currentTask.expirationTime > currentTime && shouldYieldToHost()) {
// 当前任务失效时间还没有来到,且Task占用主线程的时间超过了5ms,需要将控制权交给主线程
break;
}
// 执行到此处分一下两种情况
// 1.Task占用主线程的时间没有超过5ms,可以继续处理taskQueue中的task
// 2.Task占用主线程时间即使超过了5ms,但当前任务已经失效,必须马上执行
const callback = currentTask.callback;
if (typeof callback === 'function') {
// 通过设置为null, 在下一轮循环中将其从taskQueue中移除
currentTask.callback = null;
currentPriorityLevel = currentTask.priorityLevel;
// 任务需要被执行的最终期限已超过
const didUserCallbackTimeout = currentTask.expirationTime <= currentTime;
// 此处的callback指的是React中的performConcurrentWorkOnRoot,如果任务被严重推迟执行,
// 则不再进行并发(renderRootConcurrent)的渲染而采用同步渲染(renderRootSync),一次性
// 完胜余下的所有任务,而不再拆分成多个Task异步的完成
const continuationCallback = callback(didUserCallbackTimeout);
currentTime = getCurrentTime();
if (typeof continuationCallback === 'function') {
// 当返回的是一个函数时,结束当前Task执行,把控制权让给主线程,提前结束
// 通过此约定实现占用主线程时间没有超过5ms时,也可提前退出,从而让主线程去执行更紧急地任务
// 此处更新了callback复用了当前任务其他属性
currentTask.callback = continuationCallback;
// 处理timerQueue中超时的任务,将其更新到taskQueue中
advanceTimers(currentTime);
return true;
} else {
if (currentTask === peek(taskQueue)) {
// 任务已经被处理过,从taskQueue中移除
pop(taskQueue);
}
// 处理timerQueue中超时的任务,将其更新到taskQueue中
advanceTimers(currentTime);
}
} else {
pop(taskQueue);
}
// 进入下一个task
currentTask = peek(taskQueue);
}
// Task占用主线程的时间已经超过了5ms,需要让出控制权
if (currentTask !== null) {
// 还有未处理的task, 需要异步分配一个Task继续处理
return true;
} else {
// taskQueue中的任务已处理完,处理timerQueue中的timer
// 在React没有通过timerQueue调度任务
const firstTimer = peek(timerQueue);
if (firstTimer !== null) {
requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime);
}
return false;
}
}
使用案例
说明:通过scheduleCallback
往taskQueue中push任务时,会log('Post Message')
;
runtime.fireMessageEvent
会log('Message Event')
并触发performWorkUntilDeadline
处理taskQueue中的task;
此处所有案例均来自React项目中的Scheduler.test.js.
- 在时间切片内结束的任务
it('task that finishes before deadline', () => {
scheduleCallback(NormalPriority, () => {
runtime.log('Task');
});
runtime.assertLog(['Post Message']);
runtime.fireMessageEvent();
runtime.assertLog(['Message Event', 'Task']);
});
- task返回一个函数,让出控制权
it('task with continuation', () => {
// scheduleCallback的callback在一个Task中执行,而callback返回的函数则是在另一Task中执行
scheduleCallback(NormalPriority, () => {
runtime.log('Task');
// 持续执行,知道运行时间超过了时间切片(10ms)
while (!Scheduler.unstable_shouldYield()) {
runtime.advanceTime(1);
}
runtime.log(`Yield at ${performance.now()}ms`);
// 返回了一个函数,直接让出主线程的控制权,同时调度一个新的Task用于处理Continuation
return () => {
runtime.log('Continuation');
};
});
// 断言调度了一个Task
runtime.assertLog(['Post Message']);
// 触发Task(performWorkUntilDeadline)执行
runtime.fireMessageEvent();
runtime.assertLog([
'Message Event',
'Task',
'Yield at 10ms',
'Post Message',// 调度一个新的Task处理Continuation
]);
// 触发用于执行Continuation的Task
runtime.fireMessageEvent();
runtime.assertLog(['Message Event', 'Continuation']);
});
- 同步push多个task,Task与task是一对多的关系,而不是一对一的关系,在任意时刻有且仅有一个Task用于处理taskQueue中的task。push是同步执行的,而Task被延后执行的,从而实现了一对多的批量操作。
it('multiple tasks', () => {
// 异步的调度一个Task,用于处理taskQueue中的task
scheduleCallback(NormalPriority, () => {
runtime.log('A');
});
// 已经调度了一个Task,isHostCallbackScheduled防止多个Task被调度
scheduleCallback(NormalPriority, () => {
runtime.log('B');
});
// Post Message只出现了一次,说用当前只存在一个Task
runtime.assertLog(['Post Message']);
runtime.fireMessageEvent();
runtime.assertLog(['Message Event', 'A', 'B']);
});
- 同步push多个任务,但第一个task执行时间超过了时间切片(10ms),需要将控制权让出给主线程,而剩余的任务则需要被安排到新的Task中去执行,以实现将Long Task拆解成短task目的。
it('multiple tasks with a yield in between', () => {
// A 执行时间为4999ms,超过了所设置的时间切片10ms,结束当前Task,重新调度一个新的Task处理B
scheduleCallback(NormalPriority, () => {
runtime.log('A');
runtime.advanceTime(4999);
});
scheduleCallback(NormalPriority, () => {
runtime.log('B');
});
runtime.assertLog(['Post Message']);
runtime.fireMessageEvent();
runtime.assertLog([
'Message Event',
'A',
// A运行时间过长,让出控制权,同时调度一个新的Task处理 B, shouleYieldToHots()返回true
'Post Message',
]);
runtime.fireMessageEvent();
runtime.assertLog(['Message Event', 'B']);
});
- 取消taskQueue中的task
it('cancels tasks', () => {
const task = scheduleCallback(NormalPriority, () => {
runtime.log('Task');
});
runtime.assertLog(['Post Message']);
// 将task中的callback重置为null
cancelCallback(task);
runtime.fireMessageEvent();
runtime.assertLog(['Message Event']);
});
- 当taskQueue中的task的callback抛出异常时,其后面的任务依然能够被执行
it('throws when a task errors then continues in a new event', () => {
// 抛出异常
scheduleCallback(NormalPriority, () => {
runtime.log('Oops!');
throw Error('Oops!');
});
// 我前面任务虽然抛出异常,但不影响我的执行,将另外调度一个Task继续处理后面的任务
scheduleCallback(NormalPriority, () => {
runtime.log('Yay');
});
runtime.assertLog(['Post Message']);
expect(() => runtime.fireMessageEvent()).toThrow('Oops!');
// 跑出异常后,调度新的Task处理后面任务,这就是hasMore初始值设为true目的所在
runtime.assertLog(['Message Event', 'Oops!', 'Post Message']);
runtime.fireMessageEvent();
// 异常任务后面的任务成功执行
runtime.assertLog(['Message Event', 'Yay']);
});
- 在taskQueue为空后重新调度一个Task
it('schedule new task after queue has emptied', () => {
scheduleCallback(NormalPriority, () => {
runtime.log('A');
});
runtime.assertLog(['Post Message']);
runtime.fireMessageEvent();
runtime.assertLog(['Message Event', 'A']);
// isHostCallbackScheduled,isPerformingWork和isMessageLoopRunning值均为为false;
scheduleCallback(NormalPriority, () => {
runtime.log('B');
});
runtime.assertLog(['Post Message']);
runtime.fireMessageEvent();
runtime.assertLog(['Message Event', 'B']);
});
- 在取消taskQueue中一个任务后,重新调度一个task
it('schedule new task after a cancellation', () => {
const handle = scheduleCallback(NormalPriority, () => {
runtime.log('A');
});
runtime.assertLog(['Post Message']);
cancelCallback(handle);
runtime.fireMessageEvent();
runtime.assertLog(['Message Event']);
scheduleCallback(NormalPriority, () => {
runtime.log('B');
});
runtime.assertLog(['Post Message']);
runtime.fireMessageEvent();
runtime.assertLog(['Message Event', 'B']);
});
- 当task返回一个函数时立即让出控制权,比如task1阻塞主线程时间2ms,时间切片为10ms,也就是说这时候还有8ms可用于执行taskQueue中的其他任务,但由于task的callback返回了一个函数,所以需要立即结束当前任务,让出控制权,同时调度一个新的Task用于处理后续的任务
it('yielding continues in a new task regardless of how much time is remaining', () => {
scheduleCallback(NormalPriority, () => {
runtime.log('Original Task');
runtime.log('shouldYield: ' + shouldYield());
runtime.log('Return a continuation');
return () => {
runtime.log('Continuation Task');
};
});
runtime.assertLog(['Post Message']);
runtime.fireMessageEvent();
runtime.assertLog([
'Message Event',
'Original Task',
// 该任务耗时还没超过时间切片10ms
'shouldYield: false',
'Return a continuation',
// 调度一个新的Task(macrotask)用于处理返回函数(Continuation Task),而不管当前时间切片还剩下多少时间
'Post Message',
]);
runtime.fireMessageEvent();
runtime.assertLog(['Message Event', 'Continuation Task']);
});
以上测试用例均为官方提供的,以下则为非官方的自己提供测试用例,分别用于解释isPerformingWork和isMessageLoopRunning,其实这里是有优化空间的,scheduler为了实现任意时刻至多一个Task用于处理调度任务,其用到了三个变量分别是isHostCallbackScheduled,isPerformingWork和isMessageLoopRunning,其实是只需要一个变量便可实现的,试想一下如何通过一个变量是如何实现?
- 当task的callback调用scheduleCallback,直接将task push到taskQueue中,由于isPerformingWork值为true,也就是当前已经存在了一个Task正在处理Task无需再调度一个Task
it('prevent schedule new Task when taskQueue is being processed', () => {
scheduleCallback(NormalPriority, () => {
runtime.log('Task A');
scheduleCallback(NormalPriority, () => {
runtime.log('Task B');
});
});
runtime.assertLog(['Post Message']);
runtime.fireMessageEvent();
runtime.assertLog(['Message Event', 'Task A', 'Task B']);
});
- 当task的callback返回了一个函数(continue), 此时调度新的task将阻止其调度新的Task,因为continue已经调度了一个Task等待执行
it('prevent schedule new Task when a Task has been scheduled for Continuation', () => {
scheduleCallback(NormalPriority, () => {
runtime.log('Task A');
return () => {
runtime.log('Continuation Task');
}
});
runtime.assertLog(['Post Message']);
runtime.fireMessageEvent();
runtime.assertLog(['Message Event', 'Task A', 'Post Message']);
// 此时isMessageLoopRunning为true,isHostCallbackScheduled,isPerformingWork值false;
scheduleCallback(NormalPriority, () => {
runtime.log('Task B');
});
runtime.fireMessageEvent();
runtime.assertLog(['Message Event', 'Continuation Task', 'Task B']);
});
总结
React Scheduler设置其占用主线程的时间阈值为10ms,在这个时间片内处理taskQueue中的task,超出这个时间而没有被处理的task则调度另一个Task继续处理,这里的task主要指是React中performConcurrentWorkOnRoot函数,该函数执行render+commit,其中render阶段可并发执行(即render中断恢复机制,执行时间超过10ms,中断,在下一个Task中恢复继续,从而实现将一个Long Task变成多个短Task),commit阶段始终是同步执行的。
在React中通过scheduleCallback调度任务;通过cancelCallback取消已调度的任务;在renderRootConcurrent中通过shouldYield判定render执行时间是否超过阈值(10ms),若超出了则需要将控制权让出;另一个耦合处的则是Scheduler通过向函数(performConcurrentWorkOnRoot)传递参数(didTimeout),决定render是并发执行还是同步执行。两者关系就是Scheduler将React的整个渲染更新机制作为一个task在调度。
参考资料
转载自:https://juejin.cn/post/7364607002825228328