【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现
一、从 Stream 的回调监听开始说起
在第三篇中,我们已经对 Stream 有了基本的概念认知,以及使用方式。 Stream 的特点是可以通过 listen 方法的第一入参 onData,不断监听 回调事件 。


上一篇研究 Timer 时,就说过:对于回调来说,有两个非常重要的线索:
回调函数去哪里了 ? 回调函数如何触发的 ?
1. 探索回调是如何触发的
下面是一段 Stream 简单使用的代码,通过 StreamController 添加元素,并通过 listen 方法监听其中的 stream 的激活事件。 了解一个回调函数触发时机的最好方式就是通过断点调试。
void main(){
StreamController<int> controller = StreamController();
// 监听事件
controller.stream.listen((int event) {
print(event); //<--- 断点处
});
// 添加元素
controller.add(1);
controller.add(2);
}
从调试中可以看出,回调的根源是由 _RawReceivePortImpl#_handleMessage 触发的,并经由 微任务循环 被执行。


由于 _handleMessage 方法作为 Dart VM 处理消息的入口,其触发的时机并不单一。这就有个疑点:Stream 监听的回调根源的 _handleMessage 和触发 main 函数的 _handleMessage 是否是在同一时刻。这可以根据入参中的 id 来判断。
通过调试可以看出 id 值相同,就说明 main 函数执行完后,通过微任务循环触发了 Stream#listen 中的回调。从这里也能看出,使用 StreamController 添加元素,本质上就是通过 scheduleMicrotask 处理,并不会涉及 Dart VM 的通信机制。
| main 函数触发源 | stream 接收回调 1 | stream 接收回调 2 | |
|---|---|---|---|
_handleMessage 入参 id 值 | 1683743627350823 | 1683743627350823 | 1683743627350823 |
2.探索 StreamController 与 Stream 实现类
在继续探索之前,有个比较重要的问题应该理清:listen 是 Stream 的成员方法,由于 Stream 本身是一个 抽象类,并不能直接实例化,触发 listen 方法的 Stream 对象必然是其实现类。
测试中的 Stream 对象是通过 StreamController 获取的,而 StreamController 本身也是抽象类,所以在运行时也必然有其实现类。认清这两个实现类是什么,我们才能知道 Stream#listen 和 StreamController#add 具体做了什么事:

abstract class StreamController<T> implements StreamSink<T> {
Stream<T> get stream;
//略...
}
由于 StreamController 是宿主,先从它的实例化来看。在 factory 普通构造中,默认情况下会返回的是 _AsyncStreamController 类对象。

_AsyncStreamController 是一个比较有意思的类,如下所示,在定义上通过 = 号连接。
@pragma("vm:entry-point")
class _AsyncStreamController<T> = _StreamController<T> with _AsyncStreamControllerDispatch<T>;
在定义类时, A with B 可以在形式上通过 = 赋值给另一个类名 C。 这种情况下 C 可以视为 A 和 B 的子类型,测试如下:
class C = A with B;
class A{
final String name;
A(this.name);
}
class B{
printInfo(){
print('a');
}
}
void main(){
C c = C('hello');
c.printInfo();
print(c is A); // A
print(c is B); // B
}
本质上来说,就是一个简写的语法糖而已,并没有什么特别的地方,看到时了解即可,不知道这个语法点,猛的一看确实有点懵。
class C extends A with B{
C(super.name);
}
StreamController 一族如下所示,stream 作为一个抽象的 get 方法,在其子类型中必然会实现,根据这条线索就能知道 Stream 的创建场合和具体类型:

如下,在 _StreamController 类中,对 get stream 进行实现,返回 _ControllerStream 对象,并将控制器本身作为入参传入构造方法。
---->[_StreamController]----
// Return a new stream every time. The streams are equal, but not identical.
Stream<T> get stream => _ControllerStream<T>(this);
这样一来,谜题就揭晓了:在运行时,StreamController 默认情况下运行时类型为 _AsyncStreamController;其中获取的 stream 对象运行时类型为 _ControllerStream 。有了这个前置知识,接下来就会比较轻松。

