likes
comments
collection
share

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

作者站长头像
站长
· 阅读数 119

上一篇中,我们通过研究 Future 的源码,认识了 Future 类只是一层 监听-通知 机制的封装。并引出了 TimerscheduleMicrotask 两个可以异步触发回调函数的方式。本文我们将继续对这两者进行探索,认识一下其背后隐藏的 消息处理机制微任务循环


一、从 Timer 认识消息处理

我们已经知道, Future.delayedFuture 构造创建的延时 异步任务 ,本质上是 Timer 对象设置定时回调。对于回调来说,有两个非常重要的线索:

回调函数去哪里了 ? 回调函数如何触发的 ?


1.Timer 回调是如何触发

对于 Timer 来说,可以通过 Duration 指定时长,触发回调函数。要注意,这里的 Duration 并非严格的时长,也不是由于程序运行中的不稳定性出现的 误差 。如下案例中,在发起 200 ms 的延迟异步任务后,执行 loopAdd 的同步任务,进行 10 亿次 累加,耗时 444 ms

即使异步在设置的期望延迟执行时间为 200 ms ,但实际运行时异步回调 也必须在同步任务执行完毕才可以触发。如下 Timer 的回调在 449 ms 之后被触发。之前有个朋友做节拍器,通过 Timer 开启循环任务,发现误差很大,因为 Timer 本来就不是用来处理精确时间的,对时间精度要求高的场景,需要使用 Ticker

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

main() {
  int start = DateTime.now().millisecondsSinceEpoch;
  Timer(const Duration(milliseconds: 200),(){
    int end = DateTime.now().millisecondsSinceEpoch;
    print('Timer Cost: ${(end-start)} ms');
  });
  loopAdd(1000000000);
}

int loopAdd(int count) {
  int startTime = DateTime.now().millisecondsSinceEpoch;
  int sum = 0;
  for (int i = 0; i <= count; i++) {
    sum+=i;
  }
  int cost = DateTime.now().millisecondsSinceEpoch-startTime;
  print('loopAdd Cost: $cost ms');
  return sum;
}

这也很容易理解, loopAdd 方法在入栈之后,只有执行完毕才会出栈。 Timer 的回调即使在 200 ms 时完成,也必须在同步任务处理完才可以被处理。这和日常生活中,扫地期间,水开了,可以先处理冲热水,再回去扫地是有些差异的。那 Dart 中异步任务回调的机制是什么呢?下面以回调为线索,进行追踪。


想要了解回调是如何触发的,最好的方式就是 断点调试。如下,在 Timer 回调函数中打断点,运行结果如下:

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

可以很清晰地看到 回调函数 是由 _RawReceivePortImpl._handleMessage 方法触发的。也就是说,Dart 虚拟机 中存在一种消息处理机制,通过对 消息 处理,触发了回调事件。


2. 认识 _RawReceivePortImpl#_handleMessage

_RawReceivePortImpl 类在 isolate_patch.dart 文件中,_handleMessage 是一个私有方法。从调试来看tag2 处handler 函数是导致 Timer 中传入的回调函数触发的诱因。 所以我们来对这个对象进行分析:

