likes
comments
collection
share

简单聊聊RxJS中的Observable类和其pipe()方法

作者站长头像
站长
· 阅读数 3

本文首发在我的个人博客:nonhana.xyz

希望大家多多去参观一下呀~如果能评论一下就更好咯!

一、前言

如果有对 如何优雅的处理异步事件以及数据流 感兴趣的小伙伴,可以进来看看。

其实本来这篇文章打算放在Nest.js学习笔记里面一起总结掉的,但是我发现我低估了 Observable 这个概念理解起来的难度,导致我对着几行代码死盯了很久。。。

为什么要讲这个呢?因为在Nest.js当中是有自带了 RxJS 这个第三方库的,而且其中的很多API也是基于这个库做了很多集成,包括拦截器、异常过滤器、管道、微服务、控制器、服务等。

简单聊聊RxJS中的Observable类和其pipe()方法

  1. 拦截器(Interceptors):
    • Nest.js 的拦截器可以返回 Observable。拦截器用于在函数执行之前和之后添加额外的逻辑,它们非常适合用于日志记录、异常处理、响应转换等。
    • 例如,你可以创建一个拦截器来转换响应数据格式或者实现缓存逻辑。
  2. 异常过滤器(Exception Filters):
    • 虽然异常过滤器通常返回简单的错误响应,但在需要的情况下,它们也可以与 RxJS 结合使用,比如在处理异步操作时捕获和处理异常。
  3. 管道(Pipes):
    • 管道主要用于请求数据的验证和转换。在处理复杂的数据流或异步验证逻辑时,管道可以返回 Observable。
  4. 微服务(Microservices):
    • Nest.js 微服务模块在内部使用 RxJS 来处理消息传递。例如,基于消息队列的微服务可以使用 RxJS 来处理和响应消息。
  5. 控制器(Controllers)和服务(Services):
    • 在 Nest.js 中,控制器和服务可以返回 Observable。这使得你可以利用 RxJS 来处理 HTTP 请求和响应,以及在服务中实现复杂的业务逻辑。

可以看到,很多很多的 Nest.js 中的一些接口或者类都提供了 Observable 的返回,其重要性可见一斑。

那么,先聊一些概念性的东西吧。

二、 RxJS 到底是什么东西?

有些单纯不知道Nest.js的同学应该也是有听过RxJS的大名。给一下它的定义:

RxJS(Reactive Extensions for JavaScript)是一个使用 观察者模式迭代器模式 来处理 异步事件数据流 的库。它是 响应式编程 的 JavaScript 实现,提供了丰富的操作符来创建、处理和组合事件流。

  1. 响应式编程库: RxJS 是基于响应式编程理念构建的,这意味着它专注于异步数据流的建立、组合和处理。
  2. 使用 Observable: RxJS 的核心是 Observable,一个表示随时间推移的异步事件流的对象。Observable 可以发出三种类型的值:一个正常值、一个错误或者一个 '完成' 信号。
  3. 丰富的操作符: RxJS 提供了一系列的操作符(如 mapfilterconcatflatMap 等),这些操作符可以对数据流进行各种变换和组合,使得处理复杂的异步逻辑更加简洁和高效。
  4. 灵活性和强大的表达能力: 使用 RxJS,开发者可以以声明式的方式处理事件、异步请求和回调,大大提升了代码的可读性和维护性。
  5. 在多个领域的应用: RxJS 被广泛应用于处理 DOM 事件、执行 HTTP 请求、状态管理和更多的场景,特别是在与现代前端框架(如 Angular)结合时。

总结一下也就是,RxJS 通过提供一种 统一的方式 来处理异步和基于事件的程序,使得开发者能够以更直观和强大的方式编写应用。而这种统一的方式就是基于它的核心思想: 将一切都视为随时间变化的数据流 ,从而能够用更加函数式和响应式的方式来处理复杂的数据处理问题。

实际实现的时候,其实就是把所有的数据流、事件全都转成 Observable ,然后再在这个 Observable 的基础上进行订阅消息,实现对其发布的数据的处理。

我们也可以看到,RxJS中集成了两种重要的设计模式:观察者模式和迭代器模式。

观察者模式(Observer Pattern)

  1. 基本概念:观察者模式是一种设计模式,其中一个对象(称为“主题”或“可观察对象”)维护一组观察者,当该对象状态变化时,会自动通知所有观察者。
  2. RxJS中的体现
    • Observables(可观察对象):在RxJS中,Observables就是这样的“主题”。它代表了一个可观察的数据流或异步事件流。
    • Observers(观察者):观察者是一个包含回调函数集合的对象,这些函数决定了如何响应Observable发送的数据或事件。
    • 订阅(Subscribe):观察者通过“订阅”Observable来接收数据和通知。当Observable发出数据时,会调用观察者的回调函数(例如:next, error, complete)。

