深入浅出 RxJS 核心原理(响应式编程篇)
背景
在最近的项目中,我们面临了一个需求:监听异步数据的更新,并及时通知相关的组件模块进行相应的处理。传统的事件监听和回调函数方式可能无法满足我们的需求,因此决定采用响应式编程的方法来解决这个问题。在实现过程中发现 RxJS 这个响应式编程库,可以很高效、可维护地实现数据的监听和组件通知。
响应式编程是什么?
响应式编程是一种编程范式,它通过使用数据流和变化传播的方式来处理数据和事件。在响应式编程中,我们将程序看作是一系列的数据流,并通过定义数据流之间的依赖关系和操作来实现数据的处理和变化。
响应式编程的优势包括:
1. 响应性: 响应式编程能够实时地响应数据的变化,当数据发生改变时,相关的处理逻辑会自动触发。这种特性可以减少手动处理和更新的工作量,提高系统的响应能力。
2. 异步和并发: 响应式编程非常适合处理异步和并发的情况,如网络请求等。通过使用异步数据流和事件处理机制,响应式编程可以有效地处理多个并发任务和事件,并提供简洁而可靠的编程模型。
3. 可组合性: 响应式编程鼓励将程序拆分为独立的组件,并通过将这些组件连接在一起来构建更复杂的系统。可以方便地对数据进行转换、过滤、合并等操作,从而实现复杂的数据处理需求。
4. 声明式: 响应式编程采用声明式的方式来描述数据流和操作,而不是指定具体的控制流程。这使得代码更加清晰、简洁和可读,减少了出错的可能性。
下面是一个简单的示例代码,展示了如何使用 RxJS 进行响应式编程:
// 创建一个 Observable 对象,表示一个数据流
const dataStream = new Observable(observer => {
// 模拟异步数据更新
setTimeout(() => {
observer.next('Data Updated');
}, 1000);
});
// 订阅数据流的变化
dataStream.subscribe(data => {
console.log(data);
});
在这个示例中,我们创建了一个 Observable 对象 dataStream,表示一个数据流。通过调用 subscribe 方法,我们订阅了这个数据流的变化,并在回调函数中打印出更新后的数据。
RxJS 的核心概念
RxJS(Reactive Extensions for JavaScript)是一个流行的响应式编程库,用于处理异步数据流。以下是 RxJS 的核心概念:
- Observable(可观测对象):Observable 是 RxJS 的核心概念,代表一个可观测的数据源。它可以发出多个值,并在时间上进行推送。Observable 是持续的数据源,可以随着时间的推移不断发出新的值。
- Observer(观察者):Observer 是订阅 Observable 并处理数据流的对象。它定义了一系列的回调函数来处理 Observable 发出的不同类型的通知。Observer 的回调函数包括
next(处理正常值)、error(处理错误)和complete(处理完成信号)。 - Subscription(订阅):Subscription 表示 Observable 的订阅关系。当我们订阅一个 Observable 时,会得到一个 Subscription 对象,它用于取消订阅和释放资源。通过调用 Subscription 的
unsubscribe()方法,可以手动取消订阅并停止接收 Observable 的值。 - Operator(操作符):操作符是 RxJS 提供的函数,用于对 Observable 进行转换、过滤和组合等操作。使用操作符,我们可以对数据流进行处理和转换,使代码更简洁和可读。常见的操作符包括
map、filter、merge、concat、debounceTime等。 - Subject(主体):Subject 是一种特殊类型的 Observable,同时充当 Observable 和 Observer 的角色。Subject 具有多播的特性,可以向多个观察者同时发送值。我们可以使用 Subject 实现事件总线、多播数据和条件订阅等功能。
- Scheduler(调度器):调度器用于控制 Observable 的执行时机和顺序。它可以指定何时以及如何执行 Observable 的订阅和发送操作。RxJS 提供了不同的调度器,如
asyncScheduler、queueScheduler等。
这些核心概念构成了 RxJS,为我们处理异步数据流提供了丰富的工具。以下是一个简单的 RxJS 示例,演示如何使用 Observable、操作符和观察者来处理异步数据流:
import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';
// 创建一个 Observable,发出一系列数字
const numbers = from([1, 2, 3, 4, 5]);
// 使用操作符对数据流进行转换和过滤
const filteredNumbers = numbers.pipe(
filter((n) => n % 2 === 0), // 过滤出偶数
map((n) => n * 2) // 将每个数字乘以 2
);
// 创建观察者来处理 Observable 的通知
const observer = {
next: (value) => console.log(value), // 处理正常值
error: (error) => console.error(error), // 处理错误
complete: () => console.log('Complete') // 处理完成信号
};
// 订阅 Observable,并传入观察者
filteredNumbers.subscribe(observer);
// 输出:
// 4
// 8
// Complete
RxJS 核心原理解析
RxJS的核心原理是一个基于可观测数据流 Stream 结合观察者模式和迭代器模式的一种异步编程的应用库。它提供了一组操作符(operators),用于创建和组合可观察对象(Observable),并对数据流进行转换、筛选、合并等操作。
可观测流
可观测流(Observable)是一个持续的数据源,它在时间上进行推送并可以发出多个值。与传统的数据集合不同,可观测流是一种异步的、惰性的数据流。观察者(Observer)是订阅这个可观测流的对象,用于处理流中发出的值。
所以,在 RxJS 中,可观测流与观察者之间的关系如下:
- 可观测流(Observable)是一个数据源,它发出多个值,并且可以被订阅。
- 观察者(Observer)订阅可观测流,并处理可观测流发出的值。
在实际使用中,我们可以通过调用可观测流的 subscribe() 方法来订阅它,并传入一个观察者对象,观察者对象中定义了用于处理可观测流发出的值的回调函数。当可观测流发出新的值时,观察者对象的相应回调函数将被调用。
以下是一个简单的示例代码,演示了可观测流和观察者之间的关系:
// 创建一个可观测流
const observable = new Observable((observer) => {
observer.next('Hello');
observer.next('World');
observer.complete();
});
// 创建一个观察者
const observer = {
next: (value) => console.log(value),
complete: () => console.log('Complete')
};
// 订阅可观测流
observable.subscribe(observer);
在上面的示例中,我们创建了一个可观测流 observable,它发出了两个值("Hello" 和 "World"),并在最后调用了 complete() 方法表示流结束。然后,我们创建了一个观察者 observer,它定义了 next 和 complete 回调函数。最后,我们通过调用 subscribe() 方法,将观察者对象传递给可观测流,实现了订阅和处理可观测流发出的值。

