likes
comments
collection
share

简简单单的理解 RxJS 中的多播操作符(Multicasting Operators)

作者站长头像
站长
· 阅读数 52

👨‍💻 介绍

对于 RxJS,多播可能是一个比较高级的概念。

这篇文章会通过一个例子,逐步的介绍多播以及多播的操作符。

🔥 Hot & cold Observables

默认情况下,大多数 Observables 都属于 Cold Observables ,每次我们订阅 Cold Observable 时,都会重新创建它的生产者。那这是什么意思?

首先,我们必须知道生产者是什么:它可以是我们 Observable 值的来源,它页可以是 DOM 事件、回调、HTTP 请求、迭代器等,总之,任何可以产生价值并将其传递给观察者的东西。

现在我们知道了生产者是什么,就更容易理解前面语句的含义了,它基本上是说我们的 Observable 的生产者在每次订阅时都会被一遍又一遍地创建。让我们看一个例子:

import { timer } from "rxjs";
import { tap } from "rxjs/operators";

const cold$ = timer(1000).pipe(tap(() => console.log("副作用")));

cold$.subscribe(val => console.log(`Observer 1: ${val}`));
cold$.subscribe(val => console.log(`Observer 2: ${val}`));

// 自这个 Cold Observable 启动,副作用就执行两次,一次是给 subscription

/* Output:
副作用,
Observer 1: 0,
副作用, 
Observer 2: 0
*/

如你所见,因为我们的 ObservableCold Observable ,并且它的生产者在每次订阅时都被重新创建,所以每次订阅就执行一次,副作用总共执行了两次

如果 Observable 是热的,不管我们订阅了多少次,副作用将只执行一次

举个例子来说,假设我们有一个 Ajax Observable,它可以获取一些数据。因为 Ajax ObservableCold Observable,每次订阅它时,都会发出一个新的 HTTP 请求。

20 个订阅 = 20 个 HTTP 请求。比如下面这样:

import { ajax } from "rxjs/ajax";
import { map, mergeAll, take, tap } from "rxjs/operators";

// 一个 Ajax(cold Observable)
const sharedGhibliFilm$ = ajax
  .getJSON("https://ghibliapi.herokuapp.com/films")
  .pipe(
    tap(() => console.log("我请求啦")),
    mergeAll(),
    take(1)
  );

// 两次 subscribe
sharedGhibliFilm$
  .pipe(map(({ title }) => title))
  .subscribe(title => console.log(`标题: ${title}`));

sharedGhibliFilm$
  .pipe(map(({ description }) => description))
  .subscribe(description => console.log(`描述: ${description}`));

// ... 两次请求

/* Output:
'我请求啦',
'标题: Castle in the Sky',
'我请求啦',
'描述: The orphan Sheeta inherited a mysterious crystal that...'
*/

如何才可以正确处理 热/冷 Observables 就变得很重要。

我们不想每次订阅都重新执行一遍,那就得将这个 Cold Observables 变成 Hot Observables。使用 RxJS 中的多播就可以让这件事变得很方便。

☎️ 多播 (Multicast)

多播 通过使用 Subject 共享源Observable。让我们看一个使用多播的例子:

import { interval, Subject } from "rxjs";
import { take, tap, multicast } from "rxjs/operators";

const number$ = interval(1000).pipe(take(10));

const multicast$ = number$.pipe(
  tap(() => console.log("执行了")),
  multicast(() => new Subject())
);

multicast$.subscribe(console.log);
// Output:

如果你将这段代码跑起来,你会发现它没有任何输出,这是为什么?

因为 multicast 返回一种特殊的 Observable: ConnectableObservable

这种特殊类型的 Observable 有一个connect()方法,当被调用时,它负责使用我们提供的 Subject 订阅源 Observable

这意味着如果我们不调用connect(),流永远不会被订阅,也不会不会开始发射值。

因此,让我们对之前的代码添加并调用connect()

import { ConnectableObservable, interval, Subject } from "rxjs";
import { take, tap, multicast } from "rxjs/operators";

