【Flutter 异步编程 - 叁】 | 初步认识 Stream 类的使用
一、分析 Stream 对象
要了解一个事物,最好去思考它存在的 价值 。当你可以意识到某个事物的作用,缺少它会有什么弊端,自然会有兴趣去了解它。而不是稀里糊涂的看别人怎么用,自己死记硬背 API 有哪些,分别表示什么意思。一味的堆砌知识点,这样无论学什么都是流于表面,不得要领。
1. Stream 存在的必要性
可能很多朋友都没有在开发中使用过 Stream 对象,知道它挺重要,但又不知道他的具体的用途。有种只可远观,不可亵玩的距离感。Stream 可以弥补 Future 的短板,它对于异步来说是一块很重要的版块。
一个 Future 对象诞生的那一刻,无论成败,它最终注定只有一个结果。就像一个普通的网络接口,一次请求只会有一个响应结果。应用开发在绝大多数场景是一个 因,对应一个 果,所以和 Future 打交道比较多。

但有些场景,任务无法一次完成,对于 一次 请求,会有 若干次 响应。比如现实生活中,你追更一部小说,在你订阅后,作者每次新时,都会通知你。在这个场景下,小说完结代表任务结束,期间会触发多次响应通知,这是 Future 无法处理的。
另外,事件通知的时间不确定的,作者创作的过程也是非常耗时的,所以机体没有必要处于同步等待 的阻塞状态。像这种 异步事件序列 被形象的称之为 Stream 流 。

