Flutter Stream
Stream是什么
Stream 是一系列异步事件的序列。其类似于一个异步的 Iterable,不同的是当你向 Iterable 获取下一个事件时它会立即给你,但是 Stream 则不会立即给你而是在它准备好时告诉你。
单个 | 0个或多个 | |
---|---|---|
Sync | int | Interable<int> |
Async | Future<int> | Stream<int> |
Stream对象
- StreamController:用于整个Stream过程的控制,提供各类接口用于创建各种事件流。
- StreamSink:事件的入口,
add
,addStream
等。 - Stream:事件源本身,一般可用于监听事件或者对事件进行转换,如
listen
、where
。 - StreamSubscription:事件订阅后的对象,表面上用于管理订阅过各类操作,如
cancel
、pause
,同时在内部也是事件的中转关键。
关系
有一个事件源叫Stream,为了方便控制Stream,官方提供了StreamController作为管理;同时它对外提供了
StreamSink
对象作为事件入口;又为Stream
属性提供了监听和变换,最后得到StreamSubscription
可以管理事件的订阅。
Stream工作原理
Stream
在listen
的时候传入onData
回调,这个回调会传入到StreamSubscription
中,之后通过zone.registerUnaryCallback
注册得到_onData
对象StreamSink
在添加事件时,会执行到StreamSubscription
中的_sendData
方法,然后通过_zone.runUnaryGuarded(_onData, data)
执行1中得到的_onData
对象,触发listen
时传入的回调方法
//1.listen传入onData回调到StreamSubscription中
StreamSubscription<T> listen(void onData(T data)?,
{Function? onError, void onDone()?, bool? cancelOnError}) {
cancelOnError ??= false;
StreamSubscription<T> subscription =
_createSubscription(onData, onError, onDone, cancelOnError);
_onListen(subscription);
return subscription;
}
//为节省篇幅,已省略部分代码
//在此,已经获取到_onData函数对象
_onData = _registerDataHandler<T>(_zone, onData),
//把onData传入进行注册
static void Function(T) _registerDataHandler<T>(
Zone zone, void Function(T)? handleData) {
return zone.registerUnaryCallback<void, T>(handleData ?? _nullDataHandler);
}
复制代码
//2.sink添加事件StreamSubscription._sendData,然后调用_zone.runUnaryGuarded(_onData, data),
/* _EventDispatch interface. */
void _sendData(T data) {
assert(!_isCanceled);
assert(!_isPaused);
assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
_zone.runUnaryGuarded(_onData, data);
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
值得注意的是,Stream是在microtask中被调度的
Stream支持同步、异步(默认异步),广播与非广播
Stream支持以下方法:
map()
:将此stream的每个元素转换为一个新的stream事件。where()
:通过当前stream创建一个新的stream并根据条件丢弃某些元素distinct()
:如果数据事件与前一个数据事件相等,则跳过数据事件。- ....
Stream的使用
通过Stream构造器创建
- Stream.fromFuture:通过
Future
创建一个单一订阅stream
- Stream.fromIterable:通过
Iterable
的数据创建一个单一订阅的stream
- Stream.fromFutures:通过一组
Future
创建单一个单一订阅流的stream
- Stream.periodic:通过时段创建一个重复发出事件的
stream
通过StreamController创建
import 'dart:async';
void main() {
//1.创建一个任意类型StreamController对象
StreamController streamController = StreamController(
onListen: () => print('listen'),
onCancel: () => print('cancel'),
onPause: () => print('pause'),
onResume: () => print('resumr'));
//2.通过sink槽口添加任意类型事件数据
streamController.sink.add(100);
streamController.sink.add(100.121212);
streamController.sink.add('THIS IS STRING');
streamController.sink.close();//只有手动调用close方法发送一个done事件,onDone才会被回调
//3.注册监听
streamController.stream.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
通过async*创建
注意:async*
是Dart的关键字,表明该函数返回的是stream,yield
是返回Iterable的单个数据,而yield*
后面跟stream
asyncGet(31).listen((event) {
print(event);
});
Stream<String> asyncGet(int count) async* {
yield* asyncGetString(count).map((event) => event + 'C');
}
Stream<String> asyncGetString(int count) async* {
for (int i = 0; i < count; i++) {
yield await delayedGet(i);
}
}
Future<String> delayedGet(int i) async {
await Future.delayed(Duration(seconds: 1));
return i.toString() + 'B';
}
参考资料
转载自:https://juejin.cn/post/7071160761569148959