迭代器模式(Iterator Pattern)

  1. 基本概念:迭代器模式是一种设计模式,它提供了一种方法来顺序访问聚合对象中的元素,而不需要暴露该对象的底层表示。
  2. RxJS中的体现
    • 流(Streams)RxJS中的Observable可以看作是一种特殊的流,它可以被迭代。
    • 操作符(Operators)RxJS提供了一系列操作符,允许你以声明式的方式处理数据流,就像处理集合一样。这些操作符可以看作是在Observable流上的迭代操作。

RxJS中,这两种模式共同工作,提供了一种强大而灵活的方式来处理异步事件和数据流。观察者模式允许你灵活地定义如何响应数据流中的事件,而迭代器模式则提供了一种声明式的方法来处理和转换这些数据流。

三、Observable到底是什么东西?

在上面我们提到了,Observable是RxJS的核心概念所在。正是它的存在,使得这个库能够把一切都视为数据流,然后用统一的方式进行消息发布订阅,来优雅的处理数据。

首先明确一点,Observable是一个单纯的 ,并且提供了构造函数。也就是说我们可以 以各种形式对这个Observable进行实例化

Observable的基本用法

我们可以先安装RxJS

npm install rxjs --save

然后我们可以写个最简单的,将Observable实例化并使用的例子:

import { Observable } from 'rxjs';

// 创建一个新的 Observable
const observable = new Observable(subscriber => {
    // 发送一个值
    subscriber.next('Hello `RxJS`!');
    // 可以发送多个值
    subscriber.next('More values can be sent');

    // 如果有错误,可以发送一个错误信息
    // subscriber.error('Error message');

    // 最后完成 Observable
    subscriber.complete();
});

// 订阅这个 Observable
const subscription = observable.subscribe({
    next: (value) => console.log(value),
    error: (err) => console.error(err),
    complete: () => console.log('Completed')
});

// 一旦你完成了数据的接收,你可以取消订阅
// subscription.unsubscribe();

我们可以看到,创建Observable实例之后,往其内部构造函数传递的参数是一个回调函数(订阅函数),并且这个回调函数接收一个参数,也就是上面的subscriber,是一个 Subscriber 对象。这个对象继承了Subscription这个类并且继承了Observer这个接口, 是由RxJS内部自动提供的

我们首先需要明确,Observable这个类的 创建使用 ,正好对应着 消息发布消息订阅 两个过程。

在创建observable实例的时候,可以通过这个对象内部的next方法进行消息的发布。这个消息发布的内容可以是任何东西,可以是事件、具体的值、Promise等。这里的next其实很有迭代器(iterator)的味道,不过iterator主要是使用next对某个变量进行遍历,这里主要是使用next对消息的内容进行抛出。如果把这一个个的手写的next改成触发式,不断的发送消息,那就形成了最重要的 数据流 概念。

如果在发送过程中遇到了错误,比如Promise请求中遇到了错误,可以使用其error方法将其抛出。

如果所有的必要消息均已经发送完毕,可以调用complete方法表示已完成。

定义好了消息的发布之后,便需要进行消息的订阅。这就需要调用实例化完成之后的observable的subscribe(订阅)方法。这个方法内部接收一个对象,这个对象有三个属性:next、error、complete,分别对应着在消息发布时候三个不同的发布方法。

Observable的懒加载机制

那么可能会有人会奇怪,Observable是先定义好消息的发布形式,再进行这些消息订阅,你是 怎么确保这些之前定义的消息在被定义之前不被发送呢订阅的时候的value,err等值是怎么传过来的 ?这其实得益于Observable的懒加载机制。

RxJS中,Observable 的懒加载(Lazy Loading)机制是一个重要特性。这表示 Observable 不会在创建时立即开始工作,而是在被订阅(subscribe)时才开始执行

那么懒加载的工作原理是什么呢?

  1. 创建时不激活
    • 当你创建一个 Observable 实例时,你仅仅是 定义了一个数据流和如何生成这些数据的规则 。此时,数据流并不会立即开始。
  2. 订阅时激活
    • 只有当 Observable 被某个观察者订阅时,定义在 Observable 构造函数中的函数(订阅函数)才会 被执行 。这个函数负责产生数据并通过 next, error, 和 complete 方法发送给观察者。
  3. 每次订阅都是独立的
    • 每当有新的订阅发生时,Observable 会从头开始执行其定义的逻辑。这意味着每个订阅者都有自己的数据流实例,互不影响。