在人类科学中,一件重要事物的存在,必然有其发挥效用的场所,在这片领域之下,它是所向披靡的王。在接触新知识、新概念时,感知这片领域非常重要,一个工具只有在合适的场景下,才能发挥最大的效力。
2.从读取文件认识 Stream 的使用
File 对象可以通过 readAsString 异步方法读取文件内容,返回 Future<String> 类型对象。而 Future 异步任务只有一次响应机会,通过 then 回调,所以该方法会将文件中的 所有字符 读取出来。
---->[File#readAsString]---
Future<String> readAsString({Encoding encoding = utf8});
但有些场景中没有必要 或 不能 全部读取。比如,想要在一个大文件中寻找一些字符,找到后就 停止读取 ;想要在读取文件时 显示 读取进度。这时,只能响应一次事件的 Future 就爱莫能助了,而这正是 Stream 大显身手的领域。在 File 类中有 openRead 方法返回 Stream 对象,我们先通过这个方法了解一下 Stream 的使用方式。
Stream<List<int>> openRead([int? start, int? end]);
现在的场景和上面 追更小说 是很相似的:
小说作者无需一次性向读者提供所有的章节;小说是一章章进行更新的,每次更新章节,都需要通知读者进行阅读。操作系统不用一次性读取全部文件内容,返回给请求的机体;文件是一块块进行读取的,每块文件读取完,需要通知机体进行处理。

在对 Stream 的理解中,需要认清两个角色: 发布者 和 订阅者 。其中发布者是真正处理任务的机体,是结果的生产者,比如 作者、操作系统、服务器 等,它们有 发送通知 的义务。订阅者是发送请求的机体,对于异步任务,其本身并不参与到执行过程中,可以监听通知来获取需要的结果数据。
代码处理中 Stream 对象使用 listen 方法 监听通知 ,该方法的第一入参是回调函数,每次通知时都会被触发。回调函数的参数类型是 Stream 的泛型,表示此次通知时携带的结果数据。
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
如下是通过 Stream 事件读取文件,显示读取进度的处理逻辑。当 openRead 任务分发之后,操作系统会一块一块地对文件进行读取,每读一块会发送通知。Dart 代码中通过 _onData 函数进行监听,回调的 bytes 就是读取的字节数组结果。

在 _onData 函数中根据每次回调的字节数,就可以很轻松地计算出读取的进度。 onDone 指定的函数,会在任务完成时被触发,任务完成也就表示不会再有事件通知了。
void readFile() async {
File file = File(path.join(Directory.current.path, "assets", "Jane Eyre.txt"));
print("开始读取 Jane Eyre.txt ");
fileLength = await file.length();
Stream<List<int>> stream = file.openRead();
stream.listen(_onData,onDone: _onDone);
}
void _onData(List<int> bytes) {
counter += bytes.length;
double progress = counter * 100 / fileLength;
DateTime time = DateTime.now();
String timeStr = "[${time.hour}:${time.minute}:${time.second}:${time.millisecond}]";
print(timeStr + "=" * (progress ~/ 2) + '[${progress.toStringAsFixed(2)}%]');
}
void _onDone() {
print("读取 Jane Eyre.txt 结束");
}
3.初步认识 StreamSubscription
Stream#listen 方法监听后,会返回一个 StreamSubscription 对象,表示此次对流的订阅。
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
通过这个订阅对象,可以暂停 pause 或恢复 resume 对流的监听,以及通过 cancel 取消对流的监听。
---->[StreamSubscription]----
void pause([Future<void>? resumeSignal]);
void resume();
Future<void> cancel();
比如下面当进度大于 50 时,取消对流的订阅:通过打印日志可以看出 54.99% 时,订阅取消,流也随之停止,可以注意一个细节。此时 onDone 回调并未触发,表示当 Stream 任务被取消订阅时,不能算作完成。

late StreamSubscription<List<int>> subscription;
void readFile() async {
File file = File(path.join(Directory.current.path, "assets", "Jane Eyre.txt"));
print("开始读取 Jane Eyre.txt ");
fileLength = await file.length();
Stream<List<int>> stream = file.openRead();
// listen 方法返回 StreamSubscription 对象
subscription = stream.listen(_onData,onDone: _onDone);
}
void _onData(List<int> bytes) async{
counter += bytes.length;
double progress = counter * 100 / fileLength;
DateTime time = DateTime.now();
String timeStr = "[${time.hour}:${time.minute}:${time.second}:${time.millisecond}]";
print(timeStr + "=" * (progress ~/ 2) + '[${progress.toStringAsFixed(2)}%]');
if(progress > 50){
subscription.cancel(); // 取消订阅
}
}
二、结合应用理解 Stream 的使用
单看 Dart 代码在控制台打印,实在有些不过瘾。下面通过一个有趣的小例子,介绍 Stream 在 Flutter 项目中的使用。这样可以更形象地认识 Stream 的用途,便于进一步理解。
1. 场景分析
现实生活中如果细心观察,会发现很多 Stream 概念的身影。比如在银行办理业务时,客户可以看作 Stream 中的一个元素,广播依次播报牌号,业务员需要对某个元素进行处理。在餐馆中,每桌的客人可以看作 Stream 中的一个元素,客人下单完成,厨师根据请求准备饭菜进行处理。这里,通过模拟 红绿灯 的状态变化,来说明 Stream 的使用。

可以想象,在一个时间轴上,信号灯的变化是一个连续不断的事件。我们可以将每次的变化视为 Stream 中的一个元素,信号灯每秒的状态信息都会不同。也就是说,这个 Stream 每秒会产出一个状态,要在应用中模拟红绿灯,只需要监听每次的通知,更新界面显示即可。

这里将信号灯的状态信息通过 SignalState 类来封装,成员变量有当前秒数 counter 和信号灯类型 type 。 其中信号灯类型通过 SignalType 枚举表示,有如下三种类型:
const int _kAllowMaxCount = 10;
const int _kWaitMaxCount = 3;
const int _kDenialMaxCount = 10;
class SignalState {
final int counter;
final SignalType type;
SignalState({
required this.counter,
required this.type,
});
}
enum SignalType {
allow, // 允许 - 绿灯
denial, // 拒绝 - 红灯
wait, // 等待 - 黄灯
}
2. 信号灯组件的构建
如下所示,信号灯由三个 Lamp 组件和数字构成。三个灯分别表示 红、黄、绿 ,某一时刻只会量一盏,不亮的使用灰色示意。三个灯水平排列,有一个黑色背景装饰,和文字呈上下结构。

先看灯 Lamp 组件的构建:逻辑非常简单,使用 Container 组件显示圆形,构造时可指定颜色值,为 null 时显示灰色。
class Lamp extends StatelessWidget {
final Color? color;
const Lamp({Key? key, required this.color}) : super(key: key);
@override
Widget build(BuildContext context) {
return Container(
width: 40,
height: 40,
decoration: BoxDecoration(
color: color ?? Colors.grey.withOpacity(0.8),
shape: BoxShape.circle,
),
);
}
}
如下是 SignalLamp 组件的展示效果,其依赖于 SignalState 对象进行显示。根据 SignalType 确定显示的颜色和需要点亮的灯,状态中的 counter 成员用于展示数字。

class SignalLamp extends StatelessWidget {
final SignalState state;
const SignalLamp({Key? key, required this.state}) : super(key: key);
Color get activeColor {
switch (state.type) {
case SignalType.allow:
return Colors.green;
case SignalType.denial:
return Colors.red;
case SignalType.wait:
return Colors.amber;
}
}
@override
Widget build(BuildContext context) {
return Column(
children: [
Container(
padding: const EdgeInsets.symmetric(horizontal: 15, vertical: 10),
decoration: BoxDecoration(
color: Colors.black, borderRadius: BorderRadius.circular(30),),
child: Wrap(
alignment: WrapAlignment.center,
crossAxisAlignment: WrapCrossAlignment.center,
spacing: 15,
children: [
Lamp(color: state.type == SignalType.denial ? activeColor : null),
Lamp(color: state.type == SignalType.wait ? activeColor : null),
Lamp(color: state.type == SignalType.allow ? activeColor : null),
],
),
),
Text(
state.counter.toString(),
style: TextStyle(
fontWeight: FontWeight.bold, fontSize: 50, color: activeColor,
),
)
],
);
}
}
4. Stream 事件的添加与监听
这样,指定不同的 SignalState 就会呈现相应的效果,如下是黄灯的 2 s:
SignalLamp(
state: SignalState(counter: 2, type: SignalType.wait),
)

在使用 Stream 触发更新之前,先说一下思路。Stream 可以监听一系列事件的触发,每次监听会获取新的信号状态,根据新状态渲染界面即可。如下在 SignalState 中定义 next 方法,便于产出下一状态。逻辑很简单,如果数值大于一,类型不变,数值减一,比如 红灯 6 的下一状态是 红灯 5 ; 如果数值等于一,会进入下一类型的最大数值,比如 红灯 1 的下一状态是 黄灯 3 。
---->[SignalState]----
SignalState next() {
if (counter > 1) {
return SignalState(type: type, counter: counter - 1);
} else {
switch (type) {
case SignalType.allow:
return SignalState(
type: SignalType.denial, counter: _kDenialMaxCount);
case SignalType.denial:
return SignalState(type: SignalType.wait, counter: _kWaitMaxCount);
case SignalType.wait:
return SignalState(type: SignalType.allow, counter: _kAllowMaxCount);
}
}
}
把每个事件通知看做元素,Stream 应用处理事件序列,只不过序列中的元素在此刻是未知的,何时触发也是不定的。Stream 基于 发布-订阅 的思想通过监听来处理这些事件。 其中两个非常重要的角色: 发布者 是元素的生产者,订阅者 是元素的消费者。
在引擎中的 async 包中封装了 StreamController 类用于控制元素的添加操作,同时提供 Stream 对象用于监听。代码处理如下,tag1 处,监听 streamController 的 stream 对象。事件到来时触发 emit 方法 ( 方法名任意 ),在 emit 中会回调出 SignalState 对象,根据这个新状态更新界面即可。然后延迟 1s 继续添加下一状态。
---->[_MyHomePageState]----
final StreamController<SignalState> streamController = StreamController();
SignalState _signalState = SignalState(counter: 10, type: SignalType.denial);
@override
void initState() {
super.initState();
streamController.stream.listen(emit); // tag1
streamController.add(_signalState);
}
@override
void dispose() {
super.dispose();
streamController.close();
}
void emit(SignalState state) async {
_signalState = state;
setState(() {});
await Future.delayed(const Duration(seconds: 1));
streamController.add(state.next());
}
这样 streamController 添加元素,作为 发布者;添加的元素可以通过 StreamController 的 stream 成员进行监听。
5. Stream 的控制与异常监听
在前面介绍过 Stream#listen 方法会返回一个 StreamSubscription 的订阅对象,通过该对象可以暂停、恢复、取消对流的监听。如下所示,通过点击按钮执行 _toggle 方法,可以达到 暂停/恢复 切换的效果:

---->[_MyHomePageState]----
late StreamSubscription<SignalState> _subscription;
@override
void initState() {
super.initState();
_subscription = streamController.stream.listen(emit);
streamController.add(_signalState);
}
void _toggle() {
if(_subscription.isPaused){
_subscription.resume();
}else{
_subscription.pause();
}
setState(() {});
}
另外,StreamController 在构造时可以传入四个函数来监听流的状态:

final StreamController<SignalState> streamController = StreamController(
onListen: ()=> print("=====onListen====="),
onPause: ()=> print("=====onPause====="),
onResume: ()=> print("=====onResume====="),
onCancel: ()=> print("=====onCancel====="),
);
onListen 会在 stream 成员被监听时触发一次;onPause、onResume 、onCancel 分别对应订阅者的 pause 、 resume 、cancel 方法。如下是点击暂停和恢复的日志信息:

在 Stream#listen 方法中还有另外两个可选参数用于异常的处理。 onError 是错误的回调函数,cancelOnError 标识用于控制触发异常时,是否取消 Stream 。
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
如下所示,在 emit 中故意在 红 7 时通过 addError 添加一个异常元素。这里界面简单显示错误信息,在 3 s 后异常被修复,继续添加新元素。

void emit(SignalState state) async {
_signalState = state;
setState(() {});
await Future.delayed(const Duration(seconds: 1));
SignalState nextState = state.next();
if (nextState.counter == 7 && nextState.type == SignalType.denial) {
streamController.addError(Exception('Error Signal State'));
} else {
streamController.add(nextState);
}
}
在 listen 方法中使用 onError 监听异常事件,进行处理:其中逻辑是渲染错误界面,三秒后修复异常,继续产出下一状态:
_subscription = streamController.stream.listen(
emit,
onError: (err) async {
print(err);
renderError();
await Future.delayed(const Duration(seconds: 3));
fixError();
emit(_signalState.next());
},
cancelOnError: false,
);
关于异常的处理,这里简单地提供 hasError 标识进行构建逻辑的区分:

bool hasError = false;
void renderError(){
hasError = true;
setState(() {});
}
void fixError(){
hasError = false;
}
最后说一下 listen 中 cancelOnError 的作用,它默认是 false 。如果 cancelOnError = true ,在监听到异常之后,就会取消监听 stream ,也就是说之后控制器添加的元素就会监听了。这样异常时 StreamController 会触发 onCancel 回调:

三、异步生成器函数与 Stream
前面介绍了通过 StreamController 获取 Stream 进行处理的方式,下面再来看另一种获取 Stream 的方式 - 异步生成器函数 。
1. 思考 Stream 与 Iterable
通过前面对 Stream 的认识,我们知道它是在 时间线 上可拥有若干个可监听的事件元素。而 Iterable 也可以拥有多个元素,两者之间是有很大差距的。Iterable 在 时间 和 空间 上都对元素保持持有关系;而 Stream 只是在时间上监听若干元素的到来,并不在任意时刻都持有元素,更不会在空间上保持持有关系。
对于一个 Type 类型的数据,在异步任务中,Stream<T> 是 Future<T> 就是多值和单值的区别,它们的结果都不能在 当前时刻 得到,只能通过监听在 未来 得到值。 与之相对的就是 Iterable<Type> 和 Type ,它们代表此时此刻,实实在在的对象,可以随时使用。
| 单值 | 多值 | |
|---|---|---|
| 同步 | Type | Iterable<Type> |
| 异步 | Future<Type> | Stream<Type> |
2. 通过异步生成器函数获取 Stream 对象
Future 对象可以通过 async/awiat 关键字,简化书写,更方便的获取异步任务结果。 对于 Stream 也有类似的 async*/yield 关键字。 如下所示, async* 修饰的方法需要返回一个 Stream 对象。
在方法体中通过 yield 关键字 产出 泛型结果对象,如下是对 信号状态流 元素产生出的逻辑:遍历 count 次,每隔 1 s 产出一个状态。
class SignalStream{
SignalState _signalState = SignalState(counter: 10, type: SignalType.denial);
Stream<SignalState> createStream({int count = 100}) async*{
for(int i = 0 ; i < count; i++){
await Future.delayed(const Duration(seconds: 1));
_signalState = _signalState.next();
yield _signalState;
}
}
}
这样,在 _MyHomePageState 中通过 signalStream.createStream() 就可以创建一个有 100 个元素的流,进行监听。每次接收到新状态时,更新界面,也可以达到目的:
---->[_MyHomePageState]---
final SignalStream signalStream = SignalStream();
_subscription = signalStream.createStream().listen(
emit,
);
void emit(SignalState state) async {
_signalState = state;
setState(() {});
}
到这里,关于 Stream 的初步认识就结束了,当然 Stream 的知识还有很多,在后面会陆续介绍。通过本文,你只需要明白 Stream 是什么,通过它我们能干什么就行了。下一篇我们将分析一下 FutureBuilder 和 StreamBuilder 组件的使用和源码实现。它们是 Flutter 对异步对象的封装组件,通过对它们的认识,也能加深我们对 Future 和 Stream 的立即。 那本文就到这里,谢谢观看 ~
转载自:https://juejin.cn/post/7147881475688366093