class _RawReceivePortImpl implements RawReceivePort {

---->[_RawReceivePortImpl#_handleMessage]----
@pragma("vm:entry-point", "call")
static _handleMessage(int id, var message) {
  final handler = _portMap[id]?['handler']; // tag1
  if (handler == null) {
    return null;
  }
  handler(message); // tag2
  _runPendingImmediateCallback();
  return handler;
}

handler 函数对象在 tag1 处被赋值,它的值为: 以 idkey_portMap 中取值的 handler 属性。也就是说,_portMap 是一个映射对象,键是 int 类型,值是 Map<String, dynamic> 类型。

static final _portMap = <int, Map<String, dynamic>>{};

通过调试可以发现,此时的 handler 对象类型如下:是只有一个参数,无返回值的 _handleMessage 静态方法。那这个 _handleMessage 是什么呢?反正肯定不是上面的 _handleMessage 方法。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

从调试中可以清楚地看到,这个 handler 函数触发时,会进入 _Timer._handleMessage 方法中,所以 handler 自然指的是 _Timer._handleMessage ,函数签名也是可以对得上的。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

这就有个问题:既然 _RawReceivePortImpl#_handleMessage 方法中,可以通过 _portMap 找到该函数,那么_Timer#_handleMessage 一定在某个时刻被加入到了 _portMap 中。下面,我们通过这条线索,追踪一下 _RawReceivePortImpl 如何设置处理器。


3. _RawReceivePortImpl 设置处理器

_Timer 中搜索一下 _handleMessage 就很容易看到它被设置的场景:如下所示,在 _Timer#_createTimerHandler 方法中, 458 行 创建 _RawReceivePortImpl 类型的 port 对象,并在 459 行 为其设置 handler,值正是 _Timer#_handleMessage 方法。

其实从这里就能大概猜到 _RawReceivePortImpl#handlerset 方法的处理逻辑,大家可以暂停脑补一下,猜猜看。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环


没错,_RawReceivePortImpl#handlerset 方法,就是为 _portMap 映射添加元素的。这就和 _RawReceivePortImpl#_handleMessage_portMap 映射中取出 handler 函数对象交相呼应。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

到这里,可以知道 _Timer#_createTimerHandler 方法会将回调加入映射中去,就可以大胆的猜测一下,在 Timer 对象创建后,一定会触发 _createTimerHandler


4. Timer 对象的创建与回调函数的 "流浪"

首先 Timer 是一个 抽象类 ,本身并不能实例化对象。如下是 Timer 的普通工厂构造,返回值是由 Zone 对象通过 createTimer 方法创建的对象,也就是说该方法一定会返回 Timer 的实现类对象。另外,我们有了一条新线索,可以追寻一下入参中的回调 callback 是如何在源码中 "流浪" 的。

---->[Timer]----
factory Timer(Duration duration, void Function() callback) {
  if (Zone.current == Zone.root) {
    return Zone.current.createTimer(duration, callback);
  }
  return Zone.current
      .createTimer(duration, Zone.current.bindCallbackGuarded(callback));
}

由于默认情况下, Zone.currentZone.root , 其对应的实现类是 _RootZone 。也就是说 Timer 的构造会触发 _RootZone#createTimer 方法。

static const Zone root = _rootZone;
const _Zone _rootZone = const _RootZone();

所以,callback流浪的第一站是 _RootZonecreateTimer 方法,如下所示。其中会触发 Timer_createTimer 静态方法,这也是 callback 流浪的第二站:

---->[_RootZone#createTimer]----
Timer createTimer(Duration duration, void f()) {
  return Timer._createTimer(duration, f);
}

Timer#_createTimer 静态方法是一个 external 的方法,在 Flutter 提供的 sdk 中是找不到的。

---->[Timer#_createTimer]----
external static Timer _createTimer(
    Duration duration, void Function() callback);

可以在 dart-lang/sdk/sdk/_internal/vm/lib 中找到内部文件,比如这里是 timer_patch.dart。在调试时可以显示相关的代码,但无法进行断点调试。 (PS 不知道有没有人知道如何能调试这种内部文件)

接下来, callback 会进入第三站:作为 factory 函数的第二参回调的处理逻辑。现在问题来了,factory 对象是什么东西? 从代码来看,它是 VMLibraryHooks 类的静态成员 timerFactory。由于 factory 可以像函数一样通过 () 执行,毫无疑问它是一个函数对象,函数的签名也很明确: 三个入参,类型依次是 int回调函数bool ,且返回 Timer 类型成员。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环


如下是内部文件 timer_impl.dart 中的代码。其中 _TimerTimer 的实现类,也是 dartTimer 构造时创建的实际类型。从中可以看到 VMLibraryHooks.timerFactory 被赋值为 _Timer._factory ,所以上面说的 factory 是什么就不言而喻了。

class _Timer implements Timer {

481 行 代码可以看出,callback 进入的第三站是 _Timer 的构造函数:

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

_Timer 的构造函数中,callback 进入的第四站: _Timer#_createTimer 方法。其中 _Timer 通过 _internal 构造,将 callback 作为参数传入,为 _Timer_callback 成员赋值。自此, callback 进入第五站,被 _Timer 持有,结束了流浪生涯,终于安家。

---->[_Timer 构造]----
factory _Timer(int milliSeconds, void callback(Timer timer)) {
  return _createTimer(callback, milliSeconds, false);
}

---->[_Timer#_createTimer]----
static _Timer _createTimer(
  if (milliSeconds < 0) {
    milliSeconds = 0;
  }
  int now = VMLibraryHooks.timerMillisecondClock();
  int wakeupTime = (milliSeconds == 0) ? now : (now + 1 + milliSeconds);
  _Timer timer =  new _Timer._internal(callback, wakeupTime, milliSeconds, repeating);
  timer._enqueue(); // tag1
  return timer;
}

---->[_Timer#_internal]----
_Timer._internal(
    this._callback, this._wakeupTime, this._milliSeconds, this._repeating)
    : _id = _nextId();

另外说一句,上面的 tag1timer._enqueue() 会触发 _notifyZeroHandler ,从而导致 _createTimerHandler 的触发。也就是上面 第 3 小节 中为 _RawReceivePortImpl 设置处理器的契机。

_enqueue
   --> _notifyZeroHandler
      --> _createTimerHandler

到这里可以把线索串连一下,想一想:一个 Timer 对象是如何实例化的;其中 callback 对象是如何一步步流浪,最终被 _Timer 对象持有的; _createTimerHandler 是如何为 _RawReceivePortImpl 设置 handler 的。


5. Timer 中的堆队列与链表队列

_Timer#_enqueue 中,当定义的时长非零时,通过堆结构维护 Timer 队列。 数据结构定义在 _TimerHeap 中,就算一个非常简单的 二叉堆,感兴趣的可以自己看看。 _Timer 类持有 _TimerHeap 类型的静态成员 _heap, 这里 enqueue 就是 进入队列 的意思。

---->[_Timer#_heap]----
static final _heap = new _TimerHeap();

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环


另外,_Timer 本身是一个链表结构,其中持有 _indexOrNext 属性用于指向下一节点,并维护 _firstZeroTimer_lastZeroTimer 链表首尾节点。在 _enqueue 方法中,当 Timer 时长为 0 ,会通过链表结构维护 Timer 队列。

---->[_Timer]----
Object? _indexOrNext;
static _Timer _lastZeroTimer = _sentinelTimer;
static _Timer? _firstZeroTimer;

---->[_Timer#_enqueue]----
if (_milliSeconds == 0) {
  if (_firstZeroTimer == null) {
    _lastZeroTimer = this;
    _firstZeroTimer = this;
  } else {
    _lastZeroTimer._indexOrNext = this; //
    _lastZeroTimer = this;
  }
  // Every zero timer gets its own event.
  _notifyZeroHandler();

_Timer#_handleMessage 中,会触发 _runTimers 方法,其中入参的 pendingTimers 就是从链表 队列中获取的,已超时或零时 Timer 对象。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环


_runTimers 中,会遍历 pendingTimers,取出 Timer中的 _callback 成员, 这个成员就算创建 Timer 时的入参函数,在 398 行触发回调。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

这就是 Timer 回调函数在 Dart 中的一生。把这些理清楚之后, Timer 对象在 Dart 中的处理流程就算是比较全面了。


二、消息的发送端口和接收端口

其实 Dart 中的 Timer 代码只是流程中的一部分。对于消息处理机制来说,还涉及 Dart 虚拟机C++ 代码的处理。也就是 Isolate 的通信机制,这里稍微介绍一下,也有利于后面对 Isolate 的认识。


1. Timer 的发送端口 SendPort

Timer 创建并进入队列后,会在 _createTimerHandler 中创建 _RawReceivePortImpl 对象,这个对象从名称上来看,是一个 接收端口 的实现类。顾名思义,接收端是由于接收消息、处理消息的。通过前面的调试我们知道,接收端消息的处理方法是由 handler 进行设置,如下 tag1 处。

既然有 接收端口,自然有 发送端口_Timer 中持有 SendPort 类型的 _sendPort 对象。该成员在 tag2 处由 接收端口sendPort 赋值。

---->[_Timer#_sendPort]----
static SendPort? _sendPort;
static _RawReceivePortImpl? _receivePort;

static void _createTimerHandler() {
  var receivePort = _receivePort;
  if (receivePort == null) {
    assert(_sendPort == null);
    final port = _RawReceivePortImpl('Timer');
    port.handler = _handleMessage; // tag1
    _sendPort = port.sendPort; // tag2 
    _receivePort = port;
    _scheduledWakeupTime = 0;
  } else {
    receivePort._setActive(true);
  }
  _receivePortActive = true;
}

_RawReceivePortImpl 中的 sendPortget 方法,由 _get_sendport 外部方法实现。最终由 C++ 代码实现,在 【dart-lang/sdk/runtime/lib/isolate.cc】 中触发:

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

---->[_RawReceivePortImpl#sendPort]----
SendPort get sendPort {
  return _get_sendport();
}

@pragma("vm:external-name", "RawReceivePortImpl_get_sendport")
external SendPort _get_sendport();

_createTimerHandler 方法触发之后, _Timer 中的发送、接收端口已经准备完毕。之后自然是要发送消息。在 _Timer#_enqueue 中如果是时长为 0 的定时器,在 tag2 处会通过 _sendPort 发送 _ZERO_EVENT 的事件。

---->[_Timer#_enqueue]----
void _enqueue() {
  if (_milliSeconds == 0) {
    // 略...
    _notifyZeroHandler(); // tag1
    // 略...
}

---->[_Timer#_notifyZeroHandler]----
static void _notifyZeroHandler() {
  if (!_receivePortActive) {
    _createTimerHandler();
  }
  _sendPort!.send(_ZERO_EVENT); // tag2
}

2. 零延迟定时器消息的发送

_SendPortImplSendPort 的唯一实现类。零延迟定时器加入链表队列后,发送 _ZERO_EVENT 消息使用的是如下的 send 方法,最终会通过 SendPortImpl_sendInternal_C++ 方法进行实现,向 Dart 虚拟机 发送消息。

class _SendPortImpl implements SendPort {

@pragma("vm:entry-point", "call")
void send(var message) {
  _sendInternal(message);
}

@pragma("vm:external-name", "SendPortImpl_sendInternal_")
external void _sendInternal(var message);

如下是 【dart-lang/sdk/runtime/lib/isolate.cc】SendPortImpl_sendInternal_ 入口的逻辑。会通过 PortMapPostMessage 静态方法发送消息。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环


PortMap 是定义在 port.h 在的 C++ 类,其中 PostMessage 是一个静态方法,从注释来看,该方法会将消息加入消息队列:

---->[sdk\runtime\vm\port.h]----
class PortMap : public AllStatic {

  // Enqueues the message in the port with id. Returns false if the port is not
  // active any longer.
  //
  // Claims ownership of 'message'.
  static bool PostMessage(std::unique_ptr<Message> message,
                        bool before_events = false);
}         

port.ccPostMessage 方法实现时,由 MessageHandler#PostMessage 进行处理:

---->[sdk\runtime\vm\port.cc]----
bool PortMap::PostMessage(std::unique_ptr<Message> message,
                          bool before_events) {
  MutexLocker ml(mutex_);
  if (ports_ == nullptr) {
    return false;
  }
  auto it = ports_->TryLookup(message->dest_port());
  if (it == ports_->end()) {
    // Ownership of external data remains with the poster.
    message->DropFinalizers();
    return false;
  }
  MessageHandler* handler = (*it).handler;
  ASSERT(handler != nullptr);
  handler->PostMessage(std::move(message), before_events);
  return true;
}

MessageHandler 类中有两个 MessageQueue 消息队列成员, queue_oob_queue_ 。如下,在 message_handler.cc 中,会根据 message#IsOOB 值将消息加入到消息队列中。 IsOOBMessage 对象的优先级 Priority 枚举属性决定。 为 OOB 消息 是优先处理的消息。

SendPortImpl_sendInternal_ 中加入的消息是 kNormalPriority 优先级的,也就是普通消息。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

---->[MessageHandler]----
MessageQueue* queue_;
MessageQueue* oob_queue_;

---->[sdk\runtime\vm\message_handler.cc]----
void MessageHandler::PostMessage(std::unique_ptr<Message> message,
                                 bool before_events) {
  Message::Priority saved_priority;
    // 略...
    saved_priority = message->priority();
    if (message->IsOOB()) {
      oob_queue_->Enqueue(std::move(message), before_events);
    } else {
      queue_->Enqueue(std::move(message), before_events);
    }
   // 略...
  }

  // Invoke any custom message notification.
  MessageNotify(saved_priority);
}

3. Dart 中 _RawReceivePortImpl#_handleMessage 的触发

在消息加入队列之后,会触发 MessageNotify 进行处理,这里就不细追了。最后看一下,C++ 的代码是如何导致 Dart 中的_RawReceivePortImpl#_handleMessage 方法触发的。如下所示,【runtime/vm/dart_entry.cc】 中定义了很多入口函数。其中 DartLibraryCalls::HandleMessage 里会触发 object_store 中存储的 handle_message_function :

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环


【runtime/vm/object_store.cc】LazyInitIsolateMembers 中,会将 _RawReceivePortImpl_handleMessage 存储起来。这就是 C++DartLibraryCalls::HandleMessage 会触发Dart_RawReceivePortImpl#_handleMessage 的原因。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环


到这里,我们再回首一下本文开始时的调试结果。大家可以结合下面的线索自己疏通一下: Timer 对象的创建、 Dart 端设置监听、回调函数的传递、 Timer 队列的维护 、Timer 发送和接收端口的创建、Timer 发送消息、消息加入 C++ 中消息息队列、最后 C++ 处理消息,通知 Dart 端触发 _RawReceivePortImpl#_handleMessage

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

这就是 Timer 异步任务最简单的消息处理流程。


3. 有延迟定时器消息的发送

前面介绍的是零延迟的消息发送,下面看一下有延迟时消息发送的处理。如下所示,当进入队列时 _milliSeconds 非零,加入堆队列,通过 _notifyEventHandler 来发送消息:

---->[_Timer#_enqueue]----
void _enqueue() {
  if (_milliSeconds == 0) {
    // 略...
  } else {
    _heap.add(this);
    if (_heap.isFirst(this)) {
      _notifyEventHandler();
    }
  }
}

_notifyEventHandler 在开始会进行一些空链表的校验,触发 _scheduleWakeup 方法,告知 EventHandler 在指定的时间后告知当前的 isolate 。其中发送通知的核心方法是 VMLibraryHooks#eventHandlerSendData

---->[_Timer#_enqueue]----
static void _notifyEventHandler() {
  // 略 基础判断...
  if ((_scheduledWakeupTime == 0) || (wakeupTime != _scheduledWakeupTime)) {
    _scheduleWakeup(wakeupTime);
  }
}

---->[_Timer#_enqueue]----
// Tell the event handler to wake this isolate at a specific time.
static void _scheduleWakeup(int wakeupTime) {
  if (!_receivePortActive) {
    _createTimerHandler();
  }
  VMLibraryHooks.eventHandlerSendData(null, _sendPort!, wakeupTime);
  _scheduledWakeupTime = wakeupTime;
}

通过 eventHandler 发送的消息,由 C++【sdk/runtime/bin/eventhandler.cc】 处理,如下所示:交由 EventHandler 类触发 SendData 处理:

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

EventHandler 中持有 EventHandlerImplementation 类型的 delegate_ 成员, SendData 方法最终由实现类完成:

class EventHandler {
 public:
  EventHandler() {}
  void SendData(intptr_t id, Dart_Port dart_port, int64_t data) {
    delegate_.SendData(id, dart_port, data);
  }
 // 略...
 private:
  friend class EventHandlerImplementation;
  EventHandlerImplementation delegate_;

不同的平台都有相关的实现类,具体处理逻辑就不细追了。因为定时的延迟任务不会阻塞当前线程,使用肯定要交给 C++ 创建子启线程处理。延迟完成之后,最终会通知 Dart 端触发 _RawReceivePortImpl#_handleMessage,从而完成定时回调的任务。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

可以看出一个小小的 延迟任务 , 其中涉及的知识也是非常广泛的。通过 Timer 来了解消息处理机制是一个比较好的切入点。


三、 main 函数的启动与微任务循环

上一篇我们知道 scheduleMicrotask 的回调对于 main 函数体来说也是异步执行的,但那它和 Timer 触发的延迟任务有着本质的区别。接下来我们将对 scheduleMicrotask 进行全面分析,从中就能明白为什么 scheduleMicrotask 中的回调总可以在 Timer 之前触发。 不过在此之前,必须说明一下 main 函数的触发。


1. main 函数的触发

在我们的认知中,main 函数是程序的入口。但如果打个断点调试一下会发现,其实 main 函数本身是一个回调。和 Timer 回调一样,也是由 _RawReceivePortImpl#_handleMessage 方法触发的:

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环


所以 main 函数的触发也是涉及 消息通知机制,如下所示: _startIsolate 触发 _delayEntrypointInvocation 方法,其中创建 RawReceivePort 接收端看对象,并为 handler 赋值。

@pragma("vm:entry-point", "call")
void _startIsolate(
    Function entryPoint, List<String>? args, Object? message, bool isSpawnUri) {
  _delayEntrypointInvocation(entryPoint, args, message, isSpawnUri);
}

void _delayEntrypointInvocation(Function entryPoint, List<String>? args,
    Object? message, bool allowZeroOneOrTwoArgs) {
  final port = RawReceivePort();
  port.handler = (_) { // tag1
    port.close();
    if (allowZeroOneOrTwoArgs) {
      // 略...
      } else {
        entryPoint(); // tag2
      }
    } else {
      entryPoint(message);
    }
  };
  port.sendPort.send(null);
}

也就是说收到消息,触发 _RawReceivePortImpl#_handleMessage 时,执行的 handler 就是 tag1 所示的函数。 tag2entryPoint() 方法就是 main 方法。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环


2. scheduleMicrotask 回调函数的触发

如下所示,在 scheduleMicrotask 的回调中打上断点,可以看出它也是由 _RawReceivePortImpl#_handleMessage 触发的。难道 scheduleMicrotask 也是走的消息处理机制? 这个问题萦绕我很久,现在幡然醒悟:其实不然 !

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

void main() {
  scheduleMicrotask(() {
    print("executed1"); //<-- 断点
  });
}

认清这里的 _RawReceivePortImpl#_handleMessage 是谁触发的,对理解 scheduleMicrotask 至关重要。从下面两个调试中 _handleMessageid 可以看出,这是同一个 _RawReceivePortImpl#_handleMessage

  • main 函数触发

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

  • scheduleMicrotask 回调函数触发

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环


如下所示,在 192 行触发 main 函数 ,当函数体执行完毕,会通过 _runPendingImmediateCallback 执行微任务回调。所以才会产生 scheduleMicrotask 回调函数后于 main 函数体执行的异步效果。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环


3. 微任务循环的执行

_runPendingImmediateCallback 方法会执行 _pendingImmediateCallback 函数对象。从调试来看,运行时该函数对象为 _startMicrotaskLoop:

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

@pragma("vm:entry-point", "call")
void _runPendingImmediateCallback() {
  final callback = _pendingImmediateCallback;
  if (callback != null) {
    _pendingImmediateCallback = null;
    callback();
  }
}

_startMicrotaskLoop 是定义在 schedule_microtask.dart 中的私有方法,其中会触发 _microtaskLoop 方法,执行微任务队列:

void _startMicrotaskLoop() {
  _isInCallbackLoop = true;
  try {
    _microtaskLoop(); // tag1 
  } finally {
    _lastPriorityCallback = null;
    _isInCallbackLoop = false;
    if (_nextCallback != null) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  }
}

微任务通过 _AsyncCallbackEntry 进行维护,其中持有 callback 回调和 next 节点,是一个链表结构。_nextCallback_lastCallback 全局变量为首尾节点:

typedef void _AsyncCallback();

class _AsyncCallbackEntry {
  final _AsyncCallback callback;
  _AsyncCallbackEntry? next;
  _AsyncCallbackEntry(this.callback);
}

/// Head of single linked list of pending callbacks.
_AsyncCallbackEntry? _nextCallback;

/// Tail of single linked list of pending callbacks.
_AsyncCallbackEntry? _lastCallback;

_microtaskLoop 中,会遍历 _nextCallback 链表,触发节点中的回调函数, 如 tag1 所示。


/// Head of single linked list of pending callbacks.
_AsyncCallbackEntry? _nextCallback;

/// Tail of single linked list of pending callbacks.
_AsyncCallbackEntry? _lastCallback;


void _microtaskLoop() {
  for (var entry = _nextCallback; entry != null; entry = _nextCallback) {
    _lastPriorityCallback = null;
    var next = entry.next;
    _nextCallback = next;
    if (next == null) _lastCallback = null;
    (entry.callback)(); // tag1
  }
}

4. 微任务队列事件的进入

当使用 scheduleMicrotask 方法时,会触发 _rootScheduleMicrotask 方法,在 _scheduleAsyncCallback 中为微任务列表添加元素:

@pragma('vm:entry-point', 'call')
void scheduleMicrotask(void Function() callback) {
  _Zone currentZone = Zone._current;
  if (identical(_rootZone, currentZone)) {
    _rootScheduleMicrotask(null, null, _rootZone, callback);
    return;
  }
}

void _rootScheduleMicrotask(  Zone? self, ZoneDelegate? parent, Zone zone, void f()) {
  // 略...
  _scheduleAsyncCallback(f);
}

具体逻辑如下:通过 callback 创建 _AsyncCallbackEntry 对象,并加入到链表队列中。注意一点,当微任务加入到队列中时,此时不在执行微任务循环,会通过 _AsyncRun._scheduleImmediate_startMicrotaskLoop 函数安排在事件队列之前。

void _scheduleAsyncCallback(_AsyncCallback callback) {
  _AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);
  _AsyncCallbackEntry? lastCallback = _lastCallback;
  if (lastCallback == null) {
    _nextCallback = _lastCallback = newEntry;
    if (!_isInCallbackLoop) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  } else {
    lastCallback.next = newEntry;
    _lastCallback = newEntry;
  }
}

class _AsyncRun {
  /// Schedule the given callback before any other event in the event-loop.
  external static void _scheduleImmediate(void Function() callback);
}

5. 为什么微任务的处理的优先级较高

总的来看,微任务的处理是比较简单的,它和 Timer 有着本质的区别,并不会使用 发送/接收端口,也不通过 C++ 的消息机制来处理任务。所以微任务的处理更加小巧,灵活。由于 main 函数执行完后,接下来会通过微任务循环处理 main 中的微任务列表,这就是 微任务 先于 零延迟 Timer 的本质原因。

现在回到上一篇中,就很容易理解永远不会被回调的 零延迟 Timer 。 由于 scheduleMicrotask 不断添加微任务,所以 _microtaskLoop 会一直执行,无法出栈。

main() {
  Timer.run(() { print("executed"); });  // Will never be executed.
  foo() {
    scheduleMicrotask(foo);  // Schedules [foo] in front of other events.
  }
  foo();
}

其实微任务的并不是很常用,在 Flutter 框架中的使用场景也并不多。我印象中比较深的是在 手势竞技场 中,当只有一个竞技者时,通过 scheduleMicrotask, 在回调在异步触发 _resolveByDefault 方法。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

另外,记得在 Flutter 1.12 之前, scheduleWarmUpFrame 是通过 scheduleMicrotask 来执行 handleBeginFramehandleDrawFrame 的,之后都改成 Timer.run零延迟回调 了。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

scheduleMicrotask 中回调处理的优先级比较高,如果微任务队列中处理的东西比较多,可能会让其他事件的执行滞后。所以计算密集型的耗时任务是不应该使用 scheduleMicrotask 处理的,如果某段逻辑不想影响下面的代码执行,又想相对于异步任务优先处理,可以通过 scheduleMicrotask 扔进微任务队列中。


到这里,我们通过 Timer 初步认识了 Dart 消息处理的机制,通过 scheduleMicrotask 了解微任务是如何加入队列和循环执行的。流程可以简化成下图,也就是大家常说的 事件循环 Event Loop。其实这个图也不是非常准确,但基本上可以表示事件模型。

【Flutter 异步编程 -陆】 | 探索 Dart 消息处理与微任务循环

另外,这里介绍的是 Dart 知识的范畴,对于 Flutter 框架来说,Dart 中的语言特性知识地基。在main 函数执行完后,应用程序是一直运行的,这说明其中肯定存在一种 循环机制 Loop 保证程序不会退出。另外加上渲染机制、手势事件、动画机制和 Dart 的消息处理机制和微任务循环,共同构成了 Flutter事件循环机制

那本文就到这里,有了这些基础知识的铺垫,下一篇,我们将进军 Stream ,看一下其内部的实现原理。并探讨一下 Stream 流转换的相关知识。

转载自:https://juejin.cn/post/7155270458353385486
评论
请登录