深入理解 Flutter 中的 Stream (一)
Stream
是 Flutter 处理数据响应的一个重要手段,它提供了一种处理数据流的方式,其作用类似于 Kotlin 中的 Flow
,基于发布订阅模式
的设计,通过监听Stream
,订阅者能不断接收到数据流的最新变化。
Stream 的基本用法
Stream
能通过async*
和StreamController
产生,也能通过其它Stream
转换而来。相较于async*
,StreamController
因为灵活度更高,因此更为常见,两者在使用场景上也有一定差别。
async*
相信大家一定认识async
,但async*
就未必,同样作为 Flutter 里异步处理的一环,async
主要跟Future
打交道,而async*
处理的对象是Stream
,async*
在使用上需要搭配yield
。下面这段代码演示了如何使用async*
进行 1 到 10 的相加。
Stream<int> countStream(int to) async* {
print("stream 被监听");
for (int i = 1; i <= to; i++) {
yield i;
}
}
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}
void main() async {
var stream = countStream(10);
// 当注释掉下面这行,控制台不会打印出 "stream 被监听",也就表示 async* 方法体没被执行
var sum = await sumStream(stream);
print(sum); // 55
}
在上面的示例中,async*
方法体里yield
在每次的遍历中,都往Stream
返回一个数据,通过 await for
的监听拿到每次返回的值,接着执行sum
操作。值得注意的是,async*
这种方式产生的stream
,当stream
没有被监听时,async*
方法体是不会被执行的。
如果你看着async*
还有点别扭,请记住:async
返回的是一个Future
,而async*
返回的是一个Stream
。
StreamController
日常开发中,通常会通过StreamController
创建Stream
。只需要构造出StreamController
对象,通过这个对象的.stream
就可以得到Stream
。
Stream<int> countStream(int to) {
// 先创建 StreamController
late StreamController<int> controller;
controller = StreamController<int>(onListen: () {
// 当 Stream 被监听时会触发 onListen 回调
for (var i = 0; i < to; i++) {
controller.add(i);
}
controller.close();
});
return controller.stream;
}
Future<int> listenOn(Stream<int> stream) async {
var completer = Completer<int>();
var sum = 0;
// 监听 stream
stream.listen(
(event) {
sum += event;
},
onDone: () => completer.complete(sum),
);
return completer.future;
}
void main() async {
var stream = countStream(10);
// 当注释掉下面这行,控制台也不会打印出 "stream 被监听"
var sum = await listenOn(stream);
print(sum); // 55
}
在创建StreamController
的时候传入了一个onListen
回调,当流第一次被监听的时候,会触发这个回调,此时会往流里面依次添加多个数据,listenOn
方法里拿到这些数据执行相加操作。这里使用了stream
的listen
的方法进行监听。
Stream 左右护法
Flutter 中的 Stream 处理,涉及到三类对象,以发布订阅模式
的角度去看的话,可以分为发布者 StreamController
、数据通道 Stream
、订阅者 StreamSubscription
。
class Example {
var controller = StreamController<int>();
Stream<int> get stream => controller.stream;
StreamSubscription<int>? _subscription;
void initState() {
_subscription = stream.listen((event) {
print(event);
});
for(var i = 0; i <= 10; i++) {
controller.add(i);
}
}
void dispose() {
_subscription?.cancel();
_subscription = null;
}
}
每一个StreamController
都对应着一个Stream
,当Stream
被订阅时,会得到一个StreamSubscription
对象。
上面的例子中,接口使用是简单的,但是他们内部的工作原理是如何?一个事件从发布到消费中间经过了哪些流程?
数据流向图
在事件处理上:Stream
在被订阅时,会创建StreamSubscription
,并将其中的onData
等事件处理的回调传给StreamSubscription
。
在事件输入输出上:StreamController
通过add
方法输入事件后,先判断此时是否存在订阅者StreamSubscription
,如果存在则调用StreamSubscription
的onData
处理,不存在就先存到_pendingEvents
里,等到下次StreamSubscription
出现了再向它输出事件。
可以看到,StreamController
在整个事件流向的处理中肩负着最重要的使命,它控制着事件如何输入和输出,StreamSubscription
负责处理输出到这里的事件,Stream
在得到StreamSubscription
后就完成了它的使命选择“退隐山林”。
这么讲可能还有点“干”,为了更直观的介绍他们各自的职责,接下来我们从他们定义的接口出发,去思考他们都能做哪些事件。为了方便呈现,我只取其中最关键的部分。
StreamController
abstract interface class StreamController<T> implements StreamSink<T> {
// stream 流
Stream<T> get stream;
// 流状态的回调
abstract void Function()? onListen; // 被监听
abstract void Function()? onPause; // 流暂停
abstract void Function()? onResume; // 流恢复
abstract void Function()? onCancel; // 流取消/关闭
// 流状态
bool get isClosed;
bool get isPaused;
bool get hasLitener; // 当前流是否有订阅者
// 监听 source,转发给 stream
Future addStream(Stream<T> source, {bool? cancelOnError});
// 往流里面添加事件
void add(T event);
void addError(Object error, [StackTrace? stackTrace]);
Future close(); // 关闭流
// 输出事件
// 以下这三个接口在 _StreamControllerBase 中
void _sendData(T data);
void _sendError(Object error, StackTrace stackTrace);
void _sendDone();
}
StreamController
负责管理事件流的状态,当状态变化时,会触发到相应的回调(onListen/onPause等)。StreamController
负责事件的输入,输入的方式有两种,一种是事件接口add
、addError
;另外一种是通过监听其它的Stream
;同时事件也分为两种,一种是正常事件,一种是错误事件。StreamController
能关闭这个事件流通道,会产生一个onDone
事件。StreamController
负责事件的输出,不同的输入对应不同的输出。
Stream
abstract mixin class Stream<T> {
// 是否地广播流,广播流允许多订阅
bool get isBroadcast => false;
// 监听流变化,返回订阅者
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
// 一系列 Stream 处理和变换操作
Stream<T> where(bool test(T event)) { ... }
Stream<S> map<S>(S convert(T event)) { ... }
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { ...}
Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) { ... }
Stream<T> handleError(Function onError, {bool test(error)?}) { ... }
...
}
Stream
暴露事件流的订阅方法listen
,返回当前订阅者,并把listen
方法中的onData
等参数注册到当前订阅者里面。Stream
有很多过滤转换等语法糖方法。
StreamSubscription
abstract interface class StreamSubscription<T> {
// 监听数据变化
void onData(void handleData(T data)?);
void onError(Function? handleError);
void onDone(void handleDone()?);
// 暂停/恢复 监听
void pause([Future<void>? resumeSignal]);
void resume();
bool get isPaused;
// 取消监听
Future<void> cancel();
// 转成 Future 对象,监听流结束事件
Future<E> asFuture<E>([E? futureValue]);
}
StreamSubscription
作为事件输出端,负责事件的输出处理。StreamSubscription
也能对自己的订阅行为进行暂停、恢复或取消等动作。
Stream 的分类
Stream
有很多子类,对应不同场景的实现。比如对于输入端而言,可以分为同步流和异步流;在输出端上,又可分为广播流和非广播流。
同步和异步
StreamController
的工厂方法中,通过sync
可以指定同步或者异步。同步和异步的区别是:事件输入后是否会立即输出。 同步流在事件输入后会立刻执行onData
,异步流在事件输入后会注册一个异步事件,等到当前EventLoop
中的同步事件处理后触发onData
。
factory StreamController(
{void onListen()?,
void onPause()?,
void onResume()?,
FutureOr<void> onCancel()?,
bool sync = false}) {
return sync
? _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
在实现上看,_SyncStreamController
最终输出时使用的是_SyncStreamControllerDispatch
,_AsyncStreamController
使用的是_AsyncStreamControllerDispatch
。
两者在输出处理不同,_SyncStreamControllerDispatch
调用的是subscription
的_add
方法,_AsyncStreamControllerDispatch
调用的是subscription
的_addPending
方法。_addPending
会先将事件存到队列里,同时如果队列没有在跑就开启队列的处理,通过scheduleMicrotask
对事件进行异步处理,处理完当前事件继续处理队列时的其它事件,直到队列清空。
广播和非广播
上述代码中生产的是非广播流,广播流通过StreamController.broadcast
方法创建。广播和非广播的区别是是否允许多次订阅。
factory StreamController.broadcast(
{void onListen()?, void onCancel()?, bool sync = false}) {
return sync
? _SyncBroadcastStreamController<T>(onListen, onCancel)
: _AsyncBroadcastStreamController<T>(onListen, onCancel);
}
非广播StreamController
继承自_StreamController
,广播StreamController
继承自_BroadcastStreamController
,两者的区别可以通过_subscribe
的实现体现。_StreamController
的实现如下,当重复订阅后会直接抛出 StateError 异常。
StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError,
void onDone()?, bool cancelOnError) {
if (!_isInitialState) {
throw StateError("Stream has already been listened to.");
}
_ControllerSubscription<T> subscription = _ControllerSubscription<T>(
this, onData, onError, onDone, cancelOnError);
// ...
return subscription;
}
_BroadcastStreamController
里面有两个对象_firstSubscription
、_lastSubscription
,_BroadcastSubscription
是双向链表结构。当需要输出事件时,通过整个链表,通知所有的订阅进行消息的处理。
_BroadcastSubscription<T>? _firstSubscription;
_BroadcastSubscription<T>? _lastSubscription;
开发实战
通过前面接口和分类的分析,我们对这 Stream 有了更深刻的认识。刀已磨好,接下来便通过两个例子来试试这把刀到底锋不锋利。
利用 Stream 实现事件的广播
事件的广播,在开发时总会遇到,尤其是在跨组件或跨页面的场景,相信大部分开发者的项目里也都会引入类似EventBus
的三方或自研框架。比如:当我在编辑个人资料时,Save
之后需要通知其它页面进行刷新以展示最新的个人信息。
// user entity
class UserInfo {
int uid;
String name;
UserInfo(this.uid, this.name);
}
// userinfo update
class UserInfoChangeEvent {
static final _controller = StreamController<UserInfo>.broadcast();
static StreamSubscription<UserInfo> subscribe(Function(UserInfo) callback) {
return _controller.stream.listen(callback);
}
static void broadcast(UserInfo userInfo) {
_controller.add(userInfo);
}
}
// 用户编辑页面
class UserProfileViewModel {
...
// 点击 save 时,会调用到 broadcast 方法向外发送事件
void onSave(int uid, String name) {
UserInfoChangeEvent.broadcast(UserInfo(uid, name));
}
}
// 其它页面状态刷新
class ViewState extends State<ViewWidget> {
StreamSubscription<UserInfo>? _subscription;
UserInfo? _curUserInfo;
@override
void initState() {
super.initState();
// 初始化时,监听 UserInfoChangeEvent
_subscription = UserInfoChangeEvent.subscribe((userInfo) {
setState(() {
_curUserInfo = userInfo;
})
});
}
@override
void dispose() {
super.dispose();
// 退出时,要取消监听。否则会有内存泄漏
_subscription?.cancel();
_subscription = null;
}
}
这里,UserInfoChangeEvent
定义了广播类型的StreamController
,并且向外暴露了subscribe
和boradcast
接口,用户编辑页面在点击save
时走到onSave
方法,这个方法里调用了UserInfoChangeEvent
的broadcast
方法向外发送了一个更改信息的事件;
接着ViewState
这里在initState
时通过UserInfoChangeEvent
的subscribe
方法注册了监听,接收到了事件赋值到当前_curUserInfo
,调用setState
刷新页面。
StreamBuilder 实现 Widget 自动刷新
Flutter 提供了一个组件StreamBuilder
,能帮助我们方便的监听Stream
并刷新 Widget。例如进入一个页面时,通常会有一个数据加载的过程,此时页面会经历 Loading -> Loaded/LoadError 的状态变更,不同的状态会呈现不同的页面 UI,这时我们就需要定义一个 LoadingState 的枚举类型,在数据加载后时通过 StreamController 发布 LoadState 状态,StreamBuilder
监听到更新然后会自动触发 Widget 的刷新。AsyncSnapShot
是快照的意思,保存着此刻最新的事件。
StreamBuilder<LoadingState>(
stream: viewModel.loadingStateStream,
initialData: LoadingState.loading,
builder: (BuildContext context, AsyncSnapshot snapshot) {
// 根据 snapshot 的数据处理返回
var data = snapshot.data;
if (data == LoadingState.Loaded) {
reutrn Container(child: Text("Loaded Success"));
} else if (data == LoadingStat.LoadError) {
reutrn Container(child: Text("Loading Error"));
} else {
return LoadingView()
}
},
)
不过,StreamBuilder
也有坑。我们知道,对于Stream
来说,事件被消费了就会丢掉,无论是StreamController
还是Stream
都不会保存上次的值,以页面加载为例,页面进来后ViewModel
执行数据加载完成后,向loadingStateStream
里发布了Loaded
的状态,如果此时页面还没有布局StreamBuilder
,StreamBuilder
就无法收到这次监听,等到后面StreamBuilder
真正添加到界面上时已经错过了上次的事件,AsyncSnapshot
拿到的还是initialData
时设置的数据,也就是 loading 态,这样状态就会展示异常。
你可能会有疑问,为什么StreamBuilder
不能一开始就添加到页面的build
方面里?当然可以,但即便如此也无法保证StreamBuilder
的监听一定会比viewModel
的状态更新早,因为如果页面的内容较长,一开始StreamBuilder
还不在可视区内,它的initState
方法就不会执行,也就不会监听loadingStateStream
。
StreamBuilder
会面临这种囧境,归根到底是因为Stream
的设计。
Stream 的变换和处理
前面在介绍Stream
的接口时,我们提到过Stream
里面有很多操作方法。在这part,着重挑几个从名字上不太好理解的展开讲讲。
Future<E> drain<E>([E? futureValue]);
drain
意为“排出、消耗“。这里指”排掉”这条流中间所有的事件,只响应结束信号,当流关闭时返回 futureValue。
final result = await Stream.fromIterable([1, 2, 3]).drain(100);
print(result); // Outputs: 100.
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine)
事件迭代。根据combine
合并流里面的事件,该方法可以指定返回的类型S
,同时可以指定初始值 initialValue
。
final result = await Stream<int>.fromIterable([2, 4, 6, 8, 10])
.fold<String>("0", (previous, element) => "$previous - $element");
print(result); // 0 - 2 - 4 - 6 - 8 - 10
Future<T> reduce(T Function(T previous, T element) combine);
也是事件迭代。与fold
不同的是,reduce
无法指定初始值且它只能返回与原流相同的类型T
。
final result = await Stream.fromIterable<int>([2, 6, 10, 8, 2])
.fold<int>(10, (previous, element) => previous + element);
print(result); // 38
Future pipe(StreamConsumer<T> streamConsumer);
流管道拼接。将当前流的事件流向streamConsumer
中,streamConsumer
的子类实现通常是一个StreamController
,拿到事件后通知给它的订阅者。
var controller = StreamController<int>();
var stream = controller.stream;
stream.listen((event) {
print(event); // 2 4 6 8 10
});
var result = await Stream<int>.fromIterable([2, 4, 6, 8, 10]).pipe(controller);
print("result: $result"); // null
Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
异步展开。将原流中的事件做一次展开操作,得到一个E
类型的新流。
var stream = Stream<int>.fromIterable([2, 4, 6, 8, 10]);
var newStream = stream.asyncExpand((event) {
return Stream<int>.fromIterable([event, event + 1]);
});
newStream.listen((event){
print(event); // 2 3 4 5 6 7 8 9 10
});
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
异步映射。跟asyncExpand
类似,只是转换操作返回的是FutureOr
对象,为那些转换过程中涉及到异步处理的场景提供便利。
var newStream = stream.asyncMap((event) async {
await Future.delayed(const Duration(seconds: 1));
return event + 1;
});
newStream.listen((event){
print(event); // 3 5 7 9 11
});
实在忍不住想吐槽一下,有些方法的名字起的真心不昨滴,其中部分都有点“挂羊头卖狗肉”的感觉了。。。
你真的懂了吗?
讲了很多,现在来检验一下。假设有一段逻辑,controller
会增加三个事件,分别是add(1) add(2) add()3
,subscription
会在每次收到事件时打印output: $event
,中间会有一次暂停,3 秒后恢复,猜一下在以下几种场景下最后输出的顺序是什么?
1. 同步流
void main() async {
// 同步流:sync 为 true
var controller = StreamController<int>(sync: true);
var subscription = controller.stream.listen((event) {
print('output: $event');
});
controller.add(1);
controller.add(2);
controller.add(3);
print('暂停');
subscription.pause();
Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 恢复');
subscription.resume();
});
}
// will print:
// output: 1
// output: 2
// output: 3
// 暂停
// 3秒后 -> 恢复
2. 异步流
保持 1 中其它代码不变,将sync
的值设置成false
。
// will print:
// 暂停
// 3秒后 -> 恢复
// output: 1
// output: 2
// output: 3
3. 异步流:使用 Future.delayed 延迟暂停
保持 2 中其它代码不变,将暂停恢复延迟Duraiton.zero
。
Future.delayed(Duration.zero, (){
print('暂停');
subscription.pause();
Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 恢复');
subscription.resume();
});
});
// will print
// output: 1
// output: 2
// output: 3
// 暂停
// 3秒后 -> 恢复
4. 异步流:使用 scheduleMicrotask 延迟暂停
保持 3 中其它代码不变,用scheduleMicrotask
代替Future.delayed
。
scheduleMicrotask(() {
print('暂停');
subscription.pause();
Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 恢复');
subscription.resume();
});
});
// will print
// output: 1
// 暂停
// 3秒后 -> 恢复
// output: 2
// output: 3
上面的输出是否如你所料?相信如果你理解了之前的介绍,对1 2 3点的输出结果是没有问题的。但是对于第 4 点:虽然同样为延迟暂停,3 和 4 中的输出完全不一样,4 中的输出在输出output: 1
后才会触发暂停。这又是为什么呢?要解释这个输出,就要从源码出发了。
结语
所以,我们首先要从概念上理解他们,其次我们还要从代码上知道具体的实现。当程度的执行不及预期,缺乏代码实现层面的理解,我们便会显得手忙脚乱。像前面出现的StreamBuilder处理中的坑和输出顺序的问题,只有阅读底层源码,才能发现原因并准确修复。下一篇文章,将从源码实现上深入分析Stream
。
转载自:https://juejin.cn/post/7266844831606079540