深入理解 Flutter 中的 Stream (二)
- 使用
scheduleMicrotask
暂停异步流时,流会在打印第一次输出之后才会暂停 - StreamBuilder 会出现 Stream 数据丢失的情况
那么今天就来拨云见日,深入剖析 Stream,通过源码层面的实现去回答上面的问题。
StreamController 家族一览
要剖析 Stream,先对它的源头 StreamController 入手,下面这张图是它的主要子类关系图。
EventSink
是事件输出的接口,我们经常使用的add
就是来源于此。
_EventDispatch
是事件输出的接口。
_StreamControllerBase
是结合了StreamController
EventSink
_EventDispatch
等接口的组合类,其中_EventDispatch
定义了事件如何输出,是区分各种Stream
功能的关键一环,后面会重点展开。
// 输入事件的接口
abstract interface class EventSink<T> implements Sink<T> {
void add(T event);
void addError(Object error, [StackTrace? stackTrace]);
void close();
}
// 事件分发接口类
abstract class _EventDispatch<T> {
void _sendData(T data);
void _sendError(Object error, StackTrace stackTrace);
void _sendDone();
}
// _StreamControllerBase 是结合了几个接口的抽象接口,
// 这里主要关心 _EventDispatch
abstract class _StreamControllerBase<T>
implements
StreamController<T>,
_StreamControllerLifecycle<T>,
_EventSink<T>,
_EventDispatch<T> {}
_StreamControllerBase
底下主要分成两个分支,一个分支是_StreamController
领导的非广播流
实现;另一个是_BroadcastStreamController
所属的广播流
分支。
非广播流
下又可分为同步流
和异步流
,对应的实现分别是_SyncStreamController
、_AsyncSteramCOntroller
。两者的实现区别体现在_SyncStreamControllerDispatch
和_AsyncStreamControllerDispatch
上,而这两个Dispatch
就是实现了_EventDispatch
接口的Mixin
,也就是规定了这条流要怎么输出事件。
废话不多说,直接看源码,从非广播
的基类_StreamController
入手。
_StreamController 的实现
二进制位状态
_StreamController
里定义了很多标志位,它们是基于二进制位运算来更新当前状态的。二进制的每一位都能用来代表一种状态,如下,以_STATE_SUBSCRIBED
来讲,它的二进制位展开就是00000001
,从右往左算第1位就能用来标志流否被订阅
,这里1
代表当前流被订阅,0
则相反。
static const int _STATE_INITIAL = 0;
static const int _STATE_SUBSCRIBED = 1; // 流被订阅
static const int _STATE_CANCELED = 2; // 订阅取消了
static const int _STATE_SUBSCRIPTION_MASK = 3; // 流被订阅/订阅取消
static const int _STATE_CLOSED = 4; // 流关闭了。之后不再接收事件的输入
static const int _STATE_ADDSTREAM = 8; // 流的输入源是 Stream
// 状态标志
int _state = _STATE_INITIAL;
// 是否监听
bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;
初始状态是_STATE_INITIAL
,当我需要标记时,使用二进制或
运算:_state |= _STATE_SUBSCRIBED
,展开就是 00000000 | 00000001 = 00000001
,最后一位结果是1
;而当我需要取消标记时,使用二进制与
和非
运算:_state &= ~_STATE_SUBSCRIBED
,展开就是00000001 & 00000000
,最后一位结果是0
。
如果要判断当前位的状态,就去判断最后一位是否是 1
,使用二进制与运算:_state & _STATE_SUBSCRIBED
。
二进制位状态遍布在Stream
的各处实现中,其实这种处理在其它语言中也经常会看到,巧妙得利用了二进制的原理:一个位代表一种状态。
事件输入: EventSink 的实现
前面提到了EventSink
作为流的输入,定义了三种输入接口add
addEerror
close
。add
和addError
方法是类似的处理,所以这里只分析add
和close
方法。
add
// 不在 _STATE_CLOSED && _STATE_ADDSTREAM 状态时,能通过 EventSink 添加事件
bool get _mayAddEvent => (_state < _STATE_CLOSED);
// 流是否被订阅
bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;
// 初始状态
bool get _isInitialState =>
(_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;
void add(T value) {
if (!_mayAddEvent) throw _badEventState();
_add(value);
}
void _add(T value) {
if (hasListener) {
_sendData(value); // _sendData 是 _EventDispatch 中定义的接口方法
} else if (_isInitialState) {
_ensurePendingEvents().add(_DelayedData<T>(value));
}
}
当_mayAddEvent
为 true 时,add 方法才会往下执行,也就是说当_state
为_STATE_CLOSED
或者_STATE_ADDSTREAM
时,都会抛出错误。搜索这个_STATE_ADDSTREAM
的引用你会发现,只有在addStream
方法中,_state
才会被打上_STATE_ADDSTREAM
的标记:_state |= _STATE_ADDSTREAM
,所以STATE_ADDSTREAM
代表的流的另外一种输入方式。
_isInitialState
代表的是流的初始状态,此时还没有被订阅,当事件输入时,会先缓存到_ensurePendingEvent()
生成的_PendingEvents<T>()
对象中。
总结一下
- Stream 有两种输入源:
EventSink
中定义的接口和addStream
,两种输入源是互斥的。 - 当 Stream 没有被订阅时,事件的输入会缓存到一个列表里
close
// 返回的是一个 Future
Future<void> _ensureDoneFuture() =>
_doneFuture ??= _isCanceled ? Future._nullFuture : _Future<void>();
Future close() {
if (isClosed) {
return _ensureDoneFuture();
}
if (!_mayAddEvent) throw _badEventState();
_closeUnchecked();
return _ensureDoneFuture();
}
// 会发出一个 Done 事件
void _closeUnchecked() {
_state |= _STATE_CLOSED;
if (hasListener) {
_sendDone();
} else if (_isInitialState) {
_ensurePendingEvents().add(const _DelayedDone());
}
}
close
方法返回的是一个 Future,这个 Future 会在 _recordCancel
方法中执行 Complete,当StreamSubscription
结束时,_recordCancel
方法会被调用。StreamSubscription
结束有几种情况,一种是接收到了Done
事件,就是这个close
方法中输入的;一种是StreamSubscription
构造方法中cancelOnError
为 true 时,接收到了error
事件,也就是通过addError
方法输入的;最后一种是StreamSubscription
直接调用自身的cancel
方法。
close
后也会产生一个事件,事件名字是_DelayedDone
。close
后 Stream 不是马上关闭的,要等到事件处理完成后才会完成。
事件输出: _EventDispatch 的实现
前面提到了事件如何输入,现在讲下事件的输出。_EventDispatch
中也有三个方法_sendData
_senError
_sendDone
,这三个输出方法分别对应前面提到的三个输入。_StreamController
并没有实现这三个方法,而是交给了它的子类。同步流和异步流的差异在于继承了不同的_EventDispatch
。
class _SyncStreamController<T> = _StreamController<T>
with _SyncStreamControllerDispatch<T>;
class _AsyncStreamController<T> = _StreamController<T>
with _AsyncStreamControllerDispatch<T>;
接下来看一下不同的_EventDispatch
的实现
mixin _SyncStreamControllerDispatch<T>
implements _StreamController<T>, SynchronousStreamController<T> {
void _sendData(T data) {
_subscription._add(data);
}
void _sendError(Object error, StackTrace stackTrace) {
_subscription._addError(error, stackTrace);
}
void _sendDone() {
_subscription._close();
}
}
mixin _AsyncStreamControllerDispatch<T> implements _StreamController<T> {
void _sendData(T data) {
_subscription._addPending(_DelayedData<T>(data));
}
void _sendError(Object error, StackTrace stackTrace) {
_subscription._addPending(_DelayedError(error, stackTrace));
}
void _sendDone() {
_subscription._addPending(const _DelayedDone());
}
}
可以看到,异步流会把所有的事件统一通过_addPending
去处理,从名字上去看这应该是一个缓存队列。顺着这条路,我们来分析一下_subscription
的来源以及它是如何消费事件的:_add
_addError
_close
_addPending
的区别。
_subscription 的创建
我们知道,StreamSubscription
是通过Stream.listen
返回的,定位一下调用过程,它的完整的路径是 Stream.listen
-> Stream._createSubscription
-> StreamController._subscribe
。最终的_subscribe
的实现是
StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError,
void onDone()?, bool cancelOnError) {
if (!_isInitialState) {
throw StateError("Stream has already been listened to.");
}
// 创建 subscription
_ControllerSubscription<T> subscription = _ControllerSubscription<T>(
this, onData, onError, onDone, cancelOnError);
_PendingEvents<T>? pendingEvents = _pendingEvents;
_state |= _STATE_SUBSCRIBED;
// 通过 _varData 将 subscription 存起来
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = _varData as dynamic;
addState.varData = subscription;
addState.resume();
} else {
_varData = subscription;
}
subscription._setPendingEvents(pendingEvents); // 将缓存事件传给 Subscription
subscription._guardCallback(() { // 触发 onListen
_runGuarded(onListen);
});
return subscription;
}
这里主要做几件事
- 创建 subscription,并将它的引用保存到 _varData 。 这里有一点需要知道的是 _varData 有可能是
_StreamControllerAddStreamState
或者StreamSubscription
的类型,当_isAddingStream
时,subscription
会被保存到_StreamControllerAddStreamState.varData
里。 - 将提前到的事件 pendingEvent 传给 subscription 。
- 触发 onListen 。
通过前面的流程,获取subscription
的方式就比较容易理解了。
_ControllerSubscription<T> get _subscription {
assert(hasListener);
Object? varData = _varData;
if (_isAddingStream) {
_StreamControllerAddStreamState<Object?> streamState = varData as dynamic;
varData = streamState.varData;
}
return varData as dynamic;
}
_ControllerSubscription 如何消费事件
前面的分析,我们能得出subscription
是_ControllerSubscription
类型的。现在我们就可以通过它的_add
_addError
_close
_addPending
来分析它的输出,它的实现在父类_BufferingStreamSubscription
中。前面三个方法的实现类似,我们只取_add
来分析
// _canFire 表示是不是有事件正在处理中
bool get _canFire => _state < _STATE_IN_CALLBACK;
void _add(T data) {
assert(!_isClosed);
if (_isCanceled) return;
if (_canFire) {
// 如果没有事件在处理,则进行
_sendData(data);
} else {
_addPending(new _DelayedData<T>(data));
}
}
// 发送事件
void _sendData(T data) {
...
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
// 事件处理,调用 _onData
_zone.runUnaryGuarded(_onData, data);
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
// 存放到队列中
void _addPending(_DelayedEvent event) {
var pending = _pending ??= _PendingEvents<T>();
pending.add(event);
if (!_hasPending) {
_state |= _STATE_HAS_PENDING;
if (!_isPaused) {
pending.schedule(this);
}
}
}
这里有两点值得关注
- 事件最终会通过 _onData 发送出去,这个 _onData 是我们创建
Subscription
时传进来的回调方法 Subscription
里面也会有一个队列_pendingEvent
,存放着还未消费的事件,这个队列最终会通过schedule(this)
方法一个个输出。
schedule
关于schedule
方法,我们来看一下它的处理。首先,它会对当前队列状态做一下判断,接着它会通过scheduleMicrotask
开启事件的处理。但要知道,这个scheduleMicrotask
开启的是一个异步事件 micro event,handleNext
就是事件处理的核心,handleNext
会使到队列的下一个事件,回调到Subscription
的_sendData
等方法。
void schedule(_EventDispatch<T> dispatch) {
// 状态判断,避免重复
if (isScheduled) return;
assert(!isEmpty);
if (_eventScheduled) {
assert(_state == stateCanceled);
_state = stateScheduled;
return;
}
// 开启一个 microTask 事件,准备处理事件
scheduleMicrotask(() {
int oldState = _state;
_state = stateUnscheduled;
if (oldState == stateCanceled) return;
handleNext(dispatch);
});
_state = stateScheduled;
}
至此,我已经讲完了非广播
流下的同步
和异步
流的处理,它们从事件输入到输出的整个过程。现在我们可以回过头来看一下,在文章开头提到的第 1 个问题
使用 scheduleMicrotask
暂停异步流时,流为什么会在打印第一次输出之后才会暂停?
void main() async {
// 异步:sync 为 false
var controller = StreamController<int>(sync: false);
var subscription = controller.stream.listen((event) {
print('output: $event');
});
controller.add(1);
controller.add(2);
controller.add(3);
scheduleMicrotask(() {
print('暂停');
subscription.pause();
Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 恢复');
subscription.resume();
});
});
}
// will print
// output: 1
// 暂停
// 3秒后 -> 恢复
// output: 2
// output: 3
首先,异步流
会在_AsyncStreamControllerDispatch
通过_subscription._addPending
输入,先放到队列里,最终会通过_pendingEvent.schedule
方法输出。在这里,add(1)
过来之后,schedule
方法就会开始执行了,但由于它里面的scheduleMicrotask
是异步的,所以handleNext
还未执行。接着,同步的add(2)
add(3)
方法过来了。
然后走到下面的scheduleMicrotask
准备暂停流,这个也是异步的任务。需要注意的是这个异步任务排在了add(1)
触发的scheule -> scheduleMicrotask
的后面。所以,handleNext
先执行,输出output: 1
,handleNext
处理完当前事件之后,会走队列下一个事件的输出,这个事件的输出也是通过schedule
方法进行的,也就是后面的事件也是通过schedule
里的scheduleMicrotask
的异步处理里面。
但由于先后顺序问题,先走到了subscriptionn.paused()
里,流先被暂停了,后面的事件就被暂停了。等到 3 秒后,事件恢复,才继续事件的消费。
_BroadcastStreamController 的实现
广播流
的处理跟非广播流
异曲同工。非广播流
只存在一个_subscription
,_BroadcastStreamController
则保存一个链表结构的_BroadcastSubscription
,它里面保存着两个对象_firstSubscription
_lastSubscription
。
// Double-linked list of active listeners.
_BroadcastSubscription<T>? _firstSubscription;
_BroadcastSubscription<T>? _lastSubscription;
对于同步流_SyncBroadcastStreamController
来说,事件输出时通过_forEachListener
遍历操作 _sendData
,_forEachListener
里面有一个_isFiring
的判断,不允许还没有输出结束前再触发输出。
if (_isFiring) {
throw new StateError(
"Cannot fire new event. Controller is already firing an event");
}
为了解决这种情况,在_SyncBroadcastStreamController
下创建了创建了一个子类_AsBroadcastStreamController
,是一种特殊的同步流
,当处于_isFiring
状态时,事件过来的时候,会将事件通过异步的方式处理,也就是保存到pendingEvent
里面。
void add(T data) {
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedData<T>(data));
return;
}
super.add(data);
_flushPending();
}
异步流_AsyncBroadcastStreamController
则是跟非广播
下的异步流
的处理方式一样,事件都会先缓存到_pendingEvent
里面。
总的来说,非广播流
和广播流
下的同步
和同步
的处理是类似的。 只要掌握同步
和异步
的本质就行。
总结
本来还打算继续分析 Stream 的变换处理 _ForwardingStream
以及 StreamBuilder
的实现,但写着写着文章已到了 12000+ 的字数。这两者的实现里面没有太难理解的部分,既然如此,就留给大家自己去分析和思考了,有问题可以随时留言讨论,或者点个关注+发私信,我看到一定会回复的~
转载自:https://juejin.cn/post/7268539503903211560