深入理解 Flutter 中的 Stream (二)
- 使用
scheduleMicrotask暂停异步流时,流会在打印第一次输出之后才会暂停 - StreamBuilder 会出现 Stream 数据丢失的情况
那么今天就来拨云见日,深入剖析 Stream,通过源码层面的实现去回答上面的问题。
StreamController 家族一览
要剖析 Stream,先对它的源头 StreamController 入手,下面这张图是它的主要子类关系图。

EventSink是事件输出的接口,我们经常使用的add就是来源于此。
_EventDispatch是事件输出的接口。
_StreamControllerBase是结合了StreamController EventSink _EventDispatch等接口的组合类,其中_EventDispatch定义了事件如何输出,是区分各种Stream功能的关键一环,后面会重点展开。
// 输入事件的接口
abstract interface class EventSink<T> implements Sink<T> {
void add(T event);
void addError(Object error, [StackTrace? stackTrace]);
void close();
}
// 事件分发接口类
abstract class _EventDispatch<T> {
void _sendData(T data);
void _sendError(Object error, StackTrace stackTrace);
void _sendDone();
}
// _StreamControllerBase 是结合了几个接口的抽象接口,
// 这里主要关心 _EventDispatch
abstract class _StreamControllerBase<T>
implements
StreamController<T>,
_StreamControllerLifecycle<T>,
_EventSink<T>,
_EventDispatch<T> {}
_StreamControllerBase底下主要分成两个分支,一个分支是_StreamController领导的非广播流实现;另一个是_BroadcastStreamController所属的广播流分支。
非广播流下又可分为同步流和异步流,对应的实现分别是_SyncStreamController、_AsyncSteramCOntroller。两者的实现区别体现在_SyncStreamControllerDispatch和_AsyncStreamControllerDispatch上,而这两个Dispatch就是实现了_EventDispatch接口的Mixin,也就是规定了这条流要怎么输出事件。
废话不多说,直接看源码,从非广播的基类_StreamController入手。
_StreamController 的实现
二进制位状态
_StreamController 里定义了很多标志位,它们是基于二进制位运算来更新当前状态的。二进制的每一位都能用来代表一种状态,如下,以_STATE_SUBSCRIBED来讲,它的二进制位展开就是00000001,从右往左算第1位就能用来标志流否被订阅,这里1代表当前流被订阅,0则相反。
static const int _STATE_INITIAL = 0;
static const int _STATE_SUBSCRIBED = 1; // 流被订阅
static const int _STATE_CANCELED = 2; // 订阅取消了
static const int _STATE_SUBSCRIPTION_MASK = 3; // 流被订阅/订阅取消
static const int _STATE_CLOSED = 4; // 流关闭了。之后不再接收事件的输入
static const int _STATE_ADDSTREAM = 8; // 流的输入源是 Stream
// 状态标志
int _state = _STATE_INITIAL;
// 是否监听
bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;
初始状态是_STATE_INITIAL,当我需要标记时,使用二进制或运算:_state |= _STATE_SUBSCRIBED,展开就是 00000000 | 00000001 = 00000001,最后一位结果是1;而当我需要取消标记时,使用二进制与和非运算:_state &= ~_STATE_SUBSCRIBED,展开就是00000001 & 00000000,最后一位结果是0。
如果要判断当前位的状态,就去判断最后一位是否是 1,使用二进制与运算:_state & _STATE_SUBSCRIBED。
二进制位状态遍布在Stream的各处实现中,其实这种处理在其它语言中也经常会看到,巧妙得利用了二进制的原理:一个位代表一种状态。
事件输入: EventSink 的实现
前面提到了EventSink作为流的输入,定义了三种输入接口add addEerror close。add和addError方法是类似的处理,所以这里只分析add和close方法。
add
// 不在 _STATE_CLOSED && _STATE_ADDSTREAM 状态时,能通过 EventSink 添加事件
bool get _mayAddEvent => (_state < _STATE_CLOSED);
// 流是否被订阅
bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;
// 初始状态
bool get _isInitialState =>
(_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;
void add(T value) {
if (!_mayAddEvent) throw _badEventState();
_add(value);
}
void _add(T value) {
if (hasListener) {
_sendData(value); // _sendData 是 _EventDispatch 中定义的接口方法
} else if (_isInitialState) {
_ensurePendingEvents().add(_DelayedData<T>(value));
}
}
当_mayAddEvent为 true 时,add 方法才会往下执行,也就是说当_state为_STATE_CLOSED或者_STATE_ADDSTREAM时,都会抛出错误。搜索这个_STATE_ADDSTREAM的引用你会发现,只有在addStream方法中,_state才会被打上_STATE_ADDSTREAM的标记:_state |= _STATE_ADDSTREAM,所以STATE_ADDSTREAM代表的流的另外一种输入方式。
_isInitialState代表的是流的初始状态,此时还没有被订阅,当事件输入时,会先缓存到_ensurePendingEvent()生成的_PendingEvents<T>()对象中。
总结一下
- Stream 有两种输入源:
EventSink中定义的接口和addStream,两种输入源是互斥的。 - 当 Stream 没有被订阅时,事件的输入会缓存到一个列表里
close
// 返回的是一个 Future
Future<void> _ensureDoneFuture() =>
_doneFuture ??= _isCanceled ? Future._nullFuture : _Future<void>();
Future close() {
if (isClosed) {
return _ensureDoneFuture();
}
if (!_mayAddEvent) throw _badEventState();
_closeUnchecked();
return _ensureDoneFuture();
}
// 会发出一个 Done 事件
void _closeUnchecked() {
_state |= _STATE_CLOSED;
if (hasListener) {
_sendDone();
} else if (_isInitialState) {
_ensurePendingEvents().add(const _DelayedDone());
}
}
close 方法返回的是一个 Future,这个 Future 会在 _recordCancel 方法中执行 Complete,当StreamSubscription结束时,_recordCancel方法会被调用。StreamSubscription结束有几种情况,一种是接收到了Done事件,就是这个close方法中输入的;一种是StreamSubscription构造方法中cancelOnError为 true 时,接收到了error事件,也就是通过addError方法输入的;最后一种是StreamSubscription直接调用自身的cancel方法。
close后也会产生一个事件,事件名字是_DelayedDone。close后 Stream 不是马上关闭的,要等到事件处理完成后才会完成。
事件输出: _EventDispatch 的实现
前面提到了事件如何输入,现在讲下事件的输出。_EventDispatch中也有三个方法_sendData _senError _sendDone,这三个输出方法分别对应前面提到的三个输入。_StreamController并没有实现这三个方法,而是交给了它的子类。同步流和异步流的差异在于继承了不同的_EventDispatch。
class _SyncStreamController<T> = _StreamController<T>
with _SyncStreamControllerDispatch<T>;
class _AsyncStreamController<T> = _StreamController<T>
with _AsyncStreamControllerDispatch<T>;
接下来看一下不同的_EventDispatch的实现
mixin _SyncStreamControllerDispatch<T>
implements _StreamController<T>, SynchronousStreamController<T> {
void _sendData(T data) {
_subscription._add(data);
}
void _sendError(Object error, StackTrace stackTrace) {
_subscription._addError(error, stackTrace);
}
void _sendDone() {
_subscription._close();
}
}
mixin _AsyncStreamControllerDispatch<T> implements _StreamController<T> {
void _sendData(T data) {
_subscription._addPending(_DelayedData<T>(data));
}
void _sendError(Object error, StackTrace stackTrace) {
_subscription._addPending(_DelayedError(error, stackTrace));
}
void _sendDone() {
_subscription._addPending(const _DelayedDone());
}
}
可以看到,异步流会把所有的事件统一通过_addPending去处理,从名字上去看这应该是一个缓存队列。顺着这条路,我们来分析一下_subscription的来源以及它是如何消费事件的:_add _addError _close _addPending的区别。
_subscription 的创建
我们知道,StreamSubscription是通过Stream.listen返回的,定位一下调用过程,它的完整的路径是 Stream.listen -> Stream._createSubscription -> StreamController._subscribe。最终的_subscribe的实现是
StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError,
void onDone()?, bool cancelOnError) {
if (!_isInitialState) {
throw StateError("Stream has already been listened to.");
}
// 创建 subscription
_ControllerSubscription<T> subscription = _ControllerSubscription<T>(
this, onData, onError, onDone, cancelOnError);
_PendingEvents<T>? pendingEvents = _pendingEvents;
_state |= _STATE_SUBSCRIBED;
// 通过 _varData 将 subscription 存起来
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = _varData as dynamic;
addState.varData = subscription;
addState.resume();
} else {
_varData = subscription;
}
subscription._setPendingEvents(pendingEvents); // 将缓存事件传给 Subscription
subscription._guardCallback(() { // 触发 onListen
_runGuarded(onListen);
});
return subscription;
}
这里主要做几件事
- 创建 subscription,并将它的引用保存到 _varData 。 这里有一点需要知道的是 _varData 有可能是
_StreamControllerAddStreamState或者StreamSubscription的类型,当_isAddingStream时,subscription会被保存到_StreamControllerAddStreamState.varData里。 - 将提前到的事件 pendingEvent 传给 subscription 。
- 触发 onListen 。
通过前面的流程,获取subscription的方式就比较容易理解了。
_ControllerSubscription<T> get _subscription {
assert(hasListener);
Object? varData = _varData;
if (_isAddingStream) {
_StreamControllerAddStreamState<Object?> streamState = varData as dynamic;
varData = streamState.varData;
}
return varData as dynamic;
}
_ControllerSubscription 如何消费事件
前面的分析,我们能得出subscription是_ControllerSubscription类型的。现在我们就可以通过它的_add _addError _close _addPending 来分析它的输出,它的实现在父类_BufferingStreamSubscription 中。前面三个方法的实现类似,我们只取_add来分析
// _canFire 表示是不是有事件正在处理中
bool get _canFire => _state < _STATE_IN_CALLBACK;
void _add(T data) {
assert(!_isClosed);
if (_isCanceled) return;
if (_canFire) {
// 如果没有事件在处理,则进行
_sendData(data);
} else {
_addPending(new _DelayedData<T>(data));
}
}
// 发送事件
void _sendData(T data) {
...
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
// 事件处理,调用 _onData
_zone.runUnaryGuarded(_onData, data);
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
// 存放到队列中
void _addPending(_DelayedEvent event) {
var pending = _pending ??= _PendingEvents<T>();
pending.add(event);
if (!_hasPending) {
_state |= _STATE_HAS_PENDING;
if (!_isPaused) {
pending.schedule(this);
}
}
}
这里有两点值得关注
- 事件最终会通过 _onData 发送出去,这个 _onData 是我们创建
Subscription时传进来的回调方法 Subscription里面也会有一个队列_pendingEvent,存放着还未消费的事件,这个队列最终会通过schedule(this)方法一个个输出。
schedule
关于schedule方法,我们来看一下它的处理。首先,它会对当前队列状态做一下判断,接着它会通过scheduleMicrotask开启事件的处理。但要知道,这个scheduleMicrotask开启的是一个异步事件 micro event,handleNext就是事件处理的核心,handleNext会使到队列的下一个事件,回调到Subscription的_sendData等方法。
void schedule(_EventDispatch<T> dispatch) {
// 状态判断,避免重复
if (isScheduled) return;
assert(!isEmpty);
if (_eventScheduled) {
assert(_state == stateCanceled);
_state = stateScheduled;
return;
}
// 开启一个 microTask 事件,准备处理事件
scheduleMicrotask(() {
int oldState = _state;
_state = stateUnscheduled;
if (oldState == stateCanceled) return;
handleNext(dispatch);
});
_state = stateScheduled;
}
至此,我已经讲完了非广播流下的同步和异步流的处理,它们从事件输入到输出的整个过程。现在我们可以回过头来看一下,在文章开头提到的第 1 个问题
使用 scheduleMicrotask 暂停异步流时,流为什么会在打印第一次输出之后才会暂停?
void main() async {
// 异步:sync 为 false
var controller = StreamController<int>(sync: false);
var subscription = controller.stream.listen((event) {
print('output: $event');
});
controller.add(1);
controller.add(2);
controller.add(3);
scheduleMicrotask(() {
print('暂停');
subscription.pause();
Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 恢复');
subscription.resume();
});
});
}
// will print
// output: 1
// 暂停
// 3秒后 -> 恢复
// output: 2
// output: 3
首先,异步流会在_AsyncStreamControllerDispatch通过_subscription._addPending输入,先放到队列里,最终会通过_pendingEvent.schedule方法输出。在这里,add(1)过来之后,schedule方法就会开始执行了,但由于它里面的scheduleMicrotask是异步的,所以handleNext还未执行。接着,同步的add(2) add(3)方法过来了。
然后走到下面的scheduleMicrotask准备暂停流,这个也是异步的任务。需要注意的是这个异步任务排在了add(1)触发的scheule -> scheduleMicrotask的后面。所以,handleNext先执行,输出output: 1,handleNext处理完当前事件之后,会走队列下一个事件的输出,这个事件的输出也是通过schedule方法进行的,也就是后面的事件也是通过schedule里的scheduleMicrotask的异步处理里面。
但由于先后顺序问题,先走到了subscriptionn.paused()里,流先被暂停了,后面的事件就被暂停了。等到 3 秒后,事件恢复,才继续事件的消费。
_BroadcastStreamController 的实现
广播流的处理跟非广播流异曲同工。非广播流只存在一个_subscription,_BroadcastStreamController则保存一个链表结构的_BroadcastSubscription,它里面保存着两个对象_firstSubscription _lastSubscription。
// Double-linked list of active listeners.
_BroadcastSubscription<T>? _firstSubscription;
_BroadcastSubscription<T>? _lastSubscription;
对于同步流_SyncBroadcastStreamController来说,事件输出时通过_forEachListener 遍历操作 _sendData,_forEachListener里面有一个_isFiring的判断,不允许还没有输出结束前再触发输出。
if (_isFiring) {
throw new StateError(
"Cannot fire new event. Controller is already firing an event");
}
为了解决这种情况,在_SyncBroadcastStreamController下创建了创建了一个子类_AsBroadcastStreamController,是一种特殊的同步流,当处于_isFiring状态时,事件过来的时候,会将事件通过异步的方式处理,也就是保存到pendingEvent里面。
void add(T data) {
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedData<T>(data));
return;
}
super.add(data);
_flushPending();
}
异步流_AsyncBroadcastStreamController则是跟非广播下的异步流的处理方式一样,事件都会先缓存到_pendingEvent里面。
总的来说,非广播流和广播流下的同步和同步的处理是类似的。 只要掌握同步和异步的本质就行。
总结
本来还打算继续分析 Stream 的变换处理 _ForwardingStream 以及 StreamBuilder的实现,但写着写着文章已到了 12000+ 的字数。这两者的实现里面没有太难理解的部分,既然如此,就留给大家自己去分析和思考了,有问题可以随时留言讨论,或者点个关注+发私信,我看到一定会回复的~
转载自:https://juejin.cn/post/7268539503903211560