likes
comments
collection
share

【Flutter 异步编程 - 叁】 | 初步认识 Stream 类的使用

作者站长头像
站长
· 阅读数 70

一、分析 Stream 对象

要了解一个事物,最好去思考它存在的 价值 。当你可以意识到某个事物的作用,缺少它会有什么弊端,自然会有兴趣去了解它。而不是稀里糊涂的看别人怎么用,自己死记硬背 API 有哪些,分别表示什么意思。一味的堆砌知识点,这样无论学什么都是流于表面,不得要领。


1. Stream 存在的必要性

可能很多朋友都没有在开发中使用过 Stream 对象,知道它挺重要,但又不知道他的具体的用途。有种只可远观,不可亵玩的距离感。Stream 可以弥补 Future 的短板,它对于异步来说是一块很重要的版块。

一个 Future 对象诞生的那一刻,无论成败,它最终注定只有一个结果。就像一个普通的网络接口,一次请求只会有一个响应结果。应用开发在绝大多数场景是一个 ,对应一个 ,所以和 Future 打交道比较多。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

但有些场景,任务无法一次完成,对于 一次 请求,会有 若干次 响应。比如现实生活中,你追更一部小说,在你订阅后,作者每次新时,都会通知你。在这个场景下,小说完结代表任务结束,期间会触发多次响应通知,这是 Future 无法处理的。

另外,事件通知的时间不确定的,作者创作的过程也是非常耗时的,所以机体没有必要处于同步等待 的阻塞状态。像这种 异步事件序列 被形象的称之为 Stream 流

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

在人类科学中,一件重要事物的存在,必然有其发挥效用的场所,在这片领域之下,它是所向披靡的王。在接触新知识、新概念时,感知这片领域非常重要,一个工具只有在合适的场景下,才能发挥最大的效力。


2.从读取文件认识 Stream 的使用

File 对象可以通过 readAsString 异步方法读取文件内容,返回 Future<String> 类型对象。而 Future 异步任务只有一次响应机会,通过 then 回调,所以该方法会将文件中的 所有字符 读取出来。