可以想象一下,你有一个基于用户输入的 Observable,它在用户键入时发出数据。如果这个 Observable 在创建时就开始工作,那么它可能会在没有用户输入的情况下消耗资源。但由于懒加载机制,只有当用户实际开始键入(即有观察者订阅时),Observable 才开始发出数据,从而节省了资源并减少了不必要的工作。

RxJS中提供了各种各样的方法来创建基于不同数据流的Observable

在上面我们只是简单的将Observable给实例化了,然而实际的应用中肯定涉及到不同类型的需要处理的数据流。而RxJS也提供了许多方法供我们去创建处理对应数据的Observable。下面简单的列举几个常用的方法:

  1. of(...values)
    • 用于创建一个 Observable,它会依次发出提供的参数,然后完成。
    • 例如:of(1, 2, 3) 会依次发出 1, 2, 3。
  2. from(iterable)
    • 将数组、类数组对象、Promise 或迭代器转换为 Observable。
    • 例如:from([1, 2, 3]) 会依次发出 1, 2, 3。
  3. interval(period)
    • 创建一个 Observable,它按照指定的时间间隔连续发出数字序列。
    • 例如:interval(1000) 每隔1秒发出一个递增的数字。
  4. timer(initialDelay, period)
    • 在给定的初始延迟之后,发出数字0,然后如果指定了周期,将继续以该周期发出递增的数字。
    • 例如:timer(3000, 1000) 在3秒后发出0,然后每隔1秒发出一个递增的数字。
  5. fromEvent(target, eventName)
    • 从DOM事件、Node.js EventEmitter 事件或其他事件源创建 Observable。
    • 例如:fromEvent(document, 'click') 用于从文档的点击事件创建 Observable。
  6. ajax(urlOrRequest)
    • 用于创建一个 Observable,以发出针对URL的Ajax请求的响应。
    • 例如:ajax('/api/data') 会发出对 /api/data 的Ajax请求的响应。
  7. create(subscribe)
    • 传统的方式来创建一个新的 Observable,通过提供一个 subscribe 函数。
    • 例如:new Observable(subscriber => {...})

对于这几个创建方式,尤其是of、from、interval等,都是直接往这个函数里面传值来创建的。如果不结合其懒加载的方式,是很难理解这几个函数是怎么实现订阅的?消息是怎么发布的?实际上其内部也只是通过next函数对值依次发出而已,只不过只在被subscribe方法激活的时候进行消息订阅。

四、Observable的消息发布处理——pipe()

如果只是单纯的消息的发布和订阅,那么和一般的库其实没什么太大区别。最重要的是RxJS提供了pipe这一方法,并且包含了map、filter、take等等一系列功能强大的方法(操作符),对Observable的消息发布这一 行为 进行处理。

没错,是行为而不是数据。这个行为包括了 消息发布的过程消息发布本身的内容 两方面。

pipe()实例理解

我们可以来看一个例子:

interval(500)
  .pipe(
    // take方法用来限制数据流中的数据个数
    take(5),
    // map方法用来处理数据流中的每个数据
    map((x) => ({ num: x })),
    // filter方法用来过滤数据流中的数据
    filter((x) => x.num % 2 === 0)
  )
  .subscribe((e) => console.log(e));

上面提到了interval这个函数,它返回一个Observable实例,并且给这个实例赋上了发送数据的方式:每隔指定的毫秒,发送一个递增的数字序列。

原本如果不给这个发送的数据流进行处理,通过subscribe激活订阅事件之后,接收到的值应该为单纯的1,2,3,...。那么该怎么给这段单调的数字序列输出做一些处理,输出我们想要的序列呢?这边便使用了pipe()这个方法进行处理。

