likes
comments
collection
share

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

作者站长头像
站长
· 阅读数 70

一、从 Stream 的回调监听开始说起

在第三篇中,我们已经对 Stream 有了基本的概念认知,以及使用方式。 Stream 的特点是可以通过 listen 方法的第一入参 onData,不断监听 回调事件

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

上一篇研究 Timer 时,就说过:对于回调来说,有两个非常重要的线索:

回调函数去哪里了 ? 回调函数如何触发的 ?


1. 探索回调是如何触发的

下面是一段 Stream 简单使用的代码,通过 StreamController 添加元素,并通过 listen 方法监听其中的 stream 的激活事件。 了解一个回调函数触发时机的最好方式就是通过断点调试。

void main(){
  StreamController<int> controller = StreamController();
  // 监听事件
  controller.stream.listen((int event) {
    print(event); //<--- 断点处
  });
  // 添加元素
  controller.add(1);
  controller.add(2);
}

从调试中可以看出,回调的根源是由 _RawReceivePortImpl#_handleMessage 触发的,并经由 微任务循环 被执行。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

由于 _handleMessage 方法作为 Dart VM 处理消息的入口,其触发的时机并不单一。这就有个疑点:Stream 监听的回调根源的 _handleMessage 和触发 main 函数的 _handleMessage 是否是在同一时刻。这可以根据入参中的 id 来判断。

通过调试可以看出 id 值相同,就说明 main 函数执行完后,通过微任务循环触发了 Stream#listen 中的回调。从这里也能看出,使用 StreamController 添加元素,本质上就是通过 scheduleMicrotask 处理,并不会涉及 Dart VM 的通信机制。

main 函数触发源stream 接收回调 1stream 接收回调 2
_handleMessage 入参 id 值168374362735082316837436273508231683743627350823

2.探索 StreamController 与 Stream 实现类

在继续探索之前,有个比较重要的问题应该理清:listenStream 的成员方法,由于 Stream 本身是一个 抽象类,并不能直接实例化,触发 listen 方法的 Stream 对象必然是其实现类。

测试中的 Stream 对象是通过 StreamController 获取的,而 StreamController 本身也是抽象类,所以在运行时也必然有其实现类。认清这两个实现类是什么,我们才能知道 Stream#listenStreamController#add 具体做了什么事:

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

abstract class StreamController<T> implements StreamSink<T> {

  Stream<T> get stream;
  
  //略...
}

由于 StreamController 是宿主,先从它的实例化来看。在 factory 普通构造中,默认情况下会返回的是 _AsyncStreamController 类对象。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

_AsyncStreamController 是一个比较有意思的类,如下所示,在定义上通过 = 号连接。

@pragma("vm:entry-point")
class _AsyncStreamController<T> = _StreamController<T> with _AsyncStreamControllerDispatch<T>;

在定义类时, A with B 可以在形式上通过 = 赋值给另一个类名 C。 这种情况下 C 可以视为 AB 的子类型,测试如下:

class C = A with B;

class A{
  final String name;
  A(this.name);
}

class B{
  printInfo(){
    print('a');
  }
}

void main(){
  C c =  C('hello');
  c.printInfo();
  print(c is A); // A
  print(c is B); // B
}

本质上来说,就是一个简写的语法糖而已,并没有什么特别的地方,看到时了解即可,不知道这个语法点,猛的一看确实有点懵。

class C extends A with B{
  C(super.name);
}

StreamController 一族如下所示,stream 作为一个抽象的 get 方法,在其子类型中必然会实现,根据这条线索就能知道 Stream 的创建场合和具体类型:

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

如下,在 _StreamController 类中,对 get stream 进行实现,返回 _ControllerStream 对象,并将控制器本身作为入参传入构造方法。

---->[_StreamController]----
 // Return a new stream every time. The streams are equal, but not identical.
 Stream<T> get stream => _ControllerStream<T>(this);

这样一来,谜题就揭晓了:在运行时,StreamController 默认情况下运行时类型为 _AsyncStreamController;其中获取的 stream 对象运行时类型为 _ControllerStream 。有了这个前置知识,接下来就会比较轻松。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


3. 追寻 onData 回调函数的 "流浪"

刚才已经简单地 反向 了解 listen#onData 回调触发的时机,接下来 正向 看一下 listen 中的 onData 回调在源码中是如何传递,在逻辑间 "流浪" 的,它的最终归宿又在哪里。测试中的 Stream 对象真实类型为 _ControllerStream,在这支的派生过程在,一定会存在 listen 方法的具体实现:

---->[Stream#listen 抽象方法]----
StreamSubscription<T> listen(void onData(T event)?,
    {Function? onError, void onDone()?, bool? cancelOnError});

如下,在 _StreamImpl 类中实现了 listen 方法。其中第一个入参 onData 就是我们追寻的目标。在 471 行 可以看到 onData 旅途的第一站是 _createSubscription 方法,它作为第一入参。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


当前对象是 _ControllerStream ,所以接下来优先进入该类 _createSubscription 方法:如下所示,onData 旅途的第二站将作为第一入参,进入 _controller#_subscribe 方法。

回忆一下,这个 _controller 对象是什么?在创建 _ControllerStream 时构造方法中传入的 this ,就是那个运行时类型为 _AsyncStreamController 的流控制器。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


_StreamController 中实现了 _subscribe 方法,在 679 行onData 回调将作为第二入参传入 _ControllerSubscription 构造,这是旅途的第四站。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


接下来,进入 _ControllerSubscription 构造,可以看到它继承自 _BufferingStreamSubscription ,而 onData 作为父类构造的第一参,这是旅途的第五站。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

_BufferingStreamSubscription 的构造中,onData 将被注册到当前的 zone 中。_registerDataHandler 的返回值是注册的函数本身。所以下面的 108 行就是 listenonData 回调函数的最终归宿。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


到这里,与监听相关类型就都浮出水面了。 这个案例中的 Stream#listen 入参 onData 最终会被 _ControllerSubscription 持有,而该对象也是 Stream#listen 方法的返回值。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


二、 StreamController 添加元素流程

从实际表现上来看,StreamController 通过 add 方法添加元素后,会触发 Stream#listen 中的 onData 回调。下面将从源码的角度来分析这里过程:

---->[StreamController#add]----
void add(T event);

1.探索 add 方法的实现

前面已经知道,测试中的 StreamController 实际类型是 _AsyncStreamController, 所以这一支上一定存在 派生类实现 add 抽象方法。该方法在 _StreamController 类中实现,在这里会判断控制器的状态,如果已经关闭,则会抛出异常。通过 _add 方法处理添加事件的核心逻辑:

---->[_StreamController#add]----
/// Send or enqueue a data event.
void add(T value) {
  if (!_mayAddEvent) throw _badEventState();
  _add(value);
}

hasListener 表示是否有监听者,如果控制器中的 stream 对象触发 listen 方法,控制器会触发 _subscribe 方法进行订阅,这是刚才追寻 onData 时所知道的。其中会关系 _state 的值,处于订阅状态。这时 _add 中会触发 _sendData 方法。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

void _add(T value) {
  if (hasListener) {
    _sendData(value);
  } else if (_isInitialState) {
    _ensurePendingEvents().add(_DelayedData<T>(value));
  }
}

_sendData 是定义在 _EventDispatch 接口中的抽象方法,但在 _StreamController 中并没有实现 _sendData 方法,那实现逻辑在哪呢 ?

abstract class _EventDispatch<T> {
  void _sendData(T data);
  void _sendError(Object error, StackTrace stackTrace);
  void _sendDone();
}

_StreamController 本身也是抽象类,不实现接口中的抽象方法也正常。冤有头,债有主,抽象方法是不会凭空出现的,代码中一定存在实现逻辑。所以现在嫌疑人只有一个: _AsyncStreamController ,查看一些它的声明可以发现,其混入了 _AsyncStreamControllerDispatch , 毫无疑问 _sendData 一定在其中实现:

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

如下所示,_AsyncStreamControllerDispatch 类中确实实现了 _sendData ,它实现 _StreamController 类,调用 _subscription_addPending 方法,使用元素构造 _DelayedData 对象来添加。

abstract class _AsyncStreamControllerDispatch<T>
    implements _StreamController<T> {
  void _sendData(T data) {
    _subscription._addPending(_DelayedData<T>(data));
  }

  void _sendError(Object error, StackTrace stackTrace) {
    _subscription._addPending(_DelayedError(error, stackTrace));
  }

  void _sendDone() {
    _subscription._addPending(const _DelayedDone());
  }
}

2. 探索 _subscription#_addPending

现在问题来了,_subscription 是什么呢?既然 _AsyncStreamControllerDispatch 实现 了 _StreamController 类, _subscription 自然是来自 _StreamController 。 如下所示,返回值类型是 _ControllerSubscription ,也就是 Stream#listen 的返回值类型。 在不是添加流时 ( addStream 方法添加流元素),主要逻辑是返回 _varData 成员。

---->[_StreamController#_subscription]----
_ControllerSubscription<T> get _subscription {
  assert(hasListener);
  Object? varData = _varData;
  if (_isAddingStream) {
    _StreamControllerAddStreamState<Object?> streamState = varData as dynamic;
    varData = streamState.varData;
  }
  return varData as dynamic;
}

如下所示,在 _subscribe 时,_varData 会被赋值,从这里可以看出这里的 _subscription 就是 Stream#listen 时返回的那个 _ControllerSubscription,也就是 onData 归宿之地。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


接下来,会走到 _BufferingStreamSubscription_addPending 方法。其中会通过 _PendingEvents 维护链表结构,元素节点是 _DelayedEvent 对象,也就是 add 方法中传入元素的包装类。在条件满足时,会触发 schedule 方法:

---->[_BufferingStreamSubscription#_addPending]---
void _addPending(_DelayedEvent event) {
  var pending = _pending ??= _PendingEvents<T>();
  pending.add(event);
  if (!_hasPending) {
    _state |= _STATE_HAS_PENDING;
    if (!_isPaused) {
      pending.schedule(this);
    }
  }
}

_PendingEvents#schedule 中,会通过 scheduleMicrotask 将蓝色选中区的函数对象加入到微任务队列中。当微任务循环触发时,蓝框中将会被回调。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


3. 微任务循环与 onData 回调触发

前面说过 main 函数执行完后,接着会通过 _runPendingImmediateCallback 方法执行 微任务循环。这就是为什么会触发 _PendingEvents#handleNext 的原因。从调试也可以看出该方法是 onData 回调的 前因 ,从方法栈来看,中间还有三个方法,下面简单看一下:

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


_PendingEvents#handleNext 方法的逻辑是让链表的首元素脱链,并触发首元素的 perform方法,这里的链表节点的类型为 _DelayedEvent

---->[_PendingEvents#handleNext]----
void handleNext(_EventDispatch<T> dispatch) {
  _DelayedEvent event = firstPendingEvent!;
  _DelayedEvent? nextEvent = event.next;
  firstPendingEvent = nextEvent;
  if (nextEvent == null) {
    lastPendingEvent = null;
  }
  event.perform(dispatch);
}

_DelayedEvent 其实可就是对 传入元素 的简单封装,调用 perfrom 时,通过 dispatch_sendData 来发送元素。此时可以暂停一下,想想这里的 _EventDispatch 类型的 dispatch 对象是什么?

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

稍微往上翻一点,可以看到 schedule 方法的入参是 _BufferingStreamSubscription 类型对象,从调试中也能看出接下来调用的是 _BufferingStreamSubscription#_sendData :

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


其中会通过 _zone.runUnaryGuarded ,将 data 作为入参触发 _onData 方法。三秒钟反应时间,这个 _onData 是什么?

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

---->[_BufferingStreamSubscription#_sendData]----
void _sendData(T data) {
  bool wasInputPaused = _isInputPaused;
  _state |= _STATE_IN_CALLBACK;
  _zone.runUnaryGuarded(_onData, data);
  _state &= ~_STATE_IN_CALLBACK;
  _checkState(wasInputPaused);
}

到这里,Stream#listen 监听的回调事件,以及回调的触发流程就贯通了。大家可以细细品味一下,想想这个流程。


三、同步流控制器 _SyncStreamController

Future 一样, Stream 也只是一种 监听-通知 的封装体,只不过可以连续监听而已。就像一块生铁,打造成一把刀还是一把剑,并不能由铁本身决定。Stream 只是一个工具,如何使用它由使用者决定。在 StreamController 的构造中,提供了 sync 参数来生成 同步流控制器 。如下所示,对应的是 _SyncStreamController 类。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


1. 理解同步 Future 对象

可能很多人都把 FutureStream 与异步对等了,其实 FutureStream 都有同步处理的能力。只不过很少使用而已,这里来介绍一下: 我们知道 Future 构造中传入的回调函数,将会使用 Timer.run 异步执行,从而在 main 函数之后回调,达到异步效果。如下所示 do in Futuredo in main 之后:

void main(){
  Future((){
    print('=======do in Future=========');
  });
  print('=======do in main=========');
}

---->[打印]----
=======do in main=========
=======do in Future=========

其实 Future 还有个 sync 构造,用于同步处理回调。也就是先执行 sync 这的回调,才能继续执行接下来的方法。如下所示, do in maindo in Future 之后:

void main(){
  Future.sync((){
    print('=======do in Future=========');
  });
  print('=======do in main=========');
}

---->[打印]----
=======do in Future=========
=======do in main=========

从源码在也很容易理解,其中没有使用 Timer ,而是立即触发回调,所以 sync 中传入的函数仍是在 main 函数栈中触发的。

可能有人心里会纳闷,并且发出亘古不变的灵魂拷问: 那它的存在有什么意义呢?

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


sync 构造中可以看出它会对入参回调进行异常抓取,抓到异常后,310 行 会创建一个异步的 Future 返回。如下所示,通过 sync 执行 parser 方法,其中会抛出异常,这是通过 catchError 可以进行捕捉。从日志中可以看出,异常的捕捉信息在 do in main 之后。

从这里可以看出 sync 方法应用的场景是:希望某个方法的异常处理逻辑在稍后执行,优先处理接下来的逻辑。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

void main() {
  Future.sync(parser).catchError((e){
    print(e);
  });
  print('=======do in main=========');
}

void parser() {
  int a = int.parse('a');
  print('=======do in Future=========');
}

2. 理解同步 Stream 对象

Stream 也是同理,默认 StreamController 是异步添加元素,从上面的分源码析中,也知道本质上是通过 scheduleMicrotask 实现的微任务。这样就能保证 add 方法添加元素之后,不会立刻触发 stream#listenonData 回调,而是在 main 函数执行完后,在微任务循环中触发。从而保证异步性:

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

void main(){

  StreamController<int> controller = StreamController();
  controller.stream.listen((int event) {
    print('=======do in stream===$event======');
  });

  controller.add(1);
  print('=======do in main=========');
  controller.add(2);
}

StreamControllersynctrue ,就表示希望以同步的方式处理流的通知。比如 25 行 添加元素之后,会立即触发 listen 方法中的回调。这就同步 Stream 事件的同步触发:

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


3. 理解同步流控制器的实现

_SyncStreamController也是继承自 _StreamController ,所以 事件订阅 的流程是一致的,只不过它混入的是 _SyncStreamControllerDispatch,以同步的方式对元素进行分发。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


通过源码可以看得很清楚, _SyncStreamControllerDispatch 会直接调用 _subscription._add 分发,触发 _subscription_onData 回调。所以就没有 scheduleMicrotask 什么事了,直接通知 stream#listen 的监听回调。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

_SyncStreamController 的应用场景是:希望在添加元素后,Stream 的监听可以立即响应。add 之后的逻辑需要等待 Stream 监听的回调处理完毕。从表现上来看就是上面测试中 do in stream 1do in main 之前触发。


四、广播流控制器

StreamController 默认构造生成的控制器,不能对 stream 进行多次监听,否则会发生异常。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

如果需要在多个场所监听一个 StreamController 的流,需要使用广播流控制器:

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


1. 思考:多次监听异常的原因

测试代码如下,tag2 处对 _AsyncBroadcastStreamController#stream 进行第二次监听将发生异常:

void main(){
  StreamController<int> controller = StreamController();
  controller.stream.listen((int event) {
    print('=======do in stream 1===$event======');
  });

  controller.stream.listen((int event) { // tag2
    print('=======do in stream 2===$event======');
  });

  controller.add(1);
  print('=======do in main=========');
  controller.add(2);
}

从源码来看,这是由于 _StreamController#_subscribe 时,状态非 _isInitialState 。可能有人会疑惑,为什么不能支持多次监听呢?

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

了解 _StreamController 的源码实现,这个问题其实很好解答。因为 _StreamController 只维护了一个 _varData 用于记录 _subscription ,它本身没有能力通知多个订阅者触发回调。


2. 实现控制器流的多监听

想要支持多次监听 流控制器 ,可以通过 StreamController.broadcast 构造,创建广播流。 如下所示,这样就每次 add 元素时,就会通知所有的监听者。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

void main(){
  StreamController<int> controller = StreamController.broadcast();
  
  controller.stream.listen((int event) {
    print('=======do in stream#1===$event======');
  });

  controller.stream.listen((int event) {
    print('=======do in stream#2===$event======');
  });

  controller.add(1);
  print('=======do in main=========');
  controller.add(2);
}

从构造方法在可以看出,broadcast 也是一个工厂构造,其中也可以通过 sync 入参控制是同步还是异步。默认返回的是异步的广播流控制器 _AsyncBroadcastStreamController :


3. 为什么广播流可以多次监听

广播流 的使用非常简单,但为什么广播流可以多次监听呢,难道它叫广播,就能多次监听吗? 前面说 _StreamController 中只能记录一个 _subscription 订阅对象,想要通知多个订阅者,那必然是维护了 多个 订阅对象,在添加事件时,依次分发。

从源码中可以看出,_BroadcastStreamController 在持有首尾两个 _BroadcastSubscription 节点:

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

_BroadcastSubscription 类是一个双向链表结构,会记录 _next_previous 节点:

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


_BroadcastStreamController#_subscription 中会通过 _addListener 分发将生成的 subscription 对象加入到订阅者链表中:

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

---->[_AsyncBroadcastStreamController#_addListener]----
void _addListener(_BroadcastSubscription<T> subscription) {
  subscription._eventState = (_state & _STATE_EVENT_ID);
  _BroadcastSubscription<T>? oldLast = _lastSubscription;
  _lastSubscription = subscription;
  subscription._next = null;
  subscription._previous = oldLast;
  if (oldLast == null) {
    _firstSubscription = subscription;
  } else {
    oldLast._next = subscription;
  }
}

_sendData 时,会遍历 subscription 链表,依次通过 _addPending 将事件元素加入到微任务队列中。当微任务循环启动时,会依次通知监听者。这就是 _BroadcastStreamController 支持多个订阅者的原理。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


结语

上面是 Stream 机制的核心,主要把握住 监听与发送通知 的流程。另外 Stream 中还有一些小知识点,理解起来就很容易。比如 Stream.periodic 可以发送定时的循环任务,本质上是由 _SyncStreamControllerTimer.periodic 实现的,由定时器驱动事件,控制器添加元素。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现


Stream.fromFutures 内部通过 _ASyncStreamController 进行控制,遍历 futures 列表,每当任务结束时,触发 onValue 事件,让流控制器添加元素,进行通知。

【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现

还有很多小的构造这里就不一一去看了,都大同小异,理解 Stream 机制的流程是最重要的。另外关于 Stream 还有一个比较重要的知识点: Stream 的转换。这个严格意义上来说,并不算是异步的知识,本文也比较长,暂时就不介绍,以后有机会单独写一篇介绍。

到这里 FutureStream 两个关于异步处理最重要的两个对象,我们已经通过源码有了比较深入的了解。下一篇我们将认识一下如何使用 isolate 处理计算密集型的耗时任务,敬请期待 ~

转载自:https://juejin.cn/post/7160461343781306375
评论
请登录