const number$ = interval(1000).pipe(take(2));

const multicast$ = number$.pipe(
  tap(() => console.log("执行了")),
  multicast(() => new Subject())
) as ConnectableObservable<number>;

// number$不会发射值,直到调用 connect
multicast$.connect();

multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
执行了,
Observer 1: 0,
Observer 2: 0
(1s)
执行了,
Observer 1: 1,
Observer 2: 1
*/

现在,可以看到有输出值了。由于multicast共享源 Observable,副作用只会执行一次,即使我们订阅了很多次。

🌚 取消订阅

与所有 Observable 一样,取消订阅我们的多播 Observable 是为了避免内存泄漏。在处理返回 ConnectableObservable 的多播操作符时,需要取消对多播的订阅。

根据上面的代码片段,take(2)执行两次,并取消订阅:

import { ConnectableObservable, interval, Subject, timer } from "rxjs";
import { tap, multicast } from "rxjs/operators";

const number$ = interval(1000);

const multicast$ = number$.pipe(
  tap(() => console.log("执行了")),
  multicast(() => new Subject())
) as ConnectableObservable<number>;

const connectSubscription = multicast$.connect();

multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));

// 2s后取消订阅
timer(2000)
  .pipe(tap(() => connectSubscription.unsubscribe()))
  .subscribe();

这可以让我们避免内存泄漏!

🌞 订阅者不同时返回怎么办?

上面的代码我们看到订阅后的输出是同时发生的。

然后,现实情况还有有些差别,比如下面这样,一个流同时订阅两次,第二个流会在两秒后发生:

import { ConnectableObservable, interval, Subject, timer } from "rxjs";
import { take, tap, multicast } from "rxjs/operators";

const number$ = interval(1000).pipe(take(2));

// 通过多播,我们可以分享 number$ Observable
const multicasted$ = number$.pipe(
  tap(_ => console.log("我执行了")),
  multicast(() => new Subject())
) as ConnectableObservable<number>;

multicasted$.connect();

multicasted$.subscribe(val => console.log(`Observer 1: ${val}`));

// 2s后不会接收到值,因为值已经发射过了
timer(2000)
  .pipe(
    tap(() =>
      multicasted$.subscribe(val => console.log(`Late observer: ${val}`))
    )
  )
  .subscribe();
/* Output:
我执行了,
Observer 1: 0,
(1s)
我执行了,
Observer 1: 1,
Late observer: 1
*/

我们可以看到,第二个流返回的值好像没有从 0 开始,而是直接返回了 1。那么该如何解决这个问题呢?

我们所要做的就是将 Subject 替换为 ReplaySubject

由于 ReplaySubjects 会向新订阅者重新发送旧值,就可以解决掉上面那个问题,具体参考下面这段代码:

import { ConnectableObservable, interval, ReplaySubject, timer } from "rxjs";
import { take, tap, multicast } from "rxjs/operators";

const number$ = interval(1000).pipe(take(2));

const multicasted$ = number$.pipe(
  tap(_ => console.log("我执行了")),
  // 使用 ReplaySubject
  multicast(() => new ReplaySubject())
) as ConnectableObservable<number>;

multicasted$.connect();

multicasted$.subscribe(val => console.log(`Observer 1: ${val}`));

// 因为使用了ReplaySubject,2s后会接收到已经发射的值
timer(2000)
  .pipe(
    tap(() =>
      multicasted$.subscribe(val => console.log(`Late observer: ${val}`))
    )
  )
  .subscribe();
/* Output:
我执行了,
Observer 1: 0,
(1s)
Late observer: 0,
我执行了,
Observer 1: 1,
Late observer: 1
*/

可以看到,第二个流虽然延后开始,但是也是从 0 开始发出了值。

🍀 publish()

我们再回到使用 multicast(() => new Subject()) 这块。

RxJS 中可以使用 publish 对其简化,也就是说 publish 相当于 multicast(() => new Subject())

看下面这个例子:

import { ConnectableObservable, interval, timer } from "rxjs";
import { publish, tap, take } from "rxjs/operators";

const number$ = interval(1000).pipe(take(4));

const multicast$ = number$.pipe(
  tap(() => console.log("我执行了")),
  // 使用 publish
  publish()
) as ConnectableObservable<number>;

// 执行 connect
multicast$.connect();

multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));

/* Output:
我执行了,
Observer 1: 0, 
Observer 2: 0,
(1s)
我执行了,
Observer 1: 1,
Observer 2: 1...
*/

当然,在这里,我们如果想要能够正确的输出值,也得 connect() 它。

🍀 publishReplay()

既然 pulish()multicast(() => new Subject()) 的简写。

那同理,publishReplay() 就是 multicast(() => new ReplaySubject()) 的简写。

publishReplay() 也就是允许把旧的值发送给新的订阅者。看下面这个例子:

import { ConnectableObservable, interval, timer } from "rxjs";
import { publishReplay, take, tap } from "rxjs/operators";

const number$ = interval(1000).pipe(take(3));

const multicast$ = number$.pipe(
  tap(() => console.log("我执行了")),
  publishReplay()
) as ConnectableObservable<number>;

// 调用connect
multicast$.connect();

multicast$.subscribe(val => console.log(`Observer 1: ${val}`));

timer(2000)
  .pipe(
    tap(() =>
      multicast$.subscribe(val => console.log(`Late observer: ${val}`))
    )
  )
  .subscribe();

/* Output:
  我执行了,
  Observer 1: 0,
  (1s)
  我执行了,
  Observer 1: 1,
  Late observer: 0,
  Late observer: 1,
  (1s) 
  我执行了,
  Observer 1: 2
  Late observer: 2
*/

🍀 publishLast()

publishLast相当于multicast(() => new AsyncSubject())

它会等到源 Observable 完成然后发出最后一个值,参考下面这个例子:

import { ConnectableObservable, interval } from "rxjs";
import { publishLast, take } from "rxjs/operators";

const number$ = interval(1000).pipe(take(4));

const multicast$ = number$.pipe(publishLast()) as ConnectableObservable<
  number
>;

// 调用 connect
multicast$.connect();

multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
(4s)
Observer 1: 3, 
Observer 2: 3
*/

🍀 publishBehavior()

publishBehavior相当于multicast(() => new BehaviorSubject())

因为它使用BehaviorSubject,所以publishBehavior我们可以指定一个初始值,像下面这样:

import { ConnectableObservable, interval } from "rxjs";
import { publishBehavior, take } from "rxjs/operators";

const number$ = interval(1000).pipe(take(2));

// 提供初始值 -1
const multicast$ = number$.pipe(publishBehavior(-1)) as ConnectableObservable<
  number
>;

// 调用 connect
multicast$.connect();

multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));

/* Output:
Observer 1: -1, 
Observer 2: -1,
(1s)
Observer 1: 0,  
Observer 2: 0,
(1s)
Observer 1: 1, 
Observer 2: 1
*/

可以看到输出值从 -1 开始输出。

🍀 refCount()

然而到现在为止,还是在用 connect() 方法建立连接,会很容易忘记调用,有没有更简便的方法?

RxJS 当然有,那就是 refCoun() 方法,refCount 负责在内部统计订阅的数量,它处理了两件很重要的事情:

  • 如果订阅的数量大于等于 1,refCount 会仅订阅一次源,并调用 connect()
  • 如果订阅的数量小于1,也就是没有任何订阅者,refCount 则会从源取消订阅

我们应用 refCount 看看:

import { interval, Subject, timer } from "rxjs";
import { take, takeUntil, tap, multicast, refCount } from "rxjs/operators";

const number$ = interval(1000).pipe(take(10));

// 我们使用 takeUntil + Subject 而不是 unsubscribing
const stop$ = new Subject();

const multicast$ = number$.pipe(
  tap(() => console.log("我调用了")),
  multicast(() => new Subject()),
  // 通过使用 refCount,我们不再需要手动调用 connect()
  refCount(),
  takeUntil(stop$)
);