pipe 方法是 RxJS 提供的一个函数,用于 组合多个操作符 ,以对数据流进行处理。可以将一系列操作符作为参数传递给 pipe 方法,这些操作符将 依次 对数据流进行处理。这里的依次很关键,也代表着pipe()中组合的这么几个操作符的执行顺序就是从开始一直到结束的,其中的数据会同流水线一般在各个操作符中进行传递。上一个操作符把数据处理好了,会自动地把这个处理好的数据送给下一个操作符接收,基于这个在上一步处理过的数据再进行进一步的加工,如此往复,直到执行到最后一个操作符为止。结合到上面的例子中:

  1. 首先,是take()操作符。这个函数内部可以传递一个参数,可以用来 限制这个数据流所发送的最大数字 。比如这边传了5,意味着最多发送到5这个数字就不继续发送了。这个操作符没有对数据本身做处理,是对发送数据的这一过程做了限制。所以说对行为的控制,就体现在这里。
  2. 接下来,是使用了map()操作符。这个函数,如果有一定JS基础的同学一定不会陌生,实际上在 RxJS 它的作用和原生数组中的map方法有异曲同工之处,都是用来对数据做映射的,区别是 RxJS 中的map方法对每个发送的数据流中的内容进行映射;而数组的map方法是对数组本身进行遍历并映射,并且返回一个新的数组。在这个例子当中,map方法就把单纯的递增数字给映射成了 { num: x } 的这么一个对象的形式。
  3. 最后,从map操作符接收到处理好的数据,然后交给filter操作符。fillter操作符其实也和原生JS中的filter比较像,同样是做数据筛选的。这边在filter里面写了 x.num % 2 === 0 这个规则,即只筛选出为偶数的数据进行发送。

数据处理完了之后,最后使用subscribe来订阅发送的消息,并将其打印出来。可以看看下面的结果:

简单聊聊RxJS中的Observable类和其pipe()方法

常用的pipe()内部的操作符举例

上面只提到了两个操作符,大概的了解了一下pipe这个函数是怎么将各个操作符组合起来完成对数据的处理的。接下来给一些在实际使用中非常常见的操作符并且附上其对应的功能:

  1. map(fn)
    • 功能:将数据流中的每个值通过函数 fn 进行转换。
    • 应用场景:当你需要修改数据流中的每个值时使用,例如将数据流中的数字乘以2。
  2. filter(fn)
    • 功能:根据函数 fn 的条件判断,决定是否保留数据流中的每个值。
    • 应用场景:用于筛选数据流,例如过滤出偶数。
  3. tap(fn)
    • 功能:执行副作用操作,如打印日志,但不改变数据流。函数 fn 接收数据流中的值。
    • 应用场景:调试或在不改变数据流的情况下执行某些操作。
  4. take(count)
    • 功能:只取数据流的前 count 个值,然后完成。
    • 应用场景:限制数据流的长度,如只取前5个值。
  5. first()
    • 功能:只取数据流的第一个值,然后完成。
    • 应用场景:当你只对数据流的第一个值感兴趣时使用。
  6. last()
    • 功能:只取数据流的最后一个值,然后完成。
    • 应用场景:当你只对数据流的最后一个值感兴趣时使用。
  7. debounceTime(ms)
    • 功能:在指定的毫秒数 ms 后,只发出最新的值,如果在这段时间内有新值产生,则重新计时。
    • 应用场景:处理高频事件,如键盘输入。
  8. throttleTime(ms)
    • 功能:在每个时间窗口 ms 的开始,发出最新的值。
    • 应用场景:限制数据流的速率,例如在滚动事件中。
  9. switchMap(fn)
    • 功能:对数据流中的每个值应用函数 fn,并将结果转换为新的 Observable,当新的值到来时,会取消之前的 Observable。
    • 应用场景:处理级联数据流,如基于当前值获取新的数据。
  10. mergeMap(fn)
    • 功能:类似于 switchMap,但它不会取消之前的 Observables,而是合并所有的 Observables。
    • 应用场景:当你需要处理每个值并同时保持所有结果时使用。
  11. catchError(fn)
    • 功能:捕获错误,并通过函数 fn 提供一种处理错误的方式。
    • 应用场景:错误处理。

五、实例:使用 RxJS 优雅的实现用户输入搜索功能

首先假设有一个 Web 应用,其中有一个搜索框,用户在其中输入文本来搜索相关内容。现在要给这个输入框加一个新功能:用户输入时实时提供搜索建议。

传统方法(非 Observable)

在不使用 Observable 的情况下,我们可能会直接在输入框的 changekeyup 事件上添加事件监听器,每次用户输入时立即触发搜索。这种方法的问题是:

  1. 性能问题:每次键盘输入都会触发搜索,可能导致大量的不必要的搜索请求。比如用户长按住键盘的某一个键导致一直无限触发输入,会导致大量的请求——有点早期DDoS的攻击的感觉了(bushi)。
  2. 无法处理快速变化:快速连续的输入可能导致搜索结果的顺序错乱。

使用 Observable