---->[File#readAsString]---
Future<String> readAsString({Encoding encoding = utf8});

但有些场景中没有必要不能 全部读取。比如,想要在一个大文件中寻找一些字符,找到后就 停止读取 ;想要在读取文件时 显示 读取进度。这时,只能响应一次事件的 Future 就爱莫能助了,而这正是 Stream 大显身手的领域。在 File 类中有 openRead 方法返回 Stream 对象,我们先通过这个方法了解一下 Stream 的使用方式。

Stream<List<int>> openRead([int? start, int? end]);

现在的场景和上面 追更小说 是很相似的:

  • 小说作者 无需一次性向 读者 提供所有的章节;小说是 一章章 进行更新的,每次更新章节,都需要 通知读者 进行阅读。
  • 操作系统 不用一次性读取全部文件内容,返回给请求的 机体;文件是 一块块 进行读取的,每块文件读取完,需要 通知机体 进行处理。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

在对 Stream 的理解中,需要认清两个角色: 发布者订阅者 。其中发布者是真正处理任务的机体,是结果的生产者,比如 作者操作系统服务器 等,它们有 发送通知 的义务。订阅者是发送请求的机体,对于异步任务,其本身并不参与到执行过程中,可以监听通知来获取需要的结果数据。

代码处理中 Stream 对象使用 listen 方法 监听通知 ,该方法的第一入参是回调函数,每次通知时都会被触发。回调函数的参数类型是 Stream 的泛型,表示此次通知时携带的结果数据。

StreamSubscription<T> listen(void onData(T event)?,
    {Function? onError, void onDone()?, bool? cancelOnError});

如下是通过 Stream 事件读取文件,显示读取进度的处理逻辑。当 openRead 任务分发之后,操作系统会一块一块地对文件进行读取,每读一块会发送通知。Dart 代码中通过 _onData 函数进行监听,回调的 bytes 就是读取的字节数组结果。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

_onData 函数中根据每次回调的字节数,就可以很轻松地计算出读取的进度。 onDone 指定的函数,会在任务完成时被触发,任务完成也就表示不会再有事件通知了。

void readFile() async {
  File file = File(path.join(Directory.current.path, "assets", "Jane Eyre.txt"));
  print("开始读取 Jane Eyre.txt ");
  fileLength = await file.length();
  Stream<List<int>> stream = file.openRead();
  stream.listen(_onData,onDone: _onDone);
}

void _onData(List<int> bytes) {
  counter += bytes.length;
  double progress = counter * 100 / fileLength;
  DateTime time = DateTime.now();
  String timeStr = "[${time.hour}:${time.minute}:${time.second}:${time.millisecond}]";
  print(timeStr + "=" * (progress ~/ 2) + '[${progress.toStringAsFixed(2)}%]');
}

void _onDone() {
  print("读取 Jane Eyre.txt 结束");
}

3.初步认识 StreamSubscription

Stream#listen 方法监听后,会返回一个 StreamSubscription 对象,表示此次对流的订阅。

StreamSubscription<T> listen(void onData(T event)?,
    {Function? onError, void onDone()?, bool? cancelOnError});

通过这个订阅对象,可以暂停 pause 或恢复 resume 对流的监听,以及通过 cancel 取消对流的监听。

---->[StreamSubscription]----
void pause([Future<void>? resumeSignal]);
void resume();
Future<void> cancel();

比如下面当进度大于 50 时,取消对流的订阅:通过打印日志可以看出 54.99% 时,订阅取消,流也随之停止,可以注意一个细节。此时 onDone 回调并未触发,表示当 Stream 任务被取消订阅时,不能算作完成。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

late StreamSubscription<List<int>> subscription;

void readFile() async {
  File file = File(path.join(Directory.current.path, "assets", "Jane Eyre.txt"));
  print("开始读取 Jane Eyre.txt ");
  fileLength = await file.length();
  Stream<List<int>> stream = file.openRead();
  // listen 方法返回 StreamSubscription 对象
  subscription = stream.listen(_onData,onDone: _onDone);
}


void _onData(List<int> bytes) async{
  counter += bytes.length;
  double progress = counter * 100 / fileLength;
  DateTime time = DateTime.now();
  String timeStr = "[${time.hour}:${time.minute}:${time.second}:${time.millisecond}]";
  print(timeStr + "=" * (progress ~/ 2) + '[${progress.toStringAsFixed(2)}%]');
  if(progress > 50){
    subscription.cancel(); // 取消订阅
  }
}

二、结合应用理解 Stream 的使用

单看 Dart 代码在控制台打印,实在有些不过瘾。下面通过一个有趣的小例子,介绍 StreamFlutter 项目中的使用。这样可以更形象地认识 Stream 的用途,便于进一步理解。


1. 场景分析

现实生活中如果细心观察,会发现很多 Stream 概念的身影。比如在银行办理业务时,客户可以看作 Stream 中的一个元素,广播依次播报牌号,业务员需要对某个元素进行处理。在餐馆中,每桌的客人可以看作 Stream 中的一个元素,客人下单完成,厨师根据请求准备饭菜进行处理。这里,通过模拟 红绿灯 的状态变化,来说明 Stream 的使用。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

可以想象,在一个时间轴上,信号灯的变化是一个连续不断的事件。我们可以将每次的变化视为 Stream 中的一个元素,信号灯每秒的状态信息都会不同。也就是说,这个 Stream 每秒会产出一个状态,要在应用中模拟红绿灯,只需要监听每次的通知,更新界面显示即可。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

这里将信号灯的状态信息通过 SignalState 类来封装,成员变量有当前秒数 counter 和信号灯类型 type 。 其中信号灯类型通过 SignalType 枚举表示,有如下三种类型:

const int _kAllowMaxCount = 10;
const int _kWaitMaxCount = 3;
const int _kDenialMaxCount = 10;

class SignalState {
  final int counter;
  final SignalType type;

  SignalState({
    required this.counter,
    required this.type,
  });
}

enum SignalType {
  allow, // 允许 - 绿灯
  denial, // 拒绝 - 红灯
  wait, // 等待 - 黄灯
}

2. 信号灯组件的构建

如下所示,信号灯由三个 Lamp 组件和数字构成。三个灯分别表示 红、黄、绿 ,某一时刻只会量一盏,不亮的使用灰色示意。三个灯水平排列,有一个黑色背景装饰,和文字呈上下结构。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用


先看灯 Lamp 组件的构建:逻辑非常简单,使用 Container 组件显示圆形,构造时可指定颜色值,为 null 时显示灰色。

class Lamp extends StatelessWidget {
  final Color? color;

  const Lamp({Key? key, required this.color}) : super(key: key);

  @override
  Widget build(BuildContext context) {
    return Container(
      width: 40,
      height: 40,
      decoration: BoxDecoration(
        color: color ?? Colors.grey.withOpacity(0.8),
        shape: BoxShape.circle,
      ),
    );
  }
}

如下是 SignalLamp 组件的展示效果,其依赖于 SignalState 对象进行显示。根据 SignalType 确定显示的颜色和需要点亮的灯,状态中的 counter 成员用于展示数字。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

class SignalLamp extends StatelessWidget {
  final SignalState state;

  const SignalLamp({Key? key, required this.state}) : super(key: key);

  Color get activeColor {
    switch (state.type) {
      case SignalType.allow:
        return Colors.green;
      case SignalType.denial:
        return Colors.red;
      case SignalType.wait:
        return Colors.amber;
    }
  }

  @override
  Widget build(BuildContext context) {
    return Column(
      children: [
        Container(
          padding: const EdgeInsets.symmetric(horizontal: 15, vertical: 10),
          decoration: BoxDecoration(
              color: Colors.black, borderRadius: BorderRadius.circular(30),),
          child: Wrap(
            alignment: WrapAlignment.center,
            crossAxisAlignment: WrapCrossAlignment.center,
            spacing: 15,
            children: [
              Lamp(color: state.type == SignalType.denial ? activeColor : null),
              Lamp(color: state.type == SignalType.wait ? activeColor : null),
              Lamp(color: state.type == SignalType.allow ? activeColor : null),
            ],
          ),
        ),
        Text(
          state.counter.toString(),
          style: TextStyle(
              fontWeight: FontWeight.bold, fontSize: 50, color: activeColor,
          ),
        )
      ],
    );
  }
}

4. Stream 事件的添加与监听

这样,指定不同的 SignalState 就会呈现相应的效果,如下是黄灯的 2 s

SignalLamp(
  state: SignalState(counter: 2, type: SignalType.wait),
)

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

在使用 Stream 触发更新之前,先说一下思路。Stream 可以监听一系列事件的触发,每次监听会获取新的信号状态,根据新状态渲染界面即可。如下在 SignalState 中定义 next 方法,便于产出下一状态。逻辑很简单,如果数值大于一,类型不变,数值减一,比如 红灯 6 的下一状态是 红灯 5 ; 如果数值等于一,会进入下一类型的最大数值,比如 红灯 1 的下一状态是 黄灯 3

---->[SignalState]----
SignalState next() {
  if (counter > 1) {
    return SignalState(type: type, counter: counter - 1);
  } else {
    switch (type) {
      case SignalType.allow:
        return SignalState(
            type: SignalType.denial, counter: _kDenialMaxCount);
      case SignalType.denial:
        return SignalState(type: SignalType.wait, counter: _kWaitMaxCount);
      case SignalType.wait:
        return SignalState(type: SignalType.allow, counter: _kAllowMaxCount);
    }
  }
}

把每个事件通知看做元素,Stream 应用处理事件序列,只不过序列中的元素在此刻是未知的,何时触发也是不定的。Stream 基于 发布-订阅 的思想通过监听来处理这些事件。 其中两个非常重要的角色: 发布者 是元素的生产者,订阅者 是元素的消费者。

在引擎中的 async 包中封装了 StreamController 类用于控制元素的添加操作,同时提供 Stream 对象用于监听。代码处理如下,tag1 处,监听 streamControllerstream 对象。事件到来时触发 emit 方法 ( 方法名任意 ),在 emit 中会回调出 SignalState 对象,根据这个新状态更新界面即可。然后延迟 1s 继续添加下一状态。

---->[_MyHomePageState]----
final StreamController<SignalState> streamController = StreamController();
SignalState _signalState = SignalState(counter: 10, type: SignalType.denial);

@override
void initState() {
  super.initState();
  streamController.stream.listen(emit); // tag1
  streamController.add(_signalState);
}

@override
void dispose() {
  super.dispose();
  streamController.close();
}

void emit(SignalState state) async {
  _signalState = state;
  setState(() {});
  await Future.delayed(const Duration(seconds: 1));
  streamController.add(state.next());
}

这样 streamController 添加元素,作为 发布者;添加的元素可以通过 StreamControllerstream 成员进行监听。


5. Stream 的控制与异常监听

在前面介绍过 Stream#listen 方法会返回一个 StreamSubscription 的订阅对象,通过该对象可以暂停、恢复、取消对流的监听。如下所示,通过点击按钮执行 _toggle 方法,可以达到 暂停/恢复 切换的效果:

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

---->[_MyHomePageState]----
late StreamSubscription<SignalState> _subscription;

@override
void initState() {
  super.initState();
  _subscription = streamController.stream.listen(emit);
  streamController.add(_signalState);
}

void _toggle() {
  if(_subscription.isPaused){
    _subscription.resume();
  }else{
    _subscription.pause();
  }
  setState(() {});
}

另外,StreamController 在构造时可以传入四个函数来监听流的状态:

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

final StreamController<SignalState> streamController = StreamController(
  onListen: ()=> print("=====onListen====="),
  onPause: ()=> print("=====onPause====="),
  onResume: ()=> print("=====onResume====="),
  onCancel: ()=> print("=====onCancel====="),
);

onListen 会在 stream 成员被监听时触发一次;onPauseonResumeonCancel 分别对应订阅者的 pauseresumecancel 方法。如下是点击暂停和恢复的日志信息:

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用


Stream#listen 方法中还有另外两个可选参数用于异常的处理。 onError 是错误的回调函数,cancelOnError 标识用于控制触发异常时,是否取消 Stream

StreamSubscription<T> listen(void onData(T event)?,
    {Function? onError, void onDone()?, bool? cancelOnError});

如下所示,在 emit 中故意在 红 7 时通过 addError 添加一个异常元素。这里界面简单显示错误信息,在 3 s 后异常被修复,继续添加新元素。

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

void emit(SignalState state) async {
  _signalState = state;
  setState(() {});
  await Future.delayed(const Duration(seconds: 1));
  SignalState nextState = state.next();
  if (nextState.counter == 7 && nextState.type == SignalType.denial) {
    streamController.addError(Exception('Error Signal State'));
  } else {
    streamController.add(nextState);
  }
}

listen 方法中使用 onError 监听异常事件,进行处理:其中逻辑是渲染错误界面,三秒后修复异常,继续产出下一状态:

_subscription = streamController.stream.listen(
  emit,
  onError: (err) async {
    print(err);
    renderError();
    await Future.delayed(const Duration(seconds: 3));
    fixError();
    emit(_signalState.next());
  },
  cancelOnError: false,
);

关于异常的处理,这里简单地提供 hasError 标识进行构建逻辑的区分:

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用

bool hasError = false;

void renderError(){
  hasError = true;
  setState(() {});
}
void fixError(){
  hasError = false;
}

最后说一下 listencancelOnError 的作用,它默认是 false 。如果 cancelOnError = true ,在监听到异常之后,就会取消监听 stream ,也就是说之后控制器添加的元素就会监听了。这样异常时 StreamController 会触发 onCancel 回调:

【Flutter 异步编程 - 叁】 |  初步认识 Stream 类的使用


三、异步生成器函数与 Stream

前面介绍了通过 StreamController 获取 Stream 进行处理的方式,下面再来看另一种获取 Stream 的方式 - 异步生成器函数


1. 思考 Stream 与 Iterable

通过前面对 Stream 的认识,我们知道它是在 时间线 上可拥有若干个可监听的事件元素。而 Iterable 也可以拥有多个元素,两者之间是有很大差距的。Iterable时间空间 上都对元素保持持有关系;而 Stream 只是在时间上监听若干元素的到来,并不在任意时刻都持有元素,更不会在空间上保持持有关系。

对于一个 Type 类型的数据,在异步任务中,Stream<T>Future<T> 就是多值和单值的区别,它们的结果都不能在 当前时刻 得到,只能通过监听在 未来 得到值。 与之相对的就是 Iterable<Type>Type ,它们代表此时此刻,实实在在的对象,可以随时使用。

单值多值
同步TypeIterable<Type>
异步Future<Type>Stream<Type>

2. 通过异步生成器函数获取 Stream 对象

Future 对象可以通过 async/awiat 关键字,简化书写,更方便的获取异步任务结果。 对于 Stream 也有类似的 async*/yield 关键字。 如下所示, async* 修饰的方法需要返回一个 Stream 对象。

在方法体中通过 yield 关键字 产出 泛型结果对象,如下是对 信号状态流 元素产生出的逻辑:遍历 count 次,每隔 1 s 产出一个状态。

class SignalStream{
  SignalState _signalState = SignalState(counter: 10, type: SignalType.denial);
  
  Stream<SignalState> createStream({int count = 100}) async*{
    for(int i = 0 ; i < count; i++){
      await Future.delayed(const Duration(seconds: 1));
      _signalState = _signalState.next();
      yield _signalState;
    }
  }
}

这样,在 _MyHomePageState 中通过 signalStream.createStream() 就可以创建一个有 100 个元素的流,进行监听。每次接收到新状态时,更新界面,也可以达到目的:

---->[_MyHomePageState]---
final SignalStream signalStream = SignalStream();

_subscription = signalStream.createStream().listen(
  emit,
);

void emit(SignalState state) async {
  _signalState = state;
  setState(() {});
}

到这里,关于 Stream 的初步认识就结束了,当然 Stream 的知识还有很多,在后面会陆续介绍。通过本文,你只需要明白 Stream 是什么,通过它我们能干什么就行了。下一篇我们将分析一下 FutureBuilderStreamBuilder 组件的使用和源码实现。它们是 Flutter 对异步对象的封装组件,通过对它们的认识,也能加深我们对 FutureStream 的立即。 那本文就到这里,谢谢观看 ~

转载自:https://juejin.cn/post/7147881475688366093
评论
请登录