观察者模式
RxJS 中的Observable和Observer是基于观察者模式的概念。Observable代表一个可观察的数据源,它可以发出数据流并通知订阅者。Observer代表一个订阅者,它可以通过订阅Observable来接收数据流。当Observable有新的数据时,它会调用Observer的next方法发送数据。Observable还可以发送错误和完成事件,分别通过error和complete方法通知订阅者。观察者模式的实现在RxJS中是通过Observable类和subscribe方法来实现的。
class Observable {
constructor(subscribe) {
if (subscribe) {
this._subscribe = subscribe;
}
}
subscribe(observer) {
return this._subscribe(observer);
}
}
const myObservable = new Observable(observer => {
let count = 1;
const intervalId = setInterval(() => {
observer.next(count++);
}, 1000);
return {
unsubscribe: () => {
clearInterval(intervalId);
}
};
});
myObservable.subscribe({
next: value => console.log(value)
});
Observer接口代表一个订阅者,它定义了一组回调方法,用于处理从Observable接收到的数据流。观察者需要通过订阅(subscribe)一个Observable来接收数据。一旦订阅成功,Observable会调用观察者的next方法来发送数据项,可以调用error方法发送错误通知,或者调用complete方法发送完成通知。
总之,RxJS的Observable和Observer提供了一种便捷的方式来实现观察者模式。Observable作为生产者负责产生数据流,而Observer作为订阅者负责处理数据流的各个事件。通过使用subscribe方法将观察者订阅到可观察对象上,可以实现数据的推送和处理。
迭代器模式
迭代器模式在 RxJS 中被广泛应用,特别是在操作符(operators)和管道(pipes)的实现中。这些操作符和管道接收 Observable 对象并返回一个新的 Observable 对象,用于对数据流进行转换、筛选、组合等操作。
迭代器模式的核心思想是将迭代的过程从业务逻辑中分离出来,通过迭代器对象提供的方法next()顺序访问数据结构的每个元素,而不需要暴露数据结构的内部表示。在 RxJS 中,可观测流(Observable)就是数据结构,而操作符和管道则是迭代器对象,通过调用 next() 方法顺序访问 Observable 发出的值。
迭代器对象本质上,就是一个指针对象。通过指针对象的next(), 用来移动指针。 每调用一次next()方法,都会返回一个对象,都会返回数据结构的信息。这个对象有 value 和 done 两个属性,value属性返回当前位置的成员,done属性是一个布尔值,表示遍历是否结束,即是否有必要再调用一次next()。
let arr = ['foo', 'baz'];
let iter = arr[Symbol.iterator]();
console.log(iter.next()); //{ value: 'foo', done: false }
arr.splice(1, 0, 'bar'); // 在数组中间插入值
console.log(iter.next()); //{ value: 'bar', done: false }
console.log(iter.next()); //{ value: 'baz', done: false }
console.log(iter.next()); //{ value: undefined, done: true } 结束迭代
在 RxJS 中,使用 pipe 方法可以将多个操作符组合起来形成一个管道,从而按顺序对 Observable 进行一系列的转换和处理操作。每个操作符都接收一个 Observable 对象,并返回一个新的 Observable 对象,该对象经过操作符处理后发出转换后的值。
以下是一个示例代码,演示了使用操作符和管道对 Observable 进行转换的过程:
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
// 创建一个 Observable 对象
const observable = of(1, 2, 3, 4, 5);
// 使用操作符和管道对 Observable 进行转换
const transformedObservable = observable.pipe(
filter((value) => value % 2 === 0),
map((value) => value * 2)
);
// 订阅转换后的 Observable
transformedObservable.subscribe((value) => console.log(value));
在上面的示例中,我们首先创建了一个 Observable 对象 observable,它发出了一系列的值。然后,我们使用 pipe 方法和两个操作符 filter 和 map 对 Observable 进行转换。filter 操作符用于筛选出偶数,而 map 操作符将每个偶数乘以 2。最后,我们订阅了转换后的 Observable,并在回调函数中打印出每个值。
通过使用迭代器模式,RxJS 提供了一种方便且可组合的方式来处理可观测流中的数据。操作符和管道的组合可以让我们以一种简洁、可读的方式对数据流进行处理和转换,同时将业务逻辑与数据处理逻辑分离开来,提高了代码的可维护性和扩展性。
RxJS 响应式编程使用