// refCount === 1, number$ 开始订阅
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
// refCount === 2
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));

// refCount === 0, number$ 取消订阅
timer(2000)
  .pipe(
    tap(() => stop$.next()),
    tap(() => console.log("The end"))
  )
  .subscribe();

/* Output:
  (1s)
  我调用了,
  Observer 1: 0,
  Observer 2: 0,
  (1s) 
  我调用了,
  Observer 1: 1,
  Observer 2: 1,
  The end
 */

所以,refCount它负责调用connect()和取消订阅源 Observable。

🍀 share()

share() 运算符相当于 multicast(() => new Subject())refCount ,它也是最常用的多播运算符。

我们使用 share() 来改写上面的示例:

import { interval } from "rxjs";
import { take, tap, share } from "rxjs/operators";

const number$ = interval(1000).pipe(take(2));

const share$ = number$.pipe(
  tap(() => console.log("我调用了")),
  share()
);

share$.subscribe(val => console.log(`Observer 1: ${val}`));
share$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
我调用了,
Observer 1: 0,
Observer 2: 0,
(1s)
我调用了,
Observer 1: 1, 
Observer 2: 1
*/

我们使用更实际一点的一个例子,示例如下:

import { ajax } from "rxjs/ajax";
import { map, mergeAll, take, tap, share } from "rxjs/operators";

//  源流调用 share 方法让其成为 hot Observable 
const sharedGhibliFilm$ = ajax
  .getJSON("https://ghibliapi.herokuapp.com/films")
  .pipe(
    tap(() => console.log("我调用了")),
    mergeAll(),
    take(1),
    share()
  );

// 两个订阅
sharedGhibliFilm$
  .pipe(map(({ title }) => title))
  .subscribe(title => console.log(`Title: ${title}`));

sharedGhibliFilm$
  .pipe(map(({ description }) => description))
  .subscribe(description => console.log(`Description: ${description}`));

/* Output:
'我调用了',
'Title: Castle in the Sky',
'Description: The orphan Sheeta inherited a mysterious crystal that...'
*/

🍀 shareReplay()

很容易猜到,shareReplay 可以记住历史的订阅者,也就是允许把旧的值发送给新的订阅者。

shareReplay 等同于multicast(() => new ReplaySubject())refCount

这是一个例子:

import { interval, timer } from "rxjs";
import { shareReplay, take, tap } from "rxjs/operators";

const number$ = interval(1000).pipe(take(3));

// 使用 shareReplay,让其成为 hot Observable 
const multicasted$ = number$.pipe(
  tap(() => console.log("我调用了")),
  shareReplay()
);

multicasted$.subscribe(val => console.log(`Observer 1: ${val}`));

// 2s后执行
timer(2000)
  .pipe(
    tap(() =>
      multicasted$.subscribe(val => console.log(`Late observer: ${val}`))
    )
  )
  .subscribe();

// 我们可以接收到旧的值

/* Output:
  我调用了,
  Observer 1: 0,
  (1s)
  我调用了,
  Observer 1: 1,
  Late observer: 0,
  Late observer: 1,
  (1s) 
  我调用了,
  Observer 1: 2
  Late observer: 2
*/

👏 总结

  • 💡publish相当于multicast(() => new Subject()).
  • 💡publishBehavior相当于multicast(() => new BehaviorSubject()).
  • 💡publishLast相当于multicast(() => new AsyncSubject()).
  • 💡publishReplay相当于multicast(() => new ReplaySubject()).
  • 💡 有了refCount,我们不再需要手动调用,connect()也不再需要处理退订。
  • 💡share等同于multicast(() => new Subject())refCount()
  • 💡shareReplay等同于multicast(() => new ReplaySubject())refCount()

这就是 RxJS 中的全部内容啦,希望对你有帮助~ 😃

转载需注明作者及来源喔~

转载自:https://juejin.cn/post/7174713617596547132
评论
请登录