简简单单的理解 RxJS 中的多播操作符(Multicasting Operators)
👨💻 介绍
对于 RxJS,多播
可能是一个比较高级的概念。
这篇文章会通过一个例子,逐步的介绍多播以及多播的操作符。
🔥 Hot & cold Observables
默认情况下,大多数 Observables
都属于 Cold Observables
,每次我们订阅 Cold Observable
时,都会重新创建它的生产者。那这是什么意思?
首先,我们必须知道生产者是什么:它可以是我们 Observable
值的来源,它页可以是 DOM 事件、回调、HTTP 请求、迭代器等,总之,任何可以产生价值并将其传递给观察者的东西。
现在我们知道了生产者是什么,就更容易理解前面语句的含义了,它基本上是说我们的 Observable 的生产者在每次订阅时都会被一遍又一遍地创建。让我们看一个例子:
import { timer } from "rxjs";
import { tap } from "rxjs/operators";
const cold$ = timer(1000).pipe(tap(() => console.log("副作用")));
cold$.subscribe(val => console.log(`Observer 1: ${val}`));
cold$.subscribe(val => console.log(`Observer 2: ${val}`));
// 自这个 Cold Observable 启动,副作用就执行两次,一次是给 subscription
/* Output:
副作用,
Observer 1: 0,
副作用,
Observer 2: 0
*/
如你所见,因为我们的 Observable
是 Cold Observable
,并且它的生产者在每次订阅时都被重新创建,所以每次订阅就执行一次,副作用总共执行了两次。
如果 Observable
是热的,不管我们订阅了多少次,副作用将只执行一次。
举个例子来说,假设我们有一个 Ajax Observable
,它可以获取一些数据。因为 Ajax Observable
是 Cold Observable
,每次订阅它时,都会发出一个新的 HTTP 请求。
20 个订阅 = 20 个 HTTP 请求。比如下面这样:
import { ajax } from "rxjs/ajax";
import { map, mergeAll, take, tap } from "rxjs/operators";
// 一个 Ajax(cold Observable)
const sharedGhibliFilm$ = ajax
.getJSON("https://ghibliapi.herokuapp.com/films")
.pipe(
tap(() => console.log("我请求啦")),
mergeAll(),
take(1)
);
// 两次 subscribe
sharedGhibliFilm$
.pipe(map(({ title }) => title))
.subscribe(title => console.log(`标题: ${title}`));
sharedGhibliFilm$
.pipe(map(({ description }) => description))
.subscribe(description => console.log(`描述: ${description}`));
// ... 两次请求
/* Output:
'我请求啦',
'标题: Castle in the Sky',
'我请求啦',
'描述: The orphan Sheeta inherited a mysterious crystal that...'
*/
如何才可以正确处理 热/冷 Observables 就变得很重要。
我们不想每次订阅都重新执行一遍,那就得将这个 Cold Observables
变成 Hot Observables
。使用 RxJS 中的多播就可以让这件事变得很方便。
☎️ 多播 (Multicast)
多播
通过使用 Subject 共享源Observable
。让我们看一个使用多播的例子:
import { interval, Subject } from "rxjs";
import { take, tap, multicast } from "rxjs/operators";
const number$ = interval(1000).pipe(take(10));
const multicast$ = number$.pipe(
tap(() => console.log("执行了")),
multicast(() => new Subject())
);
multicast$.subscribe(console.log);
// Output:
如果你将这段代码跑起来,你会发现它没有任何输出,这是为什么?
因为 multicast
返回一种特殊的 Observable
: ConnectableObservable
。
这种特殊类型的 Observable
有一个connect()
方法,当被调用时,它负责使用我们提供的 Subject
订阅源 Observable
。
这意味着如果我们不调用connect()
,流永远不会被订阅,也不会不会开始发射值。
因此,让我们对之前的代码添加并调用connect()
:
import { ConnectableObservable, interval, Subject } from "rxjs";
import { take, tap, multicast } from "rxjs/operators";
const number$ = interval(1000).pipe(take(2));
const multicast$ = number$.pipe(
tap(() => console.log("执行了")),
multicast(() => new Subject())
) as ConnectableObservable<number>;
// number$不会发射值,直到调用 connect
multicast$.connect();
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
执行了,
Observer 1: 0,
Observer 2: 0
(1s)
执行了,
Observer 1: 1,
Observer 2: 1
*/
现在,可以看到有输出值了。由于multicast
共享源 Observable
,副作用只会执行一次,即使我们订阅了很多次。
🌚 取消订阅
与所有 Observable
一样,取消订阅我们的多播 Observable
是为了避免内存泄漏。在处理返回 ConnectableObservable
的多播操作符时,需要取消对多播的订阅。
根据上面的代码片段,take(2)
执行两次,并取消订阅:
import { ConnectableObservable, interval, Subject, timer } from "rxjs";
import { tap, multicast } from "rxjs/operators";
const number$ = interval(1000);
const multicast$ = number$.pipe(
tap(() => console.log("执行了")),
multicast(() => new Subject())
) as ConnectableObservable<number>;
const connectSubscription = multicast$.connect();
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
// 2s后取消订阅
timer(2000)
.pipe(tap(() => connectSubscription.unsubscribe()))
.subscribe();
这可以让我们避免内存泄漏!
🌞 订阅者不同时返回怎么办?
上面的代码我们看到订阅后的输出是同时发生的。
然后,现实情况还有有些差别,比如下面这样,一个流同时订阅两次,第二个流会在两秒后发生:
import { ConnectableObservable, interval, Subject, timer } from "rxjs";
import { take, tap, multicast } from "rxjs/operators";
const number$ = interval(1000).pipe(take(2));
// 通过多播,我们可以分享 number$ Observable
const multicasted$ = number$.pipe(
tap(_ => console.log("我执行了")),
multicast(() => new Subject())
) as ConnectableObservable<number>;
multicasted$.connect();
multicasted$.subscribe(val => console.log(`Observer 1: ${val}`));
// 2s后不会接收到值,因为值已经发射过了
timer(2000)
.pipe(
tap(() =>
multicasted$.subscribe(val => console.log(`Late observer: ${val}`))
)
)
.subscribe();
/* Output:
我执行了,
Observer 1: 0,
(1s)
我执行了,
Observer 1: 1,
Late observer: 1
*/
我们可以看到,第二个流返回的值好像没有从 0 开始,而是直接返回了 1。那么该如何解决这个问题呢?
我们所要做的就是将 Subject
替换为 ReplaySubject
。
由于 ReplaySubjects
会向新订阅者重新发送旧值,就可以解决掉上面那个问题,具体参考下面这段代码:
import { ConnectableObservable, interval, ReplaySubject, timer } from "rxjs";
import { take, tap, multicast } from "rxjs/operators";
const number$ = interval(1000).pipe(take(2));
const multicasted$ = number$.pipe(
tap(_ => console.log("我执行了")),
// 使用 ReplaySubject
multicast(() => new ReplaySubject())
) as ConnectableObservable<number>;
multicasted$.connect();
multicasted$.subscribe(val => console.log(`Observer 1: ${val}`));
// 因为使用了ReplaySubject,2s后会接收到已经发射的值
timer(2000)
.pipe(
tap(() =>
multicasted$.subscribe(val => console.log(`Late observer: ${val}`))
)
)
.subscribe();
/* Output:
我执行了,
Observer 1: 0,
(1s)
Late observer: 0,
我执行了,
Observer 1: 1,
Late observer: 1
*/
可以看到,第二个流虽然延后开始,但是也是从 0 开始发出了值。
🍀 publish()
我们再回到使用 multicast(() => new Subject())
这块。
RxJS 中可以使用 publish
对其简化,也就是说 publish
相当于 multicast(() => new Subject())
。
看下面这个例子:
import { ConnectableObservable, interval, timer } from "rxjs";
import { publish, tap, take } from "rxjs/operators";
const number$ = interval(1000).pipe(take(4));
const multicast$ = number$.pipe(
tap(() => console.log("我执行了")),
// 使用 publish
publish()
) as ConnectableObservable<number>;
// 执行 connect
multicast$.connect();
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
我执行了,
Observer 1: 0,
Observer 2: 0,
(1s)
我执行了,
Observer 1: 1,
Observer 2: 1...
*/
当然,在这里,我们如果想要能够正确的输出值,也得 connect()
它。
🍀 publishReplay()
既然 pulish()
是 multicast(() => new Subject())
的简写。
那同理,publishReplay()
就是 multicast(() => new ReplaySubject())
的简写。
publishReplay()
也就是允许把旧的值发送给新的订阅者。看下面这个例子:
import { ConnectableObservable, interval, timer } from "rxjs";
import { publishReplay, take, tap } from "rxjs/operators";
const number$ = interval(1000).pipe(take(3));
const multicast$ = number$.pipe(
tap(() => console.log("我执行了")),
publishReplay()
) as ConnectableObservable<number>;
// 调用connect
multicast$.connect();
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
timer(2000)
.pipe(
tap(() =>
multicast$.subscribe(val => console.log(`Late observer: ${val}`))
)
)
.subscribe();
/* Output:
我执行了,
Observer 1: 0,
(1s)
我执行了,
Observer 1: 1,
Late observer: 0,
Late observer: 1,
(1s)
我执行了,
Observer 1: 2
Late observer: 2
*/
🍀 publishLast()
publishLast
相当于multicast(() => new AsyncSubject())
。
它会等到源 Observable
完成然后发出最后一个值,参考下面这个例子:
import { ConnectableObservable, interval } from "rxjs";
import { publishLast, take } from "rxjs/operators";
const number$ = interval(1000).pipe(take(4));
const multicast$ = number$.pipe(publishLast()) as ConnectableObservable<
number
>;
// 调用 connect
multicast$.connect();
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
(4s)
Observer 1: 3,
Observer 2: 3
*/
🍀 publishBehavior()
publishBehavior
相当于multicast(() => new BehaviorSubject())
。
因为它使用BehaviorSubject,所以publishBehavior
我们可以指定一个初始值,像下面这样:
import { ConnectableObservable, interval } from "rxjs";
import { publishBehavior, take } from "rxjs/operators";
const number$ = interval(1000).pipe(take(2));
// 提供初始值 -1
const multicast$ = number$.pipe(publishBehavior(-1)) as ConnectableObservable<
number
>;
// 调用 connect
multicast$.connect();
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
Observer 1: -1,
Observer 2: -1,
(1s)
Observer 1: 0,
Observer 2: 0,
(1s)
Observer 1: 1,
Observer 2: 1
*/
可以看到输出值从 -1 开始输出。
🍀 refCount()
然而到现在为止,还是在用 connect()
方法建立连接,会很容易忘记调用,有没有更简便的方法?
RxJS 当然有,那就是 refCoun()
方法,refCount
负责在内部统计订阅的数量,它处理了两件很重要的事情:
- 如果订阅的数量大于等于 1,
refCount
会仅订阅一次源,并调用connect()
- 如果订阅的数量小于1,也就是没有任何订阅者,
refCount
则会从源取消订阅
我们应用 refCount
看看:
import { interval, Subject, timer } from "rxjs";
import { take, takeUntil, tap, multicast, refCount } from "rxjs/operators";
const number$ = interval(1000).pipe(take(10));
// 我们使用 takeUntil + Subject 而不是 unsubscribing
const stop$ = new Subject();
const multicast$ = number$.pipe(
tap(() => console.log("我调用了")),
multicast(() => new Subject()),
// 通过使用 refCount,我们不再需要手动调用 connect()
refCount(),
takeUntil(stop$)
);
// refCount === 1, number$ 开始订阅
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
// refCount === 2
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
// refCount === 0, number$ 取消订阅
timer(2000)
.pipe(
tap(() => stop$.next()),
tap(() => console.log("The end"))
)
.subscribe();
/* Output:
(1s)
我调用了,
Observer 1: 0,
Observer 2: 0,
(1s)
我调用了,
Observer 1: 1,
Observer 2: 1,
The end
*/
所以,refCount
它负责调用connect()
和取消订阅源 Observable。
🍀 share()
share()
运算符相当于 multicast(() => new Subject())
+ refCount
,它也是最常用的多播运算符。
我们使用 share()
来改写上面的示例:
import { interval } from "rxjs";
import { take, tap, share } from "rxjs/operators";
const number$ = interval(1000).pipe(take(2));
const share$ = number$.pipe(
tap(() => console.log("我调用了")),
share()
);
share$.subscribe(val => console.log(`Observer 1: ${val}`));
share$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
我调用了,
Observer 1: 0,
Observer 2: 0,
(1s)
我调用了,
Observer 1: 1,
Observer 2: 1
*/
我们使用更实际一点的一个例子,示例如下:
import { ajax } from "rxjs/ajax";
import { map, mergeAll, take, tap, share } from "rxjs/operators";
// 源流调用 share 方法让其成为 hot Observable
const sharedGhibliFilm$ = ajax
.getJSON("https://ghibliapi.herokuapp.com/films")
.pipe(
tap(() => console.log("我调用了")),
mergeAll(),
take(1),
share()
);
// 两个订阅
sharedGhibliFilm$
.pipe(map(({ title }) => title))
.subscribe(title => console.log(`Title: ${title}`));
sharedGhibliFilm$
.pipe(map(({ description }) => description))
.subscribe(description => console.log(`Description: ${description}`));
/* Output:
'我调用了',
'Title: Castle in the Sky',
'Description: The orphan Sheeta inherited a mysterious crystal that...'
*/
🍀 shareReplay()
很容易猜到,shareReplay
可以记住历史的订阅者,也就是允许把旧的值发送给新的订阅者。
shareReplay
等同于multicast(() => new ReplaySubject())
+ refCount
。
这是一个例子:
import { interval, timer } from "rxjs";
import { shareReplay, take, tap } from "rxjs/operators";
const number$ = interval(1000).pipe(take(3));
// 使用 shareReplay,让其成为 hot Observable
const multicasted$ = number$.pipe(
tap(() => console.log("我调用了")),
shareReplay()
);
multicasted$.subscribe(val => console.log(`Observer 1: ${val}`));
// 2s后执行
timer(2000)
.pipe(
tap(() =>
multicasted$.subscribe(val => console.log(`Late observer: ${val}`))
)
)
.subscribe();
// 我们可以接收到旧的值
/* Output:
我调用了,
Observer 1: 0,
(1s)
我调用了,
Observer 1: 1,
Late observer: 0,
Late observer: 1,
(1s)
我调用了,
Observer 1: 2
Late observer: 2
*/
👏 总结
- 💡
publish
相当于multicast(() => new Subject())
. - 💡
publishBehavior
相当于multicast(() => new BehaviorSubject())
. - 💡
publishLast
相当于multicast(() => new AsyncSubject())
. - 💡
publishReplay
相当于multicast(() => new ReplaySubject())
. - 💡 有了
refCount
,我们不再需要手动调用,connect()
也不再需要处理退订。 - 💡
share
等同于multicast(() => new Subject())
,refCount()
。 - 💡
shareReplay
等同于multicast(() => new ReplaySubject())
,refCount()
。
这就是 RxJS 中的全部内容啦,希望对你有帮助~ 😃
转载需注明作者及来源喔~
转载自:https://juejin.cn/post/7174713617596547132