likes
comments
collection
share

Flutter Stream

作者站长头像
站长
· 阅读数 55

Stream是什么

Stream 是一系列异步事件的序列。其类似于一个异步的 Iterable,不同的是当你向 Iterable 获取下一个事件时它会立即给你,但是 Stream 则不会立即给你而是在它准备好时告诉你。

单个0个或多个
SyncintInterable<int>
AsyncFuture<int>Stream<int>

Stream对象

  • StreamController:用于整个Stream过程的控制,提供各类接口用于创建各种事件流。
  • StreamSink:事件的入口,add,addStream等。
  • Stream:事件源本身,一般可用于监听事件或者对事件进行转换,如listenwhere
  • StreamSubscription:事件订阅后的对象,表面上用于管理订阅过各类操作,如cancelpause,同时在内部也是事件的中转关键。

关系

有一个事件源叫Stream,为了方便控制Stream,官方提供了StreamController作为管理;同时它对外提供了StreamSink对象作为事件入口;又为Stream属性提供了监听和变换,最后得到StreamSubscription可以管理事件的订阅。

Flutter Stream

Stream工作原理

  1. Streamlisten的时候传入onData回调,这个回调会传入到StreamSubscription中,之后通过zone.registerUnaryCallback注册得到_onData对象
  2. StreamSink在添加事件时,会执行到StreamSubscription中的_sendData方法,然后通过_zone.runUnaryGuarded(_onData, data)执行1中得到的_onData对象,触发listen时传入的回调方法
//1.listen传入onData回调到StreamSubscription中
StreamSubscription<T> listen(void onData(T data)?,
    {Function? onError, void onDone()?, bool? cancelOnError}) {
  cancelOnError ??= false;
  StreamSubscription<T> subscription =
      _createSubscription(onData, onError, onDone, cancelOnError);
  _onListen(subscription);
  return subscription;
}

//为节省篇幅,已省略部分代码
//在此,已经获取到_onData函数对象
_onData = _registerDataHandler<T>(_zone, onData),
      
//把onData传入进行注册
static void Function(T) _registerDataHandler<T>(
    Zone zone, void Function(T)? handleData) {
  return zone.registerUnaryCallback<void, T>(handleData ?? _nullDataHandler);
}
复制代码
//2.sink添加事件StreamSubscription._sendData,然后调用_zone.runUnaryGuarded(_onData, data),
/* _EventDispatch interface. */

void _sendData(T data) {
  assert(!_isCanceled);
  assert(!_isPaused);
  assert(!_inCallback);
  bool wasInputPaused = _isInputPaused;
  _state |= _STATE_IN_CALLBACK;
  _zone.runUnaryGuarded(_onData, data);
  _state &= ~_STATE_IN_CALLBACK;
  _checkState(wasInputPaused);
}

值得注意的是,Stream是在microtask中被调度的

Stream支持同步、异步(默认异步),广播与非广播

Stream支持以下方法:

  • map():将此stream的每个元素转换为一个新的stream事件。
  • where():通过当前stream创建一个新的stream并根据条件丢弃某些元素
  • distinct():如果数据事件与前一个数据事件相等,则跳过数据事件。
  • ....

Stream的使用

通过Stream构造器创建

  • Stream.fromFuture:通过Future创建一个单一订阅stream
  • Stream.fromIterable:通过Iterable的数据创建一个单一订阅的stream
  • Stream.fromFutures:通过一组Future创建单一个单一订阅流的stream
  • Stream.periodic:通过时段创建一个重复发出事件的stream

通过StreamController创建

import 'dart:async';

void main() {
  //1.创建一个任意类型StreamController对象
  StreamController streamController = StreamController(
      onListen: () => print('listen'),
      onCancel: () => print('cancel'),
      onPause: () => print('pause'),
      onResume: () => print('resumr'));
  //2.通过sink槽口添加任意类型事件数据
  streamController.sink.add(100);
  streamController.sink.add(100.121212);
  streamController.sink.add('THIS IS STRING');
  streamController.sink.close();//只有手动调用close方法发送一个done事件,onDone才会被回调
  //3.注册监听
  streamController.stream.listen((event) => print(event),
      onDone: () => print('is done'),
      onError: (error, stacktrace) => print('is error, errMsg: $error'),
      cancelOnError: true);
}

通过async*创建

注意:async*是Dart的关键字,表明该函数返回的是stream,yield是返回Iterable的单个数据,而yield*后面跟stream


asyncGet(31).listen((event) {
  print(event);
});

Stream<String> asyncGet(int count) async* {
  yield* asyncGetString(count).map((event) => event + 'C');
}

Stream<String> asyncGetString(int count) async* {
  for (int i = 0; i < count; i++) {
    yield await delayedGet(i);
  }
}

Future<String> delayedGet(int i) async {
  await Future.delayed(Duration(seconds: 1));
  return i.toString() + 'B';
}

参考资料

转载自:https://juejin.cn/post/7071160761569148959
评论
请登录