数据流之理解RxJS
为什么写这篇文章
聊到前端数据流方案,RxJS总不可避免地会被提起,但在React、Vue占据主流框架/库地位的今天,Redux、Vuex、Mobx等也被带火,几乎没有业务上来就会选用RxJS作为数据管理方案。
但这并不妨碍RxJS的功能强大和拥有高下载量。时至今日,RxJS依然是许多热门工具库的数据流方案选择,如inquirer等(详见packages depending on rxjs)。
本文是个人对RxJS的阶段性学习总结,并未非常深入,更多从概念和心智模型去探索,不涉及实战,希望读者看完后能弄懂以下问题:
- RxJS是什么
- RxJS能做什么
RxJS是什么
来看RxJS官方文档的定义:
RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序
这话看下来过于抽象,个人认为,要搞懂RxJS,只需要理解官方文档中的另一句话:
ReactiveX 结合了
观察者模式
、迭代器模式
和使用集合的函数式编程
以上的几个关键词也是本文展开的线索。
前置概念
RxJS解决异步事件管理的的基本概念
这些概念和观察者模式类似,但RxJS作了拓展,贴出来用于参考,跳过也不影响后面代码的阅读体验。
概念 | 描述 |
---|---|
Observable (可观察对象) | 表示一个概念,这个概念是一个可调用的未来值或事件的集合 |
Observer (观察者) | 一个回调函数的集合,它知道如何去监听由 Observable 提供的值 |
Subscription (订阅) | 表示 Observable 的执行,主要用于取消 Observable 的执行 |
Operators (操作符) | 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操作符来处理集合 |
Subject (主体) | 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式 |
Schedulers (调度器) | 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他 |
JS里的拉取(Pull)和推送(Push)体系
注:这部分内容跳过也不影响阅读体验,可能看完文章再回过头来看会有更深的理解。
拉取和推送是两种不同的协议,用来描述数据生产者(Producer)
如何与数据消费者(Consumer)
进行通信的。
拉取
:由消费者来决定何时从生产者那里接收数据。生产者本身不知道数据是何时交付到消费者手中的。推送
:由生产者来决定何时把数据发送给消费者。消费者本身不知道何时会接收到数据。
生产者 | 消费者 | |
---|---|---|
拉取 | 被动的: 当被请求时产生数据 | 主动的: 决定何时请求数据 |
推送 | 主动的: 按自己的节奏产生数据 | 被动的: 对收到的数据做出反应 |
在JS中,Promises是最常见的推送体系类型。Promise(生产者)将一个解析过的值传递给已注册的回调函数(消费者),但不同于函数的是,由 Promise 来决定何时把值“推送”给回调函数。
RxJS 引入了 Observables,一个新的 JavaScript 推送体系。Observable 是多个值的生产者,并将值“推送”给观察者(消费者),填补了下面表格中的空白:
单个值 | 多个值 | |
---|---|---|
拉取 | Function | Iterator |
推送 | Promise | Observable |
ReactiveX结合了观察者模式、迭代器模式
场景一
假设我们有如上页面,点击Fetch按钮后会执行以下操作:
- 请求【接口1】,然后更新【A区域】
- 请求【接口2】,然后更新【B区域】
- 请求【接口3】,然后更新【C区域】
我们会直觉地实现上述功能:
const onClick = () => {
fetch('/api/1').then(value => {
updateA(value);
});
fetch('/api/2').then(value => {
updateB(value);
});
fetch('/api/3').then(value => {
updateC(value);
});
};
以RxJS的方式实现会是这样:
// 定义数据流
const observable = new Observable(observer => {
fetch('/api/1').then(value => {
observer.next({ source: 'api1', value });
});
fetch('/api/2').then(value => {
observer.next({ source: 'api3', value });
});
fetch('/api/3').then(value => {
observer.next({ source: 'api3', value });
});
});
const onClick = () => {
// 订阅对象以启动数据流
observable.subscribe(({ from, value }) => {
if (from === 'api1') {
updateA(value);
} else if (from === 'api2') {
updateB(value);
} else if (from === 'api3') {
updateC(value);
}
});
};
RxJS中,使用Observable
定义一个可观察对象(即数据流),然后在事件回调中通过subscribe
订阅触发数据流。
这样写有什么好处呢?好像只是用观察者模式再实现了一遍,而且代码行数还变多了?我们把场景升级一下,然后再来看。
场景二
对消费场景的拓展
还是一样的页面,但点击Fetch按钮后的操作变为:
- 请求【接口1】,然后更新【A、B区域】
- 请求【接口2】,然后更新【B、C区域】
- 请求【接口3】,然后更新【A、B、C区域】
数据需要响应更新或触发的场景变多了。
常规实现,很自然地:
const onClick = () => {
fetch('/api/1').then(value => {
updateA(value);
updateB(value);
});
fetch('/api/2').then(value => {
updateB(value);
updateC(value);
});
fetch('/api/3').then(value => {
updateA(value);
updateB(value);
updateC(value);
});
};
使用RxJS实现则是:
// 定义数据流
const observable = new Observable(observer => {
fetch('/api/1').then(value => {
observer.next({ source: 'api1', value });
});
fetch('/api/2').then(value => {
observer.next({ source: 'api3', value });
});
fetch('/api/3').then(value => {
observer.next({ source: 'api3', value });
});
});
const onClick = () => {
// 订阅对象以启动数据流
observable.subscribe(({ from, value }) => {
if (from === 'api1') {
updateA(value);
updateB(value);
} else if (from === 'api2') {
updateB(value);
updateC(value);
} else if (from === 'api3') {
updateA(value);
updateB(value);
updateC(value);
}
});
};
在这里发现了什么区别?
在RxJS的实现中,可观察对象(Observable)/数据流的定义不需要改变,只要改变观察者的行为。
这是RxJS、或者说是观察者模式的优点。在定义数据流时可以更专注于生产者的实现
,并且天然与消费者的逻辑解耦
:有几个消费者、哪个消费者要做哪些事情,流本身并不关注,更利于消费场景的变更和拓展。
进一步解耦:Subject
还是上面例子的代码:
// 定义数据流
const observable = new Observable(observer => {
fetch('/api/1').then(value => {
observer.next({ source: 'api1', value });
});
fetch('/api/2').then(value => {
observer.next({ source: 'api3', value });
});
fetch('/api/3').then(value => {
observer.next({ source: 'api3', value });
});
});
const onClick = () => {
// 订阅对象以启动数据流
observable.subscribe(({ from, value }) => {
if (from === 'api1') {
updateA(value);
updateB(value);
} else if (from === 'api2') {
updateB(value);
updateC(value);
} else if (from === 'api3') {
updateA(value);
updateB(value);
updateC(value);
}
});
};
可以看到,虽然我们将数据生产者和消费者的逻辑拆分开了,但多个消费者的逻辑还是耦合在一起的。这在一个简单的页面不是问题,但如果A、B、C区域在DOM树/组件树中的距离特别远呢,依然是难以维护的。
RxJS提供了Subject
对象来解决我们的问题。
Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者;而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。
Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。
使用Subject改造上面的代码:
// fetch.js
export const subject = new Subject();
const onClick = () => {
fetch('/api/1').then(value => {
subject.next({ source: 'api1', value });
});
fetch('/api/2').then(value => {
subject.next({ source: 'api3', value });
});
fetch('/api/3').then(value => {
subject.next({ source: 'api3', value });
});
};
// A.js A区域的处理逻辑
subject.subscribe(value => {
updateA(value);
});
// B.js B区域的处理逻辑
subject.subscribe(value => {
updateB(value);
});
// C.js C区域的处理逻辑
subject.subscribe(value => {
updateC(value);
});
如此便实现了A、B、C三个消费者的逻辑解耦。同时使打包时的代码分割(Code spliting)成为可能。
迭代器模式
另外,观察下定义数据流的这段代码:
// 定义数据流
const observable = new Observable(observer => {
fetch('/api/1').then(value => {
observer.next({ source: 'api1', value });
});
fetch('/api/2').then(value => {
observer.next({ source: 'api3', value });
});
fetch('/api/3').then(value => {
observer.next({ source: 'api3', value });
});
});
在这里定义的一个Observable中,通过迭代器模式的写法(observable.next),向消费者推送了多个值,这是Promise做不到的,或者说需要多个Promise结合才能实现,因为每个Promise只能推送一个值。这便是上文中这段话的含义:
RxJS 引入了 Observables,一个新的 JavaScript 推送体系。Observable 是多个值的生产者,并将值“推送”给观察者(消费者),填补了下面表格中的空白:
单个值 多个值 拉取 Function Iterator 推送 Promise Observable
- Function:调用时会同步地返回一个单一值
- Generator:调用时会同步地返回零到无限多个值
- Promise:是最终可能(或可能不)返回单个值的运算
- Observable:被调用后可以同步或异步地返回零到无限多个值
到这里,就解释了开篇 ReactiveX 结合了观察者模式、迭代器模式和使用集合的函数式编程
的这句话前半段。然后我们来看看什么是使用集合的函数式编程
。
使用集合的函数式编程
操作符
使用集合的函数式编程,可以跟官方文档中的另一句介绍结合来理解:
可以把 RxJS 当做是用来处理事件的 Lodash
而RxJS函数式编程机制的核心则是操作符
。
尽管 RxJS 的根基是 Observable,但最有用的还是它的操作符
。操作符是允许复杂的异步代码以声明式的方式进行轻松组合的基础代码单元。也有人说,精通RxJS其实就是在精通操作符。
实现上,操作符是 Observable 类型上的方法,本质上是一个纯函数,它接收一个 Observable 作为输入,并生成一个新的 Observable 作为输出。
如下示例,声明并启动一个数据流之后,流向观察者的数据会先经过管道(pipe)中的一个个处理方法(操作符),最后再交给观察者。
// 定义数据流
const observable = new Observable(observer => {
fetch('/api/1').then(value => {
observer.next({ source: 'api1', value });
});
fetch('/api/2').then(value => {
observer.next({ source: 'api3', value });
});
fetch('/api/3').then(value => {
observer.next({ source: 'api3', value });
});
});
const onClick = () => {
// 订阅对象以启动数据流
observable.pipe(
debounceTime(1000),
filter(value => !!value),
map(num => { return num * num; })
).subscribe(({ from, value }) => {
// ,,,
});
};
操作符的作用
Rxjs的操作符可以把复杂的问题分解成简单问题的组合,也就是多个小任务,每个小任务只需要关注问题的一个方面,小任务间的耦合度很低便于阅读和维护 。
这样数据流转过程中的各种处理(过滤、转换、并发、竞态、错误处理等)
就会变成一个个纯函数的实现。
这样对事件的处理,就有点类似lodash.flow
和lodash/fp
的结合使用,可优化掉大部分的流程式代码。
可以想象一下,用常规写法的话要怎么实现上例pipe中功能呢?如果有多个消费者,且每个消费者的行为不一样呢?
RxJS的内置操作符
RxJS内置了九类、共上百种操作符,同时也支持开发者自定义操作符,详见:RxJS - Operators。
由于介绍操作符需要较大的篇幅,本文将不展开,将来实战篇再详细探讨(有的话)。
总结
最后做个总结,还是回到 ReactiveX结合了观察者模式、迭代器模式和使用集合的函数式编程
这句话,通篇下来其实都是在介绍这几个关键词:
观察者模式
:
- 专注于生产者的实现,与消费者解耦,并且多个消费者之间的逻辑也可以解耦,更利于生产者的设计和消费者的拓展
迭代器模式
:
- 弥补了原生JS里推送多个值机制的空缺;作为多个值的生产者,将值推送给消费者(观察者)
使用集合的函数式编程
:
- 基于操作符机制,将复杂问题分解成多个简单问题,将数据流转过程中的各种处理(过滤、转换、并发、竞态、错误处理等)变成纯函数的实现和集合
- 高度适配流式数据场景
以上都是个人理解,欢迎探讨和指正。
转载自:https://juejin.cn/post/7189575760191946810