Observable
Observable 称之为流更容易理解。他能被多个observer(观察者)订阅,每个订阅关系相互独立、互不影响。
它具有以下关键方法和属性:
subscribe(observer):订阅可观察者,并返回 Subscription 对象。next(value):向观察者发送新的值。error(error):向观察者发送错误信号。complete():向观察者发送完成信号。
示例:
import { Observable } from 'rxjs';
const observable = Observable.create(observer => {
observer.next('foo');
observer.next('bar');
})
Observer
Observer (观察者), 称之为流的处理方法。它具有以下关键方法:
next(value):处理可观察者发送的新值。error(error):处理可观察者发送的错误信号。complete():处理可观察者发送的完成信号。
我们创建一个 Observable 时。是需要传入一个函数作为参数来生成 Observable 的值,这个函数的入参就是 observe,通过在函数内部调用 next() 来执行。当 Observable 被创建时,它能被多个 observer 订阅,而且每个订阅者之间相互独立、互不影响。
但是如何订阅呢?通过 subscribe 实现。
// 创建observable
const observable = new Observable(observer => {
observer.next(1)
setTimeout(() => {
observer.next(2)
}, 1000)
})
// 订阅observable
observable.subscribe({
next: (value) => console.log(value),
error: (err) => console.log(err),
complete: () => console.log("done"),
});
// 输出:1 -> 2 -> done
rxjs的subscribe是同步执行的。
Subscription
它的本质就是暂存了一个启动后的流,之前提到,每一个启动后的流都是相互独立的,而这个启动后的流,就存储在 subscription 中。它具有以下关键方法和属性:
unsubscribe():取消订阅(也可以理解成停止这个流)。
我们收到信息后,如何清理订阅占用的资源,这时候就需要用到 Subscription 了。
const subscription = observable.subscribe(data => console.log(data));
subscription.unsubscribe();
Subject
Observable表示一个可观察的数据源。Observable 可以发出多个值,并且能够被观察者(Observer)订阅,以接收这些值。Observable 是惰性的,只有在被订阅时才会开始发出值。
Subject 是一种特殊类型的 Observable。与普通的 Observable 不同,Subject 具有多播(multicasting)的特性,即可以同时发送值给多个观察者。Subject 既是一个可观察的数据源,也是一个观察者。
示例:
import { Subject } from 'rxjs';
// 创建一个Subject对象
const subject = new Subject();
// 订阅Subject
const subscription = subject.subscribe({
next: (value) => console.log('Received value:', value),
error: (error) => console.error('Error:', error),
complete: () => console.log('Complete'),
});
// 发送数据到Subject
subject.next('Hello');
subject.next('World');
// 完成Subject
subject.complete();
// 取消订阅
subscription.unsubscribe();
首先创建了一个Subject对象。然后,我们通过subscribe方法订阅了这个Subject,并指定了next、error和complete回调函数。这些回调函数分别用于处理Subject发送的数据、错误和完成事件。
接下来,我们通过subject.next()方法发送了两个值:"Hello"和"World"。这些值会被发送给所有订阅了这个Subject的观察者。我们调用了subject.complete()方法来表示数据的发送已经完成。这将触发complete回调函数。
最后,我们使用subscription.unsubscribe()方法取消了对Subject的订阅。
总结来说,Subject可以作为一个可观察对象和观察者,可以用来发送数据和订阅数据。
应用场景:
- 事件总线:Subject 常用于实现事件总线的功能。我们可以通过创建一个 Subject 实例,并在不同的组件中订阅这个 Subject,以实现组件之间的事件通信和数据传递。
- 条件订阅:Subject 允许在任何时候进行订阅,这使得它非常适用于条件订阅的场景。我们可以在某个条件满足时订阅 Subject,从而开始接收值。
- 多播操作:当需要将同一个值序列发送给多个观察者时,可以使用 Subject 进行多播。这在一些场景中非常有用,例如在多个订阅者之间共享一个数据源。
Observable Vs Subject
| Observable | Subject | |
|---|---|---|
| 角色 | 生产者(单向) | 生产者、消费者(双向) |
| 消费策略 | 单播 | 多播 |
| 流转方式 | 内部发送/接收数据 | 外部发送/接收数据 |
| 消费时机 | 调用 subscribe | 调用 next |
Subject 和 Observable 是 RxJS 中的重要概念。Observable 是单播的,每个订阅者都会独立接收值,适用于一对一的场景。而 Subject 是多播的,可以同时发送值给多个订阅者,适用于一对多的场景。根据具体的需求,选择合适的概念可以帮助我们更好地处理数据流和实现响应式编程。
其他 Subject
| Subject类型 | 用法 | 区别 | 适用场景 |
|---|---|---|---|
| Subject | 将值或事件广播给多个观察者 | 无法回放历史数据 | - 广播值或事件给多个观察者 - 将非RxJS代码转换为响应式 |
| BehaviorSubject | 将最新值发送给新的观察者 | 记住最新值 | - 初始值或当前状态的广播 - 状态管理的中心数据源 |
| ReplaySubject | 向新的观察者发送历史数据 | 可以回放历史数据 | - 重新发送过去数据给新的观察者 - 缓存历史数据的场景 |
| AsyncSubject | 在完成时发送最后一个值 | 只发送最后一个值 | - 只关心Subject完成后的最终结果 - 等待异步操作完成后获取结果 |
总结
希望本文对你理解 RxJS 的工作原理以及响应式编程在实际应用中的用法有所帮助。如果您有任何疑问、建议或意见,欢迎在评论区留言,我会及时根据反馈进行修改和完善。
转载自:https://juejin.cn/post/7252171043383640123