import { fromEvent, Observable } from 'rxjs';
import { map, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

// 假设 performSearch 函数返回的是一个 Observable 类型
function performSearch(searchTerm: string): Observable<any> {
  // ... 搜索逻辑
  return new Observable(); // 示例返回值
}

// 假设 displayResults 函数接受任何类型的数据作为参数
function displayResults(data: any): void {
  // ... 显示结果逻辑
}

// 获取输入框元素,并提供了明确的类型注解
const searchBox = document.getElementById('search-box') as HTMLInputElement;

// 从输入框事件创建 Observable
const typeahead: Observable<any> = fromEvent(searchBox, 'keyup').pipe(
  map((e: Event) => (e.target as HTMLInputElement).value), // 获取输入值
  debounceTime(500), // 等待 500ms,如果没有新的输入则继续
  distinctUntilChanged(), // 只有当输入值变化时才继续
  switchMap((searchTerm: string) => performSearch(searchTerm)) // 执行搜索操作
);

// 订阅 Observable
typeahead.subscribe((data: any) => {
  // 显示搜索结果
  displayResults(data);
});

借助了 RxJS 这个库,我们可以变相的使用发布订阅的模式来监听到输入框的输入事件,并且使用强大而灵活的pipe方法来把发送的输入事件使用一系列的操作符进行加工,然后用subscribe方法,将发送的数据订阅到,然后调用自定义的 displayResults 函数展示搜索的结果。

我们可以再梳理一下整个实现这一功能的流程是什么样的:

  1. 获取DOM元素

    首先是将输入框的元素获取到,并且使用类型断言将其推断成 HTMLInputElement 类型,以便进行后续获取其输入事件的操作。

    const searchBox = document.getElementById('search-box') as HTMLInputElement;
    
  2. 使用 fromEvent 函数来创建 Observable 对象

    const typeahead: Observable<any> = fromEvent(searchBox, 'keyup').pipe(
      ...
    );
    

    fromEvent 这个函数在上面列举方法时提到了。调用这个函数的时候需要传入两个参数。首先是要传入对应的DOM元素,第二个是你要在这个DOM对象上触发Observable消息发布的事件名称。这里采用了 keyup 事件,也就是每次在输入框松开键盘的时候都会触发消息的发布;而这个 消息发布的内容就是这个键盘事件本身 。这里也可以进一步的理解 "Observable可以把任何一切都视为数据流" 这一核心概念。

  3. 应用操作符处理事件流

    创建好这个 Observable 之后,应用了一系列操作符来处理这个事件流:

    • map((e: Event) => (e.target as HTMLInputElement).value):将每个事件映射(转换)为输入框的当前值。这是对发送的内容本身的处理,也就是数据处理。
    • debounceTime(500):等待 500 毫秒。如果此期间没有新的输入(即用户停止输入),则继续处理流。这是对发布这一行为的修饰,表示在映射完成之后等待500ms,在此期间 不响应任何的输入操作 。这极大的减轻了服务请求的负担。
    • distinctUntilChanged():只有在输入值发生变化时才继续。这防止了相同的输入值多次触发搜索。这也是对发布这一行为的修饰。
    • switchMap((searchTerm: string) => performSearch(searchTerm)):将输入值转换为一个新的 Observable,这里是执行搜索操作。switchMap 会自动取消之前的搜索请求,只关注最新的。这里调用了自定义的 performSearch 方法,内部的逻辑是自己编写的,包括提交后台搜索出结果并返回等。
  4. 订阅 Observable 并处理结果

    typeahead.subscribe((data: any) => {
      displayResults(data);
    });
    

    最后,代码订阅了 typeahead 这个 Observable。每当 Observable 发出一个新值(这里是搜索结果),subscribe 的回调函数就会被调用,并将结果传递给 displayResults 函数,后者负责将结果显示在界面上。

可以看到,整个流程展示了如何使用 RxJS 优雅地处理异步 事件流 。这种方式能有效减少资源浪费和避免不必要的重复搜索请求,提高应用性能和用户体验。如果这些功能全部给我们手写并封装方法,不知道需要消耗多少的时间和资源。

六、结语

这是头一次写这么长的文章啊。。。也算是自己对 RxJS 这个库的一些浅薄的理解吧,毕竟之前在接触Nest.js之前这类的需求遇到的相对比较少,而且对JS设计模式也不太熟悉。

另外,关于其实际在Nest.js的封装,我打算单独写在Nest.js学习笔记里面,比如响应拦截器、异常过滤器等非常常用的工具的自定义封装。