【Flutter 异步编程 - 玖】 | 学习 Stream 的元素转换操作
0. 前言 - 测试说明
Stream
是一种可连续监听的 事件序列
,它本质上是对 发布-订阅
模式的具体实现,人们习惯于称其为 响应式
模式。很多编程语言都有 Rx[language]
, 它们由 【ReactiveX】 组织负责维护,如下也包含 RxDart
版本,它是对 Stream
的拓展。
Rx
中对元素的操作方式被称为 操作符
,Stream
本身有很多内置操作符,本篇我们学习这些内置的操作。这里测试中使用的流如下所示,通过 StreamProvider#createStream
提供:一共流中有七个 int
元素,每隔 100 ms
产出一个。
class StreamProvider{
Stream<int> createStream() async*{
List<int> res= [1,9,9,4,3,2,8];
for(int i = 0 ; i < res.length; i++){
yield res[i];
await Future.delayed(const Duration(milliseconds: 100));
}
}
}
输入流
测试如下,运行时会每隔 100 ms
打印一个数字:代码见 00_original.dart
void main(){
Stream<int> intStream = StreamProvider().createStream();
intStream.listen((e) {
print(e);
});
}
一、简单的 Stream 之间的转换
首先我们从下面 8
个操作符简单认识一下 Stream
-> Stream
间的转换。这几个方法,都是 Stream
的成员方法,且返回另一个 Stream
对象:
1. map 操作符: 映射转换
Stream<S> map<S>(S convert(T event))
从方法定义上可以看出: map
可以将 T
类型的元素转换成 S
型,返回为一个 S
型的新流。 map
意为 映射
,入参是单参回调,用来决定转换的 映射关系
。如下所示,通过 map
操作,将一个 int
型的 Stream
转换成一个 String
型的 Stream
。
测试代码 01_map.dart 如下,这里的映射关系是:将输入的元素作为 key
, 从 numMap
中取值返回:
Map<int,String> numMap = {
0:"零", 1:"壹", 2:"贰", 3:"叁", 4:"肆",
5:"伍", 6:"陆", 7:"柒", 8:"捌", 9:"玖", 10:"拾",
};
void main(){
Stream<int> intStream = StreamProvider().createStream();
Stream<String> newStream = intStream.map<String>((int e) => numMap[e]!);
newStream.listen((e) {
print(e);
});
}
2. distinct 操作符: 区分过滤
Stream<T> distinct([bool equals(T previous, T next)?])
从方法定义上可以看出: distinct
会生成一个与原流 相同泛型
的新流,入参是两参回调函数,返回 bool
值。该操作符可以根据 当前元素
和 前一元素
比较结果,决定是否将 当前元素
加入输出流。如下示意图,在 200 ms
时,输入流产出的元素是 9
,当把条件设为: 前后元素相同则不产出,就可以把连续相同的元素过滤掉:
测试代码 02_distinct.dart 如下,这里的判断关系是:前后两个元素相同是为一致,不加入新流中。这在状态管理中,可以很方便地实现连续相同的状态不做响应,从而避免无意义的更新。
void main() {
Stream<int> intStream = StreamProvider().createStream();
Stream<int> newStream = intStream.distinct((int a , int b) => a == b);
newStream.listen((e) {
print(e);
});
}
3. where 操作符:条件过滤
Stream<T> where(bool test(T event))
从方法定义上可以看出: where
会生成一个与原流 相同泛型
的新流,入参是一参回调函数,返回 bool
值。该操作符可以对 当前元素
进行校验,决定是否将其加入输出流。如下示意图,判断标准是元素大于 5
才能加入新流,所以输出流只有 9,9,8
三个元素,在其他时段不产出 :
测试代码 03_where.dart 如下,这里的判断关系是:元素 e > 5
时允许加入新流中。通过 where
可以忽略原流中的某些元素,当元素激活时不作响应。
void main() {
Stream<int> intStream = StreamProvider().createStream();
Stream<int> newStream = intStream.where((int e) => e > 5);
newStream.listen((e) {
print(e);
});
}
4. take 操作符:截取
Stream<T> take(int count)
从方法定义上可以看出: take
会生成一个与原流 相同泛型
的新流,入参是 int 型数字。该操作符可以选取前面 count 个元素加入输出流。如下示意图, take(3) 表示只取输入流的前三个。值得注意的是:如果输入流此时未进行监听,输出流达到 count
后,时间线就会结束。也就是说输入流不会继续产出元素。
测试代码 04_take.dart 如下:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Stream<int> newStream = intStream.take(3);
newStream.listen((e) {
print(e);
});
}
5. takeWhile 操作符:条件截取
Stream<T> takeWhile(bool test(T element))
从方法定义上可以看出: takeWhile
会生成一个与原流 相同泛型
的新流,入参是一参回调函数,返回 bool
值。该操作符可以 根据条件 截取前面满足条件元素加入输出流,直到出现不满足条件为止。值得注意的是:如果输入流未进行监听,输出流出现不满足条件元素时,时间线就会结束。
测试代码 05_takeWhile.dart 如下:条件是小于 4
,或者等于 9
,所以前面三个元素满足情况,第四个元素不满足,会使流结束:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Stream<int> newStream = intStream.takeWhile((int e) => e <= 4 || e == 9);
newStream.listen((e) {
print(e);
});
}
6. skip 操作符:跳过
Stream<T> skip(int count)
从方法定义上可以看出: skip
会生成一个与原流 相同泛型
的新流,入参是 int 型数字。该操作符可以跳过前面 count 个元素,使他们不加入输出流。如下所示,是 skip(3)
的效果。
测试代码 06_skip.dart 如下:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Stream<int> newStream = intStream.skip(3);
newStream.listen((e) {
print(e);
});
}
7. skipWhile 操作符:条件跳过
Stream<T> skipWhile(bool test(T element))
从方法定义上可以看出:skipWhile
会生成一个与原流 相同泛型
的新流,入参是一参回调函数,返回 bool
值。该操作符可以 根据条件 跳过前面满足条件元素,使其不加入输出流,直到出现不满足条件为止。
值得注意的是:第一个不满足条件的元素出现后,该条件不会影响后续的元素。比如下面条件是 e < 4
, 第一个元素是 1
,满足条件跳过。第二个元素是 9
不满足,接下来的 3
、2
元素不会受到条件影响。
测试代码 07_skipWhile.dart 如下:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Stream<int> newStream = intStream.skipWhile((int e) => e < 4);
newStream.listen((e) {
print(e);
});
}
8. cast 与 castFrom : 强制类型转换
Stream<R> cast<R>() => Stream.castFrom<T, R>(this);
从方法定义上可以看出: cast
没有参数,允许生成一个与原流 不同泛型
的新流。该方法使用 Stream
的 castFrom
静态方法实现的,本质上是对元素进行 强制类型转换 ,所以需要注意的是:前后类型必须满足转换要求,否则会出现异常,一般使用场景非常有限。
测试代码 08_cast_castFrom.dart 如下:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Stream<num> newStream = intStream.cast<num>();
int start = DateTime.now().millisecondsSinceEpoch;
newStream.listen((e) {
print(e);
});
}
二、较复杂的 Stream 之间的转换
1. expand 操作符:展开
Stream<S> expand<S>(Iterable<S> convert(T element))
从方法定义上可以看出:expand
允许生成一个与原流 不同泛型 S
的新流,入参是一参回调函数,返回 Iterable<S>
值。也就是说,该操作符可以让一个输入流元素,激发出多个其他类型的元素,放入输出流中:比如下面每个元素可以输出两个字符元素信息:
测试代码 09_expend.dart 如下:
Map<int,String> numMap = {
0:"零", 1:"壹", 2:"贰", 3:"叁", 4:"肆",
5:"伍", 6:"陆", 7:"柒", 8:"捌", 9:"玖", 10:"拾",
};
Map<int,String> numMap2 = {
0:"0", 1:"Ⅰ", 2:"Ⅱ", 3:"Ⅲ", 4:"Ⅳ",
5:"Ⅴ", 6:"Ⅵ", 7:"Ⅶ", 8:"Ⅷ", 9:"Ⅸ", 10:"Ⅹ",
};
void main() {
Stream<int> intStream = StreamProvider().createStream();
Stream<String> newStream = intStream.expand<String>((e) => [numMap[e]!,numMap2[e]!]);
int start = DateTime.now().millisecondsSinceEpoch;
newStream.listen((e) {
print(e);
});
}
2. asyncMap 操作符:异步映射
Stream<E> asyncMap<E>(FutureOr<E> convert(T event))
从方法定义上可以看出: asyncMap
允许生成一个与原流 不同泛型 E
的新流,入参是一参回调函数,返回 FutureOr<E>
类型值。也就是说,该操作符允许通过 异步方法 对流元素进行转换。如下所示, 1
元素激发时,延时 100 ms
模拟异步操作,在操作完成之后才会继续触发延时 100 ms
产出 2
元素,以此类推。
说个实际场景比较任意明白:比如读取文件夹会生成一个 Stream
对象,通过 where
可以过滤出文件元素,在通过 asyncMap
可以直接通过异步读取 File
元素内容,获取一个 Stream<String>
的文件夹内部文件内容流:
测试代码 10_asyncMap.dart 如下:
Map<int,String> numMap = {
0:"零", 1:"壹", 2:"贰", 3:"叁", 4:"肆",
5:"伍", 6:"陆", 7:"柒", 8:"捌", 9:"玖", 10:"拾",
};
void main() {
Stream<int> intStream = StreamProvider().createStream();
Stream<String> newStream = intStream.asyncMap(delayMap);
int start = DateTime.now().millisecondsSinceEpoch;
newStream.listen((e) {
print(e);
});
}
Future<String> delayMap(int e) async{
// 模拟异步耗时:
await Future.delayed(const Duration(milliseconds: 100));
return numMap[e]!;
}
3. asyncExpand 操作符:异步展开
Stream<E> asyncExpand<E>(Stream<E>? convert(T event))
从方法定义上可以看出: asyncExpand
允许生成一个与原流 不同泛型 E
的新流,入参是一参回调函数,返回 Stream<E>?
类型值。也就是说,该操作符允许会将元素转换成 E 泛型的流 。这可能让人很难理解,不过结合 Expand
可以将元素转化为 Iterable<S>
,那异步情况转换成 Stream
也在情理之中。
如下示例中,每个数字会被转换成含有两个元素的 Stream<String>
,且每个 String
元素间延时 50 ms
模拟异步处理。感觉这个方法一旦派上用场,肯定有大用,我暂时没想出什么应用场景 ~
测试代码 11_asyncExpend.dart 如下:
Map<int,String> numMap = {
0:"零", 1:"壹", 2:"贰", 3:"叁", 4:"肆",
5:"伍", 6:"陆", 7:"柒", 8:"捌", 9:"玖", 10:"拾",
};
Map<int,String> numMap2 = {
0:"0", 1:"Ⅰ", 2:"Ⅱ", 3:"Ⅲ", 4:"Ⅳ",
5:"Ⅴ", 6:"Ⅵ", 7:"Ⅶ", 8:"Ⅷ", 9:"Ⅸ", 10:"Ⅹ",
};
void main() {
Stream<int> intStream = StreamProvider().createStream();
Stream<String> newStream = intStream.asyncExpand(_streamExpand);
newStream.listen((e) {
print(e);
});
}
Stream<String>? _streamExpand(int e) async*{
await Future.delayed(const Duration(milliseconds: 50));
yield numMap[e]!;
await Future.delayed(const Duration(milliseconds: 50));
yield numMap2[e]!;
}
4. transform 操作符:转化流
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer)
从方法定义上可以看出: transform
允许生成一个与原流 不相同泛型 S
的新流,入参是StreamTransformer<T, S>
类型对象。该方法可以通过自定义 StreamTransformer
来实现流从 T
类型到 S
类型的转化。
Dart
中对 StreamTransformer
的实现比较少,只有第 Ascii
的编解码的流转换器。
如下代码,简单通过 AsciiEncoder 看一下使用方式,代码 12_transform.dart
void main() {
Stream<int> intStream = StreamProvider().createStream();
Stream<List<int>> newStream = intStream
.map((int e) => e.toString())
.transform(const AsciiEncoder());
int start = DateTime.now().millisecondsSinceEpoch;
newStream.listen((e) {
print("$e === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}
在 bloc
中对事件的转换时,可以看到 transform
的身影:
另外在 rxdart
库中有大量的 StreamTransformer
的使用,这个有机会再细说。
能通过一个 Stream
生成另一个 Stream
内置的的就只这些方法。另外 stream_transform
库对 Stream
的流转换进行了一些拓展,虽然没有 rxdart
那么全面和系统,但基本可以满足日常需要。作为一个小巧的对 rxdart
的下位替代还是不错的。
三、 产生单元素的操作符
除了转换成其他 Stream
之外,还有一些操作符可以产出一个元素,也就是 Future
对象。这小节来看一下如下的五个操作符:
1. reduce 操作符:迭代合并
Future<T> reduce(T combine(T previous, T element))
从方法定义上可以看出: reduce
返回与原流 相同泛型
的 Future
对象 ,入参是两参回调函数,返回泛型对象。该操作符的特点是:每当非首元素激活时,都会使用 combine
对前面结果 p
和当前元素 e
进行处理。
断点调试一下就会非常清晰,测试代码见 13_reduce.dart。比如这里 combine
操作是前后元素相加,第一次 combine
触发是在 100ms
时,
9
被激活。 此时的 p
是前一元素,也就是 1
, e
是当前值 9
; 返回结果是 p+e = 10
;
在 200 ms
第二个 9
激活时,会触发第二次 combine
,此时的 p
是上一次的结果值 10
, 9
是当前值 9
; 返回结果是 p+e = 19
。以此类推,知道 600ms
时流结束,返回结果 36
。这样就实现了异步的元素累加效果。当然除了加法,你可以提供 combine
函数满足特点的需求。
值得注意的是: reduce 只会返回最终的合并结果元素,也就是 一个 Future 对象,但在元素激发的过程中会不断通过 combine
进行迭代合并结果。
2. fold 操作符:迭代合并转换
Future<S> fold<S>(S initialValue, S combine(S previous, T element))
从方法定义上可以看出: fold
返回与原流 不同泛型
的 Future
对象 ,也就是说,除了 迭代合并
之外,fold
还有 类型转换 的功能。
我们知道 reduce
是在第二个元素激活时,才和第一个元素开始迭代的。fold
有两个入参,第一个是 S
类型的初始值,表示最初迭代的前元素,也就是说在第一个元素激活时就可以使用 combine
来处理。第二个入参是函数对象 combine
,它有将 T
泛型的元素,和结果 S
型元素,组合成新 S
型对象的能力。
测试代码 14_fold.dart 如下:fold
和 reduce
在语义上是 折叠
和 减少
的含义。从图中可以看出,它们都是对 Stream
元素按照规则进行合并操作,从输入和输出来看,是将 多个元素 合并成 一个元素 进行输出,这就是 折叠
和 减少
的形象体现。可以感受到 reduce
相当于一个搭配版的 fold
。
void main() {
Stream<int> intStream = StreamProvider().createStream();
Future<String> reduceResult =
intStream.fold<String>("大写:", (String p, int e) => p + numMap[e]!);
int start = DateTime.now().millisecondsSinceEpoch;
reduceResult.then((value) {
print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}
3. drain 操作符: 排干
Future<E> drain<E>([E? futureValue])
从方法定义上可以看出: drain
返回与原流 不同泛型
的 Future
对象 ,其中可以传入 E?
泛型的目标值。 这个方法非常形象,把 Stream
中的元素看作是 水
,drain
的意思是把水排干。该方法的作用就返回 Future
对象作为 Stream 结束或异常 的信号,无视其中的元素触发。
测试代码 15_drain.dart 如下:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Future<int> reduceResult = intStream.drain(4);
int start = DateTime.now().millisecondsSinceEpoch;
reduceResult.then((value) {
print("流已结束 === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}
4. every 操作符: 全匹配校验
Future<bool> every(bool test(T element))
从方法定义上可以看出:every
返回 bool
的 Future
对象 ,一个入参是用于校验元素的回调函数,返回 bool
值。 该方法用于校验是否流中的 每个元素
都满足校验条件,如果有一个元素不满足,在原流没有其他监听的情况下,会立刻终止时间线。如下测试所示,判断条件是e>4
,第一个元素即不满足,所以会立刻停止,后续的流元素也不会被激发。
测试代码 16_every.dart 如下:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Future<bool> reduceResult = intStream.every((e) => e > 4);
int start = DateTime.now().millisecondsSinceEpoch;
reduceResult.then((value) {
print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}
5. any 操作符: 任意匹配校验
Future<bool> any(bool test(T element))
从方法定义上可以看出:any
它返回 bool
的 Future
对象 ,一个入参是用于校验元素的回调函数,返回 bool
值。 该方法用于校验是否流中的是否存在 任一元素
校验条件,如果有一个元素满足,在原流没有其他监听的情况下,会立刻终止时间线。如下测试所示,判断条件是e>4
,第二个元素满足条件,所以会立刻停止,返回 true
, 后续的流元素也不会被激发。
测试代码 17_any.dart 如下:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Future<bool> reduceResult = intStream.any((e) => e > 4);
int start = DateTime.now().millisecondsSinceEpoch;
reduceResult.then((value) {
print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}
四、其他操作符
下面看一下剩下的 7
的相对简单的操作符:
1. singleWhere 操作符: 单元素查询
Future<T> singleWhere(bool test(T element), {T orElse()?})
从方法定义上可以看出:singleWhere
它返回与原流 相同泛型
的 Future
对象 ,一个入参是用于校验元素的回调函数,返回 bool
值。 还有可选回调 orElse
, 用于在为查询到元素时提供默认值。
值得注意的是: 当未提供 orElse
,在流结束时没有匹配元素时,会出现 No element
异常。另外,如果在元素激活中发现第二个满足条件的元素,会抛出 Too many elements
异常,并终止后续元素的发出。也就是说,singleWhere
需要保证 有且仅有一个 元素满足条件,所以在匹配成功后,时间线并不会停下。
测试代码 18_singleWhere.dart 如下:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Future<int> reduceResult =
intStream.singleWhere((e) => e == 4, orElse: () => -1);
int start = DateTime.now().millisecondsSinceEpoch;
reduceResult.then((value) {
print(value);
});
}
2. firstWhere 操作符: 首匹配元素
Future<T> firstWhere(bool test(T element), {T orElse()?})
firstWhere
在入参和返回值上和 singleWhere
一致,但其功能上有些差异。当某个元素激活时,符合 firstWhere
的条件,时间线就会停止。也就是说 firstWhere
只负责得到第一个匹配的元素,拿到即止。另外,如果未提供 orElse
且未发现元素,出现 No element
异常。
测试代码 19_firstWhere.dart 如下:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Future<int> reduceResult = intStream.firstWhere((e) => e == 4, orElse: () => -1);
int start = DateTime.now().millisecondsSinceEpoch;
reduceResult.then((value) {
print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}
3. lastWhere 操作符: 尾匹配元素
Future<T> lastWhere(bool test(T element), {T orElse()?})
lastWhere
在入参和返回值上和 firstWhere
一致,从名称上也能看出它的作用是匹配 最后一个 满足条件的元素。因为在流停止前,都不能确定未来的元素是否满足条件,所以 lastWhere
的时间线会到流结束。这是它和 firstWhere
一个很大的差异。另外同样,如果未提供 orElse
且未发现元素,出现 No element
异常。
测试代码 20_lastWhere.dart 如下:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Future<int> reduceResult = intStream.lastWhere((e) => e == 9, orElse: () => -1);
int start = DateTime.now().millisecondsSinceEpoch;
reduceResult.then((value) {
print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}
4. elementAt 操作符: 索引元素
Future<T> elementAt(int index)
elementAt
非常简单, 通过指定索引进行匹配,返回对应索引位的元素,通过 Future
回调。在指定索引为的元素激活后,时间线会停止。
测试代码 21_elementAt.dart 如下:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Future<int> reduceResult = intStream.elementAt(4);
int start = DateTime.now().millisecondsSinceEpoch;
reduceResult.then((value) {
print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}
5. forEach、contains 和 join
剩下的三个比较简单,就一起说吧, forEach
是对 Stream
的元素进行 遍历操作,返回的 Future
对象会在流结束时完成,返回 null
。如下代码 22_forEach.dart ,每次元素激活时都会触发 process
方法进行处理:
void main() {
Stream<int> intStream = StreamProvider().createStream();
Future<dynamic> reduceResult = intStream.forEach(process);
int start = DateTime.now().millisecondsSinceEpoch;
reduceResult.then((value) {
print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}
void process(int e){
print(e);
}
contains
传入 Object?
对象,用于校验流中是否包含元素,返回 Future<bool>
对象,当检测到包含时,时间线会停止,返回 true
。测试代码如下 23_contains.dart
void main() {
Stream<int> intStream = StreamProvider().createStream();
Future<bool> reduceResult = intStream.contains(4);
int start = DateTime.now().millisecondsSinceEpoch;
reduceResult.then((value) {
print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}
join
方法返回 Future<String>
对象, 其作用是将元素通过入参符号进行连接成字符串。这和 List 中的 join
作用是类似的,只不过 Stream
支持异步处理 join
的过程。测试代码如下 24_join.dart
void main() {
Stream<int> intStream = StreamProvider().createStream();
Future<String> reduceResult = intStream.join(",");
int start = DateTime.now().millisecondsSinceEpoch;
reduceResult.then((value) {
print("$value === ${DateTime.now().millisecondsSinceEpoch - start} ms");
});
}
这 24
个就是 Stream
类中内置的操作方法,可以满足绝大多数使用场景。但对于一些特殊场景,比如说 debounce
防抖 、 throttle
节流等,就无法支持。可以通过三方库进行拓展,另外 Flutter 中内置的 StopWatch
和 Bloc
中的 Emitter
都是比较有趣的东西。下一篇,也是本专题的最后一篇,将进一步探索 Stream
流转换的实现方式,来达到自己拓展 Stream 的目的。敬请期待 ~
转载自:https://juejin.cn/post/7166140693218328606