3. 追寻 onData 回调函数的 "流浪"
刚才已经简单地 反向 了解 listen#onData 回调触发的时机,接下来 正向 看一下 listen 中的 onData 回调在源码中是如何传递,在逻辑间 "流浪" 的,它的最终归宿又在哪里。测试中的 Stream 对象真实类型为 _ControllerStream,在这支的派生过程在,一定会存在 listen 方法的具体实现:
---->[Stream#listen 抽象方法]----
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
如下,在 _StreamImpl 类中实现了 listen 方法。其中第一个入参 onData 就是我们追寻的目标。在 471 行 可以看到 onData 旅途的第一站是 _createSubscription 方法,它作为第一入参。

当前对象是 _ControllerStream ,所以接下来优先进入该类 _createSubscription 方法:如下所示,onData 旅途的第二站将作为第一入参,进入 _controller#_subscribe 方法。
回忆一下,这个 _controller 对象是什么?在创建 _ControllerStream 时构造方法中传入的 this ,就是那个运行时类型为 _AsyncStreamController 的流控制器。

在 _StreamController 中实现了 _subscribe 方法,在 679 行 中 onData 回调将作为第二入参传入 _ControllerSubscription 构造,这是旅途的第四站。

接下来,进入 _ControllerSubscription 构造,可以看到它继承自 _BufferingStreamSubscription ,而 onData 作为父类构造的第一参,这是旅途的第五站。

在 _BufferingStreamSubscription 的构造中,onData 将被注册到当前的 zone 中。_registerDataHandler 的返回值是注册的函数本身。所以下面的 108 行就是 listen 中 onData 回调函数的最终归宿。

到这里,与监听相关类型就都浮出水面了。 这个案例中的 Stream#listen 入参 onData 最终会被 _ControllerSubscription 持有,而该对象也是 Stream#listen 方法的返回值。

二、 StreamController 添加元素流程
从实际表现上来看,StreamController 通过 add 方法添加元素后,会触发 Stream#listen 中的 onData 回调。下面将从源码的角度来分析这里过程:
---->[StreamController#add]----
void add(T event);
1.探索 add 方法的实现
前面已经知道,测试中的 StreamController 实际类型是 _AsyncStreamController, 所以这一支上一定存在 派生类实现 add 抽象方法。该方法在 _StreamController 类中实现,在这里会判断控制器的状态,如果已经关闭,则会抛出异常。通过 _add 方法处理添加事件的核心逻辑:
---->[_StreamController#add]----
/// Send or enqueue a data event.
void add(T value) {
if (!_mayAddEvent) throw _badEventState();
_add(value);
}
hasListener 表示是否有监听者,如果控制器中的 stream 对象触发 listen 方法,控制器会触发 _subscribe 方法进行订阅,这是刚才追寻 onData 时所知道的。其中会关系 _state 的值,处于订阅状态。这时 _add 中会触发 _sendData 方法。

void _add(T value) {
if (hasListener) {
_sendData(value);
} else if (_isInitialState) {
_ensurePendingEvents().add(_DelayedData<T>(value));
}
}
_sendData 是定义在 _EventDispatch 接口中的抽象方法,但在 _StreamController 中并没有实现 _sendData 方法,那实现逻辑在哪呢 ?
abstract class _EventDispatch<T> {
void _sendData(T data);
void _sendError(Object error, StackTrace stackTrace);
void _sendDone();
}
_StreamController 本身也是抽象类,不实现接口中的抽象方法也正常。冤有头,债有主,抽象方法是不会凭空出现的,代码中一定存在实现逻辑。所以现在嫌疑人只有一个: _AsyncStreamController ,查看一些它的声明可以发现,其混入了 _AsyncStreamControllerDispatch , 毫无疑问 _sendData 一定在其中实现:

如下所示,_AsyncStreamControllerDispatch 类中确实实现了 _sendData ,它实现 _StreamController 类,调用 _subscription 的 _addPending 方法,使用元素构造 _DelayedData 对象来添加。
abstract class _AsyncStreamControllerDispatch<T>
implements _StreamController<T> {
void _sendData(T data) {
_subscription._addPending(_DelayedData<T>(data));
}
void _sendError(Object error, StackTrace stackTrace) {
_subscription._addPending(_DelayedError(error, stackTrace));
}
void _sendDone() {
_subscription._addPending(const _DelayedDone());
}
}
2. 探索 _subscription#_addPending
现在问题来了,_subscription 是什么呢?既然 _AsyncStreamControllerDispatch 实现 了 _StreamController 类, _subscription 自然是来自 _StreamController 。 如下所示,返回值类型是 _ControllerSubscription ,也就是 Stream#listen 的返回值类型。 在不是添加流时 ( addStream 方法添加流元素),主要逻辑是返回 _varData 成员。
---->[_StreamController#_subscription]----
_ControllerSubscription<T> get _subscription {
assert(hasListener);
Object? varData = _varData;
if (_isAddingStream) {
_StreamControllerAddStreamState<Object?> streamState = varData as dynamic;
varData = streamState.varData;
}
return varData as dynamic;
}
如下所示,在 _subscribe 时,_varData 会被赋值,从这里可以看出这里的 _subscription 就是 Stream#listen 时返回的那个 _ControllerSubscription,也就是 onData 归宿之地。

接下来,会走到 _BufferingStreamSubscription 的 _addPending 方法。其中会通过 _PendingEvents 维护链表结构,元素节点是 _DelayedEvent 对象,也就是 add 方法中传入元素的包装类。在条件满足时,会触发 schedule 方法:
---->[_BufferingStreamSubscription#_addPending]---
void _addPending(_DelayedEvent event) {
var pending = _pending ??= _PendingEvents<T>();
pending.add(event);
if (!_hasPending) {
_state |= _STATE_HAS_PENDING;
if (!_isPaused) {
pending.schedule(this);
}
}
}
在 _PendingEvents#schedule 中,会通过 scheduleMicrotask 将蓝色选中区的函数对象加入到微任务队列中。当微任务循环触发时,蓝框中将会被回调。

3. 微任务循环与 onData 回调触发
前面说过 main 函数执行完后,接着会通过 _runPendingImmediateCallback 方法执行 微任务循环。这就是为什么会触发 _PendingEvents#handleNext 的原因。从调试也可以看出该方法是 onData 回调的 前因 ,从方法栈来看,中间还有三个方法,下面简单看一下:

_PendingEvents#handleNext 方法的逻辑是让链表的首元素脱链,并触发首元素的 perform方法,这里的链表节点的类型为 _DelayedEvent。
---->[_PendingEvents#handleNext]----
void handleNext(_EventDispatch<T> dispatch) {
_DelayedEvent event = firstPendingEvent!;
_DelayedEvent? nextEvent = event.next;
firstPendingEvent = nextEvent;
if (nextEvent == null) {
lastPendingEvent = null;
}
event.perform(dispatch);
}
_DelayedEvent 其实可就是对 传入元素 的简单封装,调用 perfrom 时,通过 dispatch 的 _sendData 来发送元素。此时可以暂停一下,想想这里的 _EventDispatch 类型的 dispatch 对象是什么?

稍微往上翻一点,可以看到 schedule 方法的入参是 _BufferingStreamSubscription 类型对象,从调试中也能看出接下来调用的是 _BufferingStreamSubscription#_sendData :

其中会通过 _zone.runUnaryGuarded ,将 data 作为入参触发 _onData 方法。三秒钟反应时间,这个 _onData 是什么?

---->[_BufferingStreamSubscription#_sendData]----
void _sendData(T data) {
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
_zone.runUnaryGuarded(_onData, data);
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
到这里,Stream#listen 监听的回调事件,以及回调的触发流程就贯通了。大家可以细细品味一下,想想这个流程。
三、同步流控制器 _SyncStreamController
和 Future 一样, Stream 也只是一种 监听-通知 的封装体,只不过可以连续监听而已。就像一块生铁,打造成一把刀还是一把剑,并不能由铁本身决定。Stream 只是一个工具,如何使用它由使用者决定。在 StreamController 的构造中,提供了 sync 参数来生成 同步流控制器 。如下所示,对应的是 _SyncStreamController 类。

1. 理解同步 Future 对象
可能很多人都把 Future 和 Stream 与异步对等了,其实 Future 和 Stream 都有同步处理的能力。只不过很少使用而已,这里来介绍一下:
我们知道 Future 构造中传入的回调函数,将会使用 Timer.run 异步执行,从而在 main 函数之后回调,达到异步效果。如下所示 do in Future 在 do in main 之后:
void main(){
Future((){
print('=======do in Future=========');
});
print('=======do in main=========');
}
---->[打印]----
=======do in main=========
=======do in Future=========
其实 Future 还有个 sync 构造,用于同步处理回调。也就是先执行 sync 这的回调,才能继续执行接下来的方法。如下所示, do in main 在 do in Future 之后:
void main(){
Future.sync((){
print('=======do in Future=========');
});
print('=======do in main=========');
}
---->[打印]----
=======do in Future=========
=======do in main=========
从源码在也很容易理解,其中没有使用 Timer ,而是立即触发回调,所以 sync 中传入的函数仍是在 main 函数栈中触发的。
可能有人心里会纳闷,并且发出亘古不变的灵魂拷问:
那它的存在有什么意义呢?

从 sync 构造中可以看出它会对入参回调进行异常抓取,抓到异常后,310 行 会创建一个异步的 Future 返回。如下所示,通过 sync 执行 parser 方法,其中会抛出异常,这是通过 catchError 可以进行捕捉。从日志中可以看出,异常的捕捉信息在 do in main 之后。
从这里可以看出 sync 方法应用的场景是:希望某个方法的异常处理逻辑在稍后执行,优先处理接下来的逻辑。

void main() {
Future.sync(parser).catchError((e){
print(e);
});
print('=======do in main=========');
}
void parser() {
int a = int.parse('a');
print('=======do in Future=========');
}
2. 理解同步 Stream 对象
Stream 也是同理,默认 StreamController 是异步添加元素,从上面的分源码析中,也知道本质上是通过 scheduleMicrotask 实现的微任务。这样就能保证 add 方法添加元素之后,不会立刻触发 stream#listen的 onData 回调,而是在 main 函数执行完后,在微任务循环中触发。从而保证异步性:

void main(){
StreamController<int> controller = StreamController();
controller.stream.listen((int event) {
print('=======do in stream===$event======');
});
controller.add(1);
print('=======do in main=========');
controller.add(2);
}
当 StreamController 的 sync 为 true ,就表示希望以同步的方式处理流的通知。比如 25 行 添加元素之后,会立即触发 listen 方法中的回调。这就同步 Stream 事件的同步触发:

3. 理解同步流控制器的实现
_SyncStreamController也是继承自 _StreamController ,所以 事件订阅 的流程是一致的,只不过它混入的是 _SyncStreamControllerDispatch,以同步的方式对元素进行分发。

通过源码可以看得很清楚, _SyncStreamControllerDispatch 会直接调用 _subscription._add 分发,触发 _subscription 的 _onData 回调。所以就没有 scheduleMicrotask 什么事了,直接通知 stream#listen 的监听回调。

_SyncStreamController 的应用场景是:希望在添加元素后,Stream 的监听可以立即响应。add 之后的逻辑需要等待 Stream 监听的回调处理完毕。从表现上来看就是上面测试中 do in stream 1 在 do in main 之前触发。
四、广播流控制器
StreamController 默认构造生成的控制器,不能对 stream 进行多次监听,否则会发生异常。

如果需要在多个场所监听一个 StreamController 的流,需要使用广播流控制器:

1. 思考:多次监听异常的原因
测试代码如下,tag2 处对 _AsyncBroadcastStreamController#stream 进行第二次监听将发生异常:
void main(){
StreamController<int> controller = StreamController();
controller.stream.listen((int event) {
print('=======do in stream 1===$event======');
});
controller.stream.listen((int event) { // tag2
print('=======do in stream 2===$event======');
});
controller.add(1);
print('=======do in main=========');
controller.add(2);
}
从源码来看,这是由于 _StreamController#_subscribe 时,状态非 _isInitialState 。可能有人会疑惑,为什么不能支持多次监听呢?

了解 _StreamController 的源码实现,这个问题其实很好解答。因为 _StreamController 只维护了一个 _varData 用于记录 _subscription ,它本身没有能力通知多个订阅者触发回调。
2. 实现控制器流的多监听
想要支持多次监听 流控制器 ,可以通过 StreamController.broadcast 构造,创建广播流。 如下所示,这样就每次 add 元素时,就会通知所有的监听者。

void main(){
StreamController<int> controller = StreamController.broadcast();
controller.stream.listen((int event) {
print('=======do in stream#1===$event======');
});
controller.stream.listen((int event) {
print('=======do in stream#2===$event======');
});
controller.add(1);
print('=======do in main=========');
controller.add(2);
}
从构造方法在可以看出,broadcast 也是一个工厂构造,其中也可以通过 sync 入参控制是同步还是异步。默认返回的是异步的广播流控制器 _AsyncBroadcastStreamController :
3. 为什么广播流可以多次监听
广播流 的使用非常简单,但为什么广播流可以多次监听呢,难道它叫广播,就能多次监听吗? 前面说 _StreamController 中只能记录一个 _subscription 订阅对象,想要通知多个订阅者,那必然是维护了 多个 订阅对象,在添加事件时,依次分发。
从源码中可以看出,_BroadcastStreamController 在持有首尾两个 _BroadcastSubscription 节点:

而 _BroadcastSubscription 类是一个双向链表结构,会记录 _next 和 _previous 节点:

在 _BroadcastStreamController#_subscription 中会通过 _addListener 分发将生成的 subscription 对象加入到订阅者链表中:

---->[_AsyncBroadcastStreamController#_addListener]----
void _addListener(_BroadcastSubscription<T> subscription) {
subscription._eventState = (_state & _STATE_EVENT_ID);
_BroadcastSubscription<T>? oldLast = _lastSubscription;
_lastSubscription = subscription;
subscription._next = null;
subscription._previous = oldLast;
if (oldLast == null) {
_firstSubscription = subscription;
} else {
oldLast._next = subscription;
}
}
在 _sendData 时,会遍历 subscription 链表,依次通过 _addPending 将事件元素加入到微任务队列中。当微任务循环启动时,会依次通知监听者。这就是 _BroadcastStreamController 支持多个订阅者的原理。

结语
上面是 Stream 机制的核心,主要把握住 监听与发送通知 的流程。另外 Stream 中还有一些小知识点,理解起来就很容易。比如 Stream.periodic 可以发送定时的循环任务,本质上是由 _SyncStreamController 和 Timer.periodic 实现的,由定时器驱动事件,控制器添加元素。

Stream.fromFutures 内部通过 _ASyncStreamController 进行控制,遍历 futures 列表,每当任务结束时,触发 onValue 事件,让流控制器添加元素,进行通知。

还有很多小的构造这里就不一一去看了,都大同小异,理解 Stream 机制的流程是最重要的。另外关于 Stream 还有一个比较重要的知识点: Stream 的转换。这个严格意义上来说,并不算是异步的知识,本文也比较长,暂时就不介绍,以后有机会单独写一篇介绍。
到这里 Future 和 Stream 两个关于异步处理最重要的两个对象,我们已经通过源码有了比较深入的了解。下一篇我们将认识一下如何使用 isolate 处理计算密集型的耗时任务,敬请期待 ~
转载自:https://juejin.cn/post/7160461343781306375