【Flutter 异步编程 - 柒】 | 深入剖析 Stream 机制源码实现
一、从 Stream 的回调监听开始说起
在第三篇中,我们已经对 Stream
有了基本的概念认知,以及使用方式。 Stream
的特点是可以通过 listen
方法的第一入参 onData
,不断监听 回调事件
。
上一篇研究 Timer
时,就说过:对于回调来说,有两个非常重要的线索:
回调函数去哪里了 ? 回调函数如何触发的 ?
1. 探索回调是如何触发的
下面是一段 Stream
简单使用的代码,通过 StreamController
添加元素,并通过 listen
方法监听其中的 stream
的激活事件。 了解一个回调函数触发时机的最好方式就是通过断点调试。
void main(){
StreamController<int> controller = StreamController();
// 监听事件
controller.stream.listen((int event) {
print(event); //<--- 断点处
});
// 添加元素
controller.add(1);
controller.add(2);
}
从调试中可以看出,回调的根源是由 _RawReceivePortImpl#_handleMessage
触发的,并经由 微任务循环
被执行。
由于 _handleMessage
方法作为 Dart VM
处理消息的入口,其触发的时机并不单一。这就有个疑点:Stream
监听的回调根源的 _handleMessage
和触发 main
函数的 _handleMessage
是否是在同一时刻。这可以根据入参中的 id
来判断。
通过调试可以看出 id
值相同,就说明 main
函数执行完后,通过微任务循环触发了 Stream#listen
中的回调。从这里也能看出,使用 StreamController
添加元素,本质上就是通过 scheduleMicrotask
处理,并不会涉及 Dart VM
的通信机制。
main 函数触发源 | stream 接收回调 1 | stream 接收回调 2 | |
---|---|---|---|
_handleMessage 入参 id 值 | 1683743627350823 | 1683743627350823 | 1683743627350823 |
2.探索 StreamController 与 Stream 实现类
在继续探索之前,有个比较重要的问题应该理清:listen
是 Stream
的成员方法,由于 Stream
本身是一个 抽象类
,并不能直接实例化,触发 listen
方法的 Stream
对象必然是其实现类。
测试中的 Stream
对象是通过 StreamController
获取的,而 StreamController
本身也是抽象类,所以在运行时也必然有其实现类。认清这两个实现类是什么,我们才能知道 Stream#listen
和 StreamController#add
具体做了什么事:
abstract class StreamController<T> implements StreamSink<T> {
Stream<T> get stream;
//略...
}
由于 StreamController
是宿主,先从它的实例化来看。在 factory
普通构造中,默认情况下会返回的是 _AsyncStreamController
类对象。
_AsyncStreamController
是一个比较有意思的类,如下所示,在定义上通过 =
号连接。
@pragma("vm:entry-point")
class _AsyncStreamController<T> = _StreamController<T> with _AsyncStreamControllerDispatch<T>;
在定义类时, A with B
可以在形式上通过 =
赋值给另一个类名 C
。 这种情况下 C
可以视为 A
和 B
的子类型,测试如下:
class C = A with B;
class A{
final String name;
A(this.name);
}
class B{
printInfo(){
print('a');
}
}
void main(){
C c = C('hello');
c.printInfo();
print(c is A); // A
print(c is B); // B
}
本质上来说,就是一个简写的语法糖而已,并没有什么特别的地方,看到时了解即可,不知道这个语法点,猛的一看确实有点懵。
class C extends A with B{
C(super.name);
}
StreamController
一族如下所示,stream
作为一个抽象的 get
方法,在其子类型中必然会实现,根据这条线索就能知道 Stream
的创建场合和具体类型:
如下,在 _StreamController
类中,对 get stream
进行实现,返回 _ControllerStream
对象,并将控制器本身作为入参传入构造方法。
---->[_StreamController]----
// Return a new stream every time. The streams are equal, but not identical.
Stream<T> get stream => _ControllerStream<T>(this);
这样一来,谜题就揭晓了:在运行时,StreamController
默认情况下运行时类型为 _AsyncStreamController
;其中获取的 stream
对象运行时类型为 _ControllerStream
。有了这个前置知识,接下来就会比较轻松。
3. 追寻 onData 回调函数的 "流浪"
刚才已经简单地 反向
了解 listen#onData
回调触发的时机,接下来 正向
看一下 listen
中的 onData
回调在源码中是如何传递,在逻辑间 "流浪"
的,它的最终归宿又在哪里。测试中的 Stream
对象真实类型为 _ControllerStream
,在这支的派生过程在,一定会存在 listen
方法的具体实现:
---->[Stream#listen 抽象方法]----
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
如下,在 _StreamImpl
类中实现了 listen
方法。其中第一个入参 onData
就是我们追寻的目标。在 471 行
可以看到 onData
旅途的第一站是 _createSubscription
方法,它作为第一入参。
当前对象是 _ControllerStream
,所以接下来优先进入该类 _createSubscription
方法:如下所示,onData
旅途的第二站将作为第一入参,进入 _controller#_subscribe
方法。
回忆一下,这个 _controller
对象是什么?在创建 _ControllerStream
时构造方法中传入的 this
,就是那个运行时类型为 _AsyncStreamController
的流控制器。
在 _StreamController
中实现了 _subscribe
方法,在 679 行
中 onData
回调将作为第二入参传入 _ControllerSubscription
构造,这是旅途的第四站。
接下来,进入 _ControllerSubscription
构造,可以看到它继承自 _BufferingStreamSubscription
,而 onData
作为父类构造的第一参,这是旅途的第五站。
在 _BufferingStreamSubscription
的构造中,onData
将被注册到当前的 zone
中。_registerDataHandler
的返回值是注册的函数本身。所以下面的 108
行就是 listen
中 onData
回调函数的最终归宿。
到这里,与监听相关类型就都浮出水面了。 这个案例中的 Stream#listen
入参 onData
最终会被 _ControllerSubscription
持有,而该对象也是 Stream#listen
方法的返回值。
二、 StreamController 添加元素流程
从实际表现上来看,StreamController
通过 add
方法添加元素后,会触发 Stream#listen
中的 onData
回调。下面将从源码的角度来分析这里过程:
---->[StreamController#add]----
void add(T event);
1.探索 add 方法的实现
前面已经知道,测试中的 StreamController
实际类型是 _AsyncStreamController
, 所以这一支上一定存在 派生类实现 add
抽象方法。该方法在 _StreamController
类中实现,在这里会判断控制器的状态,如果已经关闭,则会抛出异常。通过 _add
方法处理添加事件的核心逻辑:
---->[_StreamController#add]----
/// Send or enqueue a data event.
void add(T value) {
if (!_mayAddEvent) throw _badEventState();
_add(value);
}
hasListener
表示是否有监听者,如果控制器中的 stream
对象触发 listen
方法,控制器会触发 _subscribe
方法进行订阅,这是刚才追寻 onData
时所知道的。其中会关系 _state
的值,处于订阅状态。这时 _add
中会触发 _sendData
方法。
void _add(T value) {
if (hasListener) {
_sendData(value);
} else if (_isInitialState) {
_ensurePendingEvents().add(_DelayedData<T>(value));
}
}
_sendData
是定义在 _EventDispatch
接口中的抽象方法,但在 _StreamController
中并没有实现 _sendData
方法,那实现逻辑在哪呢 ?
abstract class _EventDispatch<T> {
void _sendData(T data);
void _sendError(Object error, StackTrace stackTrace);
void _sendDone();
}
_StreamController
本身也是抽象类,不实现接口中的抽象方法也正常。冤有头,债有主,抽象方法是不会凭空出现的,代码中一定存在实现逻辑。所以现在嫌疑人只有一个: _AsyncStreamController
,查看一些它的声明可以发现,其混入了 _AsyncStreamControllerDispatch
, 毫无疑问 _sendData
一定在其中实现:
如下所示,_AsyncStreamControllerDispatch
类中确实实现了 _sendData
,它实现 _StreamController
类,调用 _subscription
的 _addPending
方法,使用元素构造 _DelayedData
对象来添加。
abstract class _AsyncStreamControllerDispatch<T>
implements _StreamController<T> {
void _sendData(T data) {
_subscription._addPending(_DelayedData<T>(data));
}
void _sendError(Object error, StackTrace stackTrace) {
_subscription._addPending(_DelayedError(error, stackTrace));
}
void _sendDone() {
_subscription._addPending(const _DelayedDone());
}
}
2. 探索 _subscription#_addPending
现在问题来了,_subscription
是什么呢?既然 _AsyncStreamControllerDispatch
实现 了 _StreamController
类, _subscription
自然是来自 _StreamController
。 如下所示,返回值类型是 _ControllerSubscription
,也就是 Stream#listen
的返回值类型。 在不是添加流时 ( addStream 方法添加流元素
),主要逻辑是返回 _varData
成员。
---->[_StreamController#_subscription]----
_ControllerSubscription<T> get _subscription {
assert(hasListener);
Object? varData = _varData;
if (_isAddingStream) {
_StreamControllerAddStreamState<Object?> streamState = varData as dynamic;
varData = streamState.varData;
}
return varData as dynamic;
}
如下所示,在 _subscribe
时,_varData
会被赋值,从这里可以看出这里的 _subscription
就是 Stream#listen
时返回的那个 _ControllerSubscription
,也就是 onData
归宿之地。
接下来,会走到 _BufferingStreamSubscription
的 _addPending
方法。其中会通过 _PendingEvents
维护链表结构,元素节点是 _DelayedEvent
对象,也就是 add
方法中传入元素的包装类。在条件满足时,会触发 schedule
方法:
---->[_BufferingStreamSubscription#_addPending]---
void _addPending(_DelayedEvent event) {
var pending = _pending ??= _PendingEvents<T>();
pending.add(event);
if (!_hasPending) {
_state |= _STATE_HAS_PENDING;
if (!_isPaused) {
pending.schedule(this);
}
}
}
在 _PendingEvents#schedule
中,会通过 scheduleMicrotask
将蓝色选中区的函数对象加入到微任务队列中。当微任务循环触发时,蓝框中将会被回调。
3. 微任务循环与 onData
回调触发
前面说过 main
函数执行完后,接着会通过 _runPendingImmediateCallback
方法执行 微任务循环
。这就是为什么会触发 _PendingEvents#handleNext
的原因。从调试也可以看出该方法是 onData
回调的 前因
,从方法栈来看,中间还有三个方法,下面简单看一下:
_PendingEvents#handleNext
方法的逻辑是让链表的首元素脱链,并触发首元素的 perform
方法,这里的链表节点的类型为 _DelayedEvent
。
---->[_PendingEvents#handleNext]----
void handleNext(_EventDispatch<T> dispatch) {
_DelayedEvent event = firstPendingEvent!;
_DelayedEvent? nextEvent = event.next;
firstPendingEvent = nextEvent;
if (nextEvent == null) {
lastPendingEvent = null;
}
event.perform(dispatch);
}
_DelayedEvent
其实可就是对 传入元素
的简单封装,调用 perfrom
时,通过 dispatch
的 _sendData
来发送元素。此时可以暂停一下,想想这里的 _EventDispatch
类型的 dispatch
对象是什么?
稍微往上翻一点,可以看到 schedule
方法的入参是 _BufferingStreamSubscription
类型对象,从调试中也能看出接下来调用的是 _BufferingStreamSubscription#_sendData
:
其中会通过 _zone.runUnaryGuarded
,将 data
作为入参触发 _onData
方法。三秒钟反应时间,这个 _onData
是什么?
---->[_BufferingStreamSubscription#_sendData]----
void _sendData(T data) {
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
_zone.runUnaryGuarded(_onData, data);
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
到这里,Stream#listen
监听的回调事件,以及回调的触发流程就贯通了。大家可以细细品味一下,想想这个流程。
三、同步流控制器 _SyncStreamController
和 Future
一样, Stream
也只是一种 监听-通知
的封装体,只不过可以连续监听而已。就像一块生铁,打造成一把刀还是一把剑,并不能由铁本身决定。Stream
只是一个工具,如何使用它由使用者决定。在 StreamController
的构造中,提供了 sync
参数来生成 同步流控制器
。如下所示,对应的是 _SyncStreamController
类。
1. 理解同步 Future 对象
可能很多人都把 Future
和 Stream
与异步对等了,其实 Future
和 Stream
都有同步处理的能力。只不过很少使用而已,这里来介绍一下:
我们知道 Future
构造中传入的回调函数,将会使用 Timer.run
异步执行,从而在 main
函数之后回调,达到异步效果。如下所示 do in Future
在 do in main
之后:
void main(){
Future((){
print('=======do in Future=========');
});
print('=======do in main=========');
}
---->[打印]----
=======do in main=========
=======do in Future=========
其实 Future
还有个 sync
构造,用于同步处理回调。也就是先执行 sync
这的回调,才能继续执行接下来的方法。如下所示, do in main
在 do in Future
之后:
void main(){
Future.sync((){
print('=======do in Future=========');
});
print('=======do in main=========');
}
---->[打印]----
=======do in Future=========
=======do in main=========
从源码在也很容易理解,其中没有使用 Timer
,而是立即触发回调,所以 sync
中传入的函数仍是在 main
函数栈中触发的。
可能有人心里会纳闷,并且发出亘古不变的灵魂拷问:
那它的存在有什么意义呢?
从 sync
构造中可以看出它会对入参回调进行异常抓取,抓到异常后,310 行
会创建一个异步的 Future
返回。如下所示,通过 sync
执行 parser
方法,其中会抛出异常,这是通过 catchError
可以进行捕捉。从日志中可以看出,异常的捕捉信息在 do in main
之后。
从这里可以看出 sync
方法应用的场景是:希望某个方法的异常处理逻辑在稍后执行,优先处理接下来的逻辑。
void main() {
Future.sync(parser).catchError((e){
print(e);
});
print('=======do in main=========');
}
void parser() {
int a = int.parse('a');
print('=======do in Future=========');
}
2. 理解同步 Stream 对象
Stream
也是同理,默认 StreamController
是异步添加元素,从上面的分源码析中,也知道本质上是通过 scheduleMicrotask
实现的微任务。这样就能保证 add
方法添加元素之后,不会立刻触发 stream#listen
的 onData
回调,而是在 main
函数执行完后,在微任务循环中触发。从而保证异步性:
void main(){
StreamController<int> controller = StreamController();
controller.stream.listen((int event) {
print('=======do in stream===$event======');
});
controller.add(1);
print('=======do in main=========');
controller.add(2);
}
当 StreamController
的 sync
为 true
,就表示希望以同步的方式处理流的通知。比如 25 行
添加元素之后,会立即触发 listen
方法中的回调。这就同步 Stream
事件的同步触发:
3. 理解同步流控制器的实现
_SyncStreamController
也是继承自 _StreamController
,所以 事件订阅
的流程是一致的,只不过它混入的是 _SyncStreamControllerDispatch
,以同步的方式对元素进行分发。
通过源码可以看得很清楚, _SyncStreamControllerDispatch
会直接调用 _subscription._add
分发,触发 _subscription
的 _onData
回调。所以就没有 scheduleMicrotask
什么事了,直接通知 stream#listen
的监听回调。
_SyncStreamController
的应用场景是:希望在添加元素后,Stream
的监听可以立即响应。add
之后的逻辑需要等待 Stream
监听的回调处理完毕。从表现上来看就是上面测试中 do in stream 1
在 do in main
之前触发。
四、广播流控制器
StreamController
默认构造生成的控制器,不能对 stream
进行多次监听,否则会发生异常。
如果需要在多个场所监听一个 StreamController
的流,需要使用广播流控制器:
1. 思考:多次监听异常的原因
测试代码如下,tag2
处对 _AsyncBroadcastStreamController#stream
进行第二次监听将发生异常:
void main(){
StreamController<int> controller = StreamController();
controller.stream.listen((int event) {
print('=======do in stream 1===$event======');
});
controller.stream.listen((int event) { // tag2
print('=======do in stream 2===$event======');
});
controller.add(1);
print('=======do in main=========');
controller.add(2);
}
从源码来看,这是由于 _StreamController#_subscribe
时,状态非 _isInitialState
。可能有人会疑惑,为什么不能支持多次监听呢?
了解 _StreamController
的源码实现,这个问题其实很好解答。因为 _StreamController
只维护了一个 _varData
用于记录 _subscription
,它本身没有能力通知多个订阅者触发回调。
2. 实现控制器流的多监听
想要支持多次监听 流控制器
,可以通过 StreamController.broadcast
构造,创建广播流。 如下所示,这样就每次 add
元素时,就会通知所有的监听者。
void main(){
StreamController<int> controller = StreamController.broadcast();
controller.stream.listen((int event) {
print('=======do in stream#1===$event======');
});
controller.stream.listen((int event) {
print('=======do in stream#2===$event======');
});
controller.add(1);
print('=======do in main=========');
controller.add(2);
}
从构造方法在可以看出,broadcast
也是一个工厂构造,其中也可以通过 sync
入参控制是同步还是异步。默认返回的是异步的广播流控制器 _AsyncBroadcastStreamController
:
3. 为什么广播流可以多次监听
广播流
的使用非常简单,但为什么广播流可以多次监听呢,难道它叫广播,就能多次监听吗? 前面说 _StreamController
中只能记录一个 _subscription
订阅对象,想要通知多个订阅者,那必然是维护了 多个
订阅对象,在添加事件时,依次分发。
从源码中可以看出,_BroadcastStreamController
在持有首尾两个 _BroadcastSubscription
节点:
而 _BroadcastSubscription
类是一个双向链表结构,会记录 _next
和 _previous
节点:
在 _BroadcastStreamController#_subscription
中会通过 _addListener
分发将生成的 subscription
对象加入到订阅者链表中:
---->[_AsyncBroadcastStreamController#_addListener]----
void _addListener(_BroadcastSubscription<T> subscription) {
subscription._eventState = (_state & _STATE_EVENT_ID);
_BroadcastSubscription<T>? oldLast = _lastSubscription;
_lastSubscription = subscription;
subscription._next = null;
subscription._previous = oldLast;
if (oldLast == null) {
_firstSubscription = subscription;
} else {
oldLast._next = subscription;
}
}
在 _sendData
时,会遍历 subscription
链表,依次通过 _addPending
将事件元素加入到微任务队列中。当微任务循环启动时,会依次通知监听者。这就是 _BroadcastStreamController
支持多个订阅者的原理。
结语
上面是 Stream
机制的核心,主要把握住 监听与发送通知
的流程。另外 Stream
中还有一些小知识点,理解起来就很容易。比如 Stream.periodic
可以发送定时的循环任务,本质上是由 _SyncStreamController
和 Timer.periodic
实现的,由定时器驱动事件,控制器添加元素。
Stream.fromFutures
内部通过 _ASyncStreamController
进行控制,遍历 futures
列表,每当任务结束时,触发 onValue
事件,让流控制器添加元素,进行通知。
还有很多小的构造这里就不一一去看了,都大同小异,理解 Stream
机制的流程是最重要的。另外关于 Stream
还有一个比较重要的知识点: Stream
的转换。这个严格意义上来说,并不算是异步的知识,本文也比较长,暂时就不介绍,以后有机会单独写一篇介绍。
到这里 Future
和 Stream
两个关于异步处理最重要的两个对象,我们已经通过源码有了比较深入的了解。下一篇我们将认识一下如何使用 isolate
处理计算密集型的耗时任务,敬请期待 ~
转载自:https://juejin.cn/post/7160461343781306375