仅从观察者模式、流来视角来快速上手rxjs
前言
- 本文不过多引入概念,仅从观察者模式、流来视角来快速上手rxjs。个人认为一上来就讲上来就一堆讲原理和概念,容易让新手懵逼。
- 建议用rxjs在线运行工具在执行本文中的代码,方便理解:stackblitz.com/edit/rxjs-p…
首先什么时候使用rxjs
若你需要事件系统时,先考虑用EventEmitter。
当使用EventEmitter的过程中,遇到一堆复杂的时序问题,就可以考虑了使用rxjs
什么是时序问题?
举个例子: 某场景需要等a、b等n个事件源都发出过一次事件后,再开始监听n个事件,执行“handle操作”。问题就在于,这n个事件的发出在时序上是随机的--a可能比b先发出,b也可能比a先发出。
如下尝试使用EventEmitter来实现(伪代码)
// 变量aDone、bDone:记录a、b两事件源是否发出过第一次事件
const aDone = false
const bDone = false
function beforeRun(params) {
console.log(params)
if (aDone && bDone /* 等a、b等n个事件源都发出过一次事件后再开始再执行“handle操作” */) {
console.log('handle操作')
}
}
const eventEmitter = new EventEmitter()
eventEmitter.on('a', (params) => {
aDone = true
beforeRun(params)
})
eventEmitter.on('b', (params) => {
bDone = true
beforeRun(params)
})
// 这两个事件的第一次发出在时序上是随机的
setTimeout(() => eventEmitter.emit('a', 'aDone'), Math.floor(Math.random() * 1000));
setTimeout(() => eventEmitter.emit('b', 'bDone'), Math.floor(Math.random() * 1000));
// 后续a、b可能会继续发出事件。
setTimeout(() => eventEmitter.emit('b', 'bOther'), 1000);
这里用了两个变量aDone、bDone 来记录a、b两事件源是否发出过第一次事件。那如果后期要再加个事件源c,岂不是又要加一个变量cDone?
所以遇到复杂的时序问题,用 EventEmitter 写出来的逻辑,比较难维护。 而使用rxjs组织这段逻辑就非常简单了,在后面 combineLatest 操作符的使用中会实现这个场景。
两个视角理解rxjs
从观察者模式的视角观察rxjs
Wiki上说,观察者模式是 一个目标对象(叫subject)管理所有相依于它的观察者对象(叫observer),并且在它本身的状态改变时主动发出通知。
目标对象(subject)主要通过三个方法进行作业:
- Attach,观察者对象订阅目标对象的变化。
- Detach,取消订阅。
- Notify,目标对象发出状态改变的通知到观察者对象(观察者对象可以有多个)
三个方法对应到rxjs的Subject则是:
import { Subject } from 'rxjs';
const subject = new Subject();
const observer1 = v => console . log ( "stream 1" , v)
const observer2 = v => console.log("stream 2", v)
// Attach: 连接两个observer 到 subject
// subject在这里起到订阅的作用
subject.subscribe(observer1);
subject.subscribe(observer2);
// Notify:发出 数据1、数据2
// subject在这里起到数据源的作用
subject.next(1);
subject.next(2);
// Detach:取消追踪。
// subject在这里起到取消订阅的作用
subject.unsubscribe();
subject.next(3); // 取消追踪了,所以在发出通知会error
// 控制台输出:
// stream 1: 1
// stream 2: 1
// stream 1: 2
// stream 2: 2
// ObjectUnsubscribedError: object unsubscribed
看完上面的例子,可以理解一下rxjs出现的两个概念了
- Observer: 一组回调,它知道如何监听Subject发出的事件/值。
- Subject: 既是发出事件/值的源,也是订阅/取消订阅的地方。
上面用 Subject 举例子来理解了rxjs里面的观察者模式,而 Subject 是 Observable 的一种实现,Observable 是比Subject更基础的概念。我们下面回过头来介绍 Observable。
从流的视角去介绍 Observable
“那么流是指什么呢?举个例子,代码中每1s输出一个数字,用户每一次对元素的点击,就像是在时间这个维度上,产生了一个数据集。这个数据集不像数组那样,它不是一开始都存在的,而是随着时间的流逝,一个一个数据被输出出来。这种异步行为产生的数据,就可以被称之为一个流”。
从 RxJS 的视角来看,我们程序就是一个个流组成,流由源源不断产生值或者事件组成。未来会出现的值或者事件装进一个叫做 Observable 的集合,可以理解 Observable 就是这个流。官方提供了很多操作符(Operator)帮助我们进行 “流式” 操作 -- 对流中的值或者事件进行操作。
而官方对Observable的定义:一个装着未来会出现的值或者事件 的 集合。
Observable 与 Subject
Subject 是 Observable 的一种实现。
先看看普通的 Observable 的使用:
import { Observable } from 'rxjs';
const observable = new Observable(function (subscriber) {
subscriber.next(1);
setTimeout(() => subscriber.next(2), 1000);
setTimeout(() => subscriber.next(3), 1200);
});
const subscription = observable.subscribe((value) => console.log(value));
setTimeout(() => subscription.unsubscribe(), 1100);
// 控制台打印结果是 1 、2
// 没有打印3 -- 因为此时unsubscribe了。
在使用上可以看到,二者相同点则是都可以进行 subscribe。
区别在于:
- Subject 能灵活地在各个地方用 next 发出事件,而 Observable 不能,Observable 要发出的事件/值的数据源在 new 时就已经确定了。相同点则是这两个都可以执行subscribe。
- 二者取消订阅(unsubscribe)的方式不一样。observable.subscribe 会返回一个 subscription 来取消订阅。subject 则可以直接调用 unsubscribe 来取消订阅。
还有一个不同点,Subject是多播的;普通的 Observable 是单播的,Observable 可以通过share操作符转成多播。
单播是指每个 observer 之间的流是相互独立的。多播是指多个 observer 之间的流是共享的。(自行了解下share操作符,会更清楚点,这里不多介绍)
另外,有个细节是用操作符对Subject做操作后,往往会返回普通的Observable。比如
import { Subject } from 'rxjs';
import { map } from 'rxjs/operators';
const subject = new Subject();
const observable = subject.pipe(
map((value) => value * 2)
); // 使用 map 操作符后转成了 普通的Observable
const sub = observable.subscribe((value) => console.log(value));
sub.unsubscribe()
subject.next(1);
subject.next(2);
介绍几个操作符(Operator)
操作符(Operator)可以让我们对流(Observable)进行函数式编程,就像对数组使用 map, filter, concat, reduce 一样。这里只挑几个来介绍。
filter、map
filter的作用是筛选事件/值。map的作用是对流中的每个事件/值进行转换。可以类比数组的filter、map。例子看下文的pipe。
tap
tap
操作符在 RxJS 中用于在流中插入一个副作用,而不对事件/值有任何的转换/过滤等操作。比如在tap中可以打印日志。例子看下文的pipe。
pipe
首先 pipe 不算操作符,它只是起到组织操作符的作用。
什么是 pipe
翻译为管道,想象山泉水流过管道,会经过一道道纯净化处理,如粗滤→精滤→杀菌→成品,最后变成可饮用的水。
可以把这里的粗滤、精滤、杀菌、成品看做是函数,pipe 的作用是能把多个函数组合起来执行。如下pipe 将 f 和 g 组合起来了。
const f = (x) => x + 1;
const g = (x) => 2 * x;
console.log(pipe(f, g)(1))
上面代码用pipe组合执行结果就是 g(f(1)),即: (1+1)*2 = 4
用pipe组合rxjs的操作符
import { Subject } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const mySubject = new Subject<number>();
const observable = mySubject.pipe(
map((value) => value * 2), // 将每个值乘以 2
tap()
filter((val) => val % 3 === 0), // 过滤出能被 3 整除的数字
);
observable.subscribe((val) => console.log('result', val));
mySubject.next(1);
mySubject.next(2);
mySubject.next(3);
mySubject.next(4);
mySubject.next(5);
mySubject.next(6);
// 结果输出 6 、12
Merge
除了可以对流中的 事件/值 进行操作。还可以对 Observable 进行操作。比如merge操作符。
merge的作用:通过将多个 Observable 的值混合到一个 Observable 中,即多个 Observable 的扁平化。
简单理解下就是将两个流,合并成一个流,这样你可以从一个地方接收到所有的事件/值了。
import { Subject, merge } from 'rxjs';
// 创建两个 Subject
const subject1 = new Subject<number>();
const subject2 = new Subject<string>();
// 合并 Subjects
const merged$ = merge(subject1, subject2);
// 订阅合并后的 Observable
merged$.subscribe((value) => {
console.log('合并后的值:', value);
});
// 发出事件
subject1.next(1);
subject2.next('Hello');
subject1.next(2);
// 结果:
// 合并后的值: 1
// 合并后的值: Hello
// 合并后的值: 2
想想这里用EventEmitter来模拟出merge的效果话,是不是有点麻烦?>_<
combineLatest
回到“首先什么时候使用rxjs”提到的一个场景:
"某场景需要等a、b等n个事件源都发出过一次事件后,再开始监听n个事件,执行“handle操作”。问题就在于,这n个事件的发出在时序上是随机的--a可能比b先发出,b也可能比a先发出"
使用 combineLatest 就能简单实现这个场景。看下对比起 EventEmitter 是不是简单多了?
import { Subject, combineLatest } from 'rxjs';
// 创建两个Subject
const subjectA = new Subject<string>();
const subjectB = new Subject<string>();
// combineLatest 传入内部流,组成一个新的Observable,
// 新的 Observable 会等待所有内部流发出过一次值后再开始 发出组合后的值。
const observable = combineLatest([subjectA, subjectB])
observable.subscribe(([a, b]) => {
console.log('handle操作');
console.log('观察操作执行:', a, b);
});
// 这两个事件的第一次发出在时序上是随机的
setTimeout(() => subjectA.next('aDone'), Math.floor(Math.random() * 1000));
setTimeout(() => subjectB.next('bDone'), Math.floor(Math.random() * 1000));
// 后续
setTimeout(() => subjectB.next('Btest'), 1100);
结果
最后
rxjs那么多api怎么学,根本记不住呀?
想想用lodash的时候,你会刻意去记那些工具函数吗?不会。而rxjs作为处理异步事件的工具库,自然也不需要记住它的api,等要用的时候再去找。
有个技巧是描述一下需求让gpt帮忙写。如下
本文不严谨的地方
- 很多例子没有unsubscribe,实际应用中不用unsubscribe中会导致资源泄露。
- 没谈到observable的complete,以及如何处理流中的error。
转载自:https://juejin.cn/post/7352100456333705226