我只用RxJS,却搞定了三大框架的跨组件通信,甚至还能适用于Java(二、Vue篇)
开篇
本文代码git地址:Crimson/rxjs-cross-component-communication (gitee.com)
简单重述下上回所讲述的背景:
我作为六年前从Java入行的程序员,做过后端,做过原生安卓,做过Java桌面,后来前后端分离流行了之后开始接触Angular直到现在。从RxJava用到RxJS,对这东西可以说是天天打交道,再熟悉不过了。
不过在这里我们不谈底层,不谈源码,也不谈RxJS一共有哪些API。我们只谈如何用RxJS去实现一个适用于前端三个框架的跨组件通信的功能。移动端开发常说,一份代码,全平台运行。在这里我们也试试看,一份代码,全框架通用。
作者希望Vue学习者们要好好学习这篇文章中的思想,怎么用面向对象,怎么用最基础的设计模式,以及我所认为的发布订阅,我是怎么设计它的模型,等等。说个很现实的问题,Vue入门简单,上手容易,以至于很多前端开发者只会Vue,以及完全没有接触过面向对象等这些概念。同时,在我看来,函数式不是万能的,前端开发者也必须要掌握面向对象编程。
什么是RxJS
RxJS,一个大部分前端人看似熟悉但是又陌生的东西。的确,目前主流技术除了Vue就是React,只有Angular在大量使用RxJS。
在上一回中没有介绍RxJS,是因为上回文章面向于Angular程序员,介绍RxJS就多此一举了。不过本文面向Vue程序员,所以有必要简单介绍下RxJS到底是个啥。
我就直接从网上引用几段话来介绍吧,总之这东西教程一搜一大把,也不是啥晦涩难懂的东西:
RxJS是使用Observables的响应式编程的库,它使编写异步或基于回调的代码更容易。
本文需要使用到RxJS中的以下三个概念:Observable
、Subscribtion
、Subject
:
Observable
的本质其实就是一个随时间不断产生数据的一个集合,称之为流更容易理解。而其对象存在一个 subscribe 方法,调用该方法后,才会启动这个流(也就是数据才会开始产生),这里需要注意的是多次启动的每一个流都是独立的,互不干扰。
Observer
从行为上来看,无非就是定义了如何处理上述流产生的数据,称之为流的处理方法。
Subscribtion
的本质就是暂存了一个启动后的流,之前提到,每一个启动后的流都是相互独立的,而这个启动后的流,就存储在subscription
中,提供了unsubscribe
,来停止这个流。
简单理解了这三个名词 Observable
, Observer
, Subscribtion
后,从数据的角度来思考:
Observable
定义了要生成一个什么样的数据,其subscribe
方法,Observer
(定义了接收到数据如何处理),并开始产生数据。该方法的返回值subscription
, 存储了这个已经开启的流,同时具有unscbscribe
方法,可以将这个流停止。(引用自RxJS——异步数据流的响应式编程库(适合新手入门)_萌萌哒の瑞萌萌的博客-CSDN博客)
在RxJS中,Subject
是一类特殊的Observable
,它可以向多个Observer
多路推送数值。普通的Observable
并不具备多路推送的能力(每一个Observer
都有自己独立的执行环境),而Subject
可以共享一个执行环境。举个例子:Subject
就相当于微信公众号,它发布消息后,当进行推送的时候,Observer
就相当于关注了Subject
的微信,只要关注它的都能收到消息,也就是多路推送的意思,而Observable
,就相当于个人微信号,发一次消息只有另外一个微信号能收到,这就是Subject
和Observable
的主要区别。(引用自RxJS 核心概念之Subject_Alex__Gong的博客-CSDN博客)
为什么选择RxJS
至于为什么选择RxJS,因为一开始它就是Angular中自带的一个插件,并且我的跨组件通信也是基于RxJS来实现的。不得不说RxJS做异步流是真的挺方便。并且RxJS仅仅只是一个单纯的插件,站在它的角度来看并没有和框架有过深入的结合。简单来说,RxJS属于原生JS的范畴,你把它放哪都能玩得起来。
在几年前最初造这个轮子的时候,当时的代码和今天的差距很大。今天大家接下来读到的代码是经过我多次重构的结果。
提前说一下,上一章节介绍RxJS,里面介绍了Subject
,简单来说它就是可以进行一对多去推送的。最开始是完全基于Subject
做的组件通信,不过迭代到今天,我自己设计了一对多推送的方法,Subject
实际上是不再需要的了,Observable
就可以完成我的功能。不过我还是保留了Subject
,虽然我目前将Subject
当做Observable
使用,只做一对一,但是路不能走死,代码也不能写死,因为我考虑到以后可能需求有变,功能扩展,我还会继续去启用它。
本文的目的,并不在于推翻Redux,或者是Pinia,再或者是给出一个新技术,让大家去卷去学习。作者的目的仅仅只是一个简简单单的技术分享,以及多年开发总结的心得体会。一次简单的探索,一次大胆的尝试。作者更多的是希望各位同学们,不要用了框架,就忘了原生。
我对发布订阅的理解
这里再次讲讲我是怎么理解发布订阅的,方便第一次点进来的小伙伴们能够快速了解本文所讲述的内容。
说到发布订阅,我第一个想到的就是消息队列。我刚入行的时候,公司是做物联网项目的,大致业务就是设备通过消息队列上报位置到服务端,然后服务端通过websocket发送给前端页面在地图上显示。好巧不巧,我第二份工作依然是做这东西,业务也极其相似。于是就在最初的开发生涯中结识了两个消息队列:RabbitMQ与MQTT。他们有一个共同点:基于Topic的发布订阅模式。所以受到这个启发,我在第一次用Angular做项目的时候,用RxJS简单实现了该模式的功能。后来经过多个项目的迭代优化,于是就有了今天这篇文章,分享出来。
有意思的“消息队列”
以下为 @闲D阿强 的原文:
深红老师作为一个全栈开发者,带着他对消息队列的理解,在这个项目中进行了尝试,实现了一个消息通信的模块,名为innerMqService
,我对此非常感兴趣,故进行了深入学习。
不过再具体聊深红老师的具体实现之前,先说说我对前端通信的一些思考。
我对前端通信的一些思考
我先把我了解的通信模式摆一下,分别是:
- 观察者
- 发布订阅
- 事件驱动
观察者
代码大概的样子
-
创建观察者
let ob1 = new Observer();
-
创建消息源,并把观察者注册进来
let sub = new Subject(); sub.add(ob1);
-
消息源发消息就行了,这样
ob1
就会收到消息sub.notify('I fired `SMS` event');
存在的问题
- 耦合,这让消息源和观者者相互之间直接依赖了
- 无中心管理,每个消息源自己管理对应的订阅者(观察者)
事件驱动
代码大概的样子
-
监听事件
window.addEventListener('testEvent', () => { console.log(123) })
-
触发事件
const event = new CustomEvent('test') window.dispatchEvent(event)
进步的地方
- 解耦了
- 灵活,可以根据自己的意愿监听一些事件。
存在的问题
- 业务上没有中心管理,到处都有监听,那么混乱风险非常大。
- 时机这块还得操心,如果触发事件的时机在该事件监听之前,那就会错过。
发布订阅
代码大概的样子
-
创建中心
let center = new Center();
-
中心注册订阅的事件,事件通过自定一个字符进行标识,响应发布时,循环调用注册的订阅者。
center.subscribe('TEST', (data)=>{ // 事件被触发,循环通知给注册的订阅者 });
-
需要执行订阅的文件中获得中心管理,然后进行注册订阅者
center.register('TEST',(data)=>{ // 订阅响应的逻辑 })
-
需要执行发布的文件中获得中心管理,然后进行发布
center.publish('TEST', 'I published `TEST` event');
进步的地方
- 解耦了。
- 加入了中心管理,订阅者和发布者的关系被中心管理起来了。
存在的问题
- 中心管理的负担太大,管理的内容太多,不够细
- 不太灵活,想订阅的事件,是需要在中心服务那注册的。
- 当页面卸载之后,中心需要从庞大的订阅管理数据中,剔除掉订阅者的相关信息,或存在页面卸载,但是订阅消息还存在的风险
那么经过以上分析,发现这三个模式都有一些缺点,也就发布订阅还算理想一点,那么怎么才能在发布订阅的基础上进步一下呢?当我阅读深红老师实现的消息通信模块,我发现了答案
既然中心太大,分裂一下呗
通过以上的图,我们能看出,相比发布订阅模式,发布者和中心管理这块没有变化,主要变化的是多了一个客户端的概念,而这个客户端便是整个模式的关键。
它分担了中心管理的责任,并且还具备一定的独立性,可以管理自身的连接和关闭。
代码大概的样子
-
建立连接
let client = this.innerMqService.createClient(); client.sub(Topic.MY_TP_1).subscribe((res) => { console.log(res); });
-
订阅
client.sub(Topic.MY_TP_1).subscribe((res) => { console.log(res); });
-
发布
this.innerMqService.pub(Topic.MY_TP_1, value);
-
断开连接
this.innerMqService.destroyClient(client);
client的概念
把页面比作client设备,InnerMqService比作为服务器。
设备向服务器申请,用自己的id建立一个客户端。
服务器接收的topic和message之后,在给订阅了这个topic的每个客户端发送消息。
如果,不设置这个概念,那么当页面写在,订阅的函数依然会存在,这样不合理了,所以就要一步的抽象出client。
那么也可以理解为:
每个页面都有一个client,service管理这些client,然后每个页面通过topic注册订阅,是注册在自己的client中,当触发的时候,比如触发一个topic为A,那么service就会遍历所有的client,然后去发client中topic为A的订阅
“消息队列”,光说消息了,那么队列呢?
当消息发布的时候,如果发现客户端没准备好,就会将消息放入队列中。
当客户端准备好时,再去监测是否存在未执行的消息队列,如果存在,就先执行。
深红老师实现了一个可以持久化的消息队列,很好的解决了订阅和发布之间的是时机错过的问题,这样一来,你就不用担心发布发生在订阅之前了,更加的解耦了。
那么总结一下这个模式
进步的地方
- 解耦,有中心管理
- 灵活度提高,页面作为client进行管理订阅者,降低混乱的风险,有一定的自主权,可以自主订阅事件。
- 有队列,不担心订阅和发布错过的时机问题
- client客户端会随着生命周期维护自己的发布订阅体系,分担了中心管理的压力。
存在的问题
我还没发现,掘友们可以试试挖掘一下问题,这样也就有了进步的方向。
本章节内容转载自闲D阿强,本章节著作权归原作者所有。
结构设计
以基于Topic的发布订阅模式为例,我们照着它的功能去简单的实现(不一定与它完全相同,只参考大致的结构):
- 它有一个服务端
- 有很多设备去连接这个服务端
- 每个设备会订阅一个或多个Topic
- 给服务端根据Topic发送消息,服务端会将消息发送给订阅该Topic的设备
接下来使用简单的设计模式在代码中实现它(这也是为什么我要去使用面向对象):
- 服务端只有一个,它是单例的
- 每个设备看作为客户端,客户端有多个,它是多例的
- 使用依赖注入,让框架去管理服务端,这样你就获得了一个可以在各处调用的服务端单例了
- 把页面组件比作设备,相当于每个组件都是一个客户端,各自根据Topic去订阅消息
代码实现
废话不多说,直接在代码中展现出来吧!
项目搭建
简单搭建一个Vue项目,安装好RxJS,并准备好各个组件。创建过程就不赘述了。安装RxJS也很简单:
npm install rxjs
简单地搭建下,与上回保持一致,我们直接看代码目录和页面组件结构:
简单描述下:
- 在创建好的Vue项目中,创建两个文件夹,一个是rx,一个是view
- rx目录存放通过RxJS实现的发布订阅相关代码
- view目录中为页面组件,也就是上图页面中所展示出来的各个组件的结构
基于此页面组件结构,接下来就来实现test-a组件与test-b组件之间的通信。
封装RxJS
本章节就来讲讲如何用RxJS封装一个基于Topic的发布订阅模式的工具。在这里我给它取名为inner-mq。其实我们直接把上回在Angular中写好的代码移植过来即可,只需简单改下各个文件的import,让它适应Vue中的写法。在本章我会重新讲讲设计方法。
服务端设计
服务端的作用就是对客户端进行管理,每个组件能够注册客户端,销毁客户端。以及将发布者发送的消息根据Topic转发给指定客户端。
除此之外我做了个简单的持久化,它的作用就是:
- 如果发布发生在订阅之前,可以暂时存储消息,等待有订阅后再将消息发出
- 可以实现某个消息,在每一次订阅的时候都去发送一次
这样一来,很好地解决了订阅和发布之间时机错过的问题,你就不用担心发布发生在订阅之前了。
具体代码如下:rx/service/inner-mq.service.ts
export class InnerMqService {
private clients = new Map<Topic, Map<string, InnerMqClient>>(); // 客户端
private persistentQueue = new Map<any, Array<{ type: PersistentType, data: any }>>(); // 持久化队列
constructor() {
}
/* 创建连接 */
public createClient(): InnerMqClient {
let client = new NormalInnerMqClient({
onSubscribe: (topic, subject) => {
this.clientSubscribeCallback(client, topic, subject);
}
});
return client;
}
/* 销毁连接 */
public destroyClient(client: InnerMqClient): void {
for (let topic of this.clients.keys()) {
this.clients.get(topic)?.delete(client.getId());
}
client.destroy();
}
/* 发布 */
public pub(topic: Topic, msg: any, option?: { persistent: boolean, type: PersistentType }): void {
let published = false;
let clients = this.clients.get(topic);
if (clients == null) {
published = false;
} else {
for (let client of clients.values()) {
if (client.isDestroyed()) {
published = false;
}
let subject = client.getSubject(topic);
if (subject != null && !subject.closed) {
subject.next(msg);
published = true;
} else {
published = false;
}
}
}
// 消息未发送,进行持久化存储
if (!published && (option && option.persistent)) {
if (this.persistentQueue.get(topic) == null) {
this.persistentQueue.set(topic, []);
}
this.persistentQueue.get(topic)?.push({ type: option.type, data: msg });
}
}
/* 客户端订阅回调 */
private clientSubscribeCallback(client: InnerMqClient, topic: Topic, subject: Subject<any>): void {
// 根据topic存储client
if (this.clients.get(topic) == null) {
this.clients.set(topic, new Map<string, InnerMqClient>);
}
this.clients.get(topic)?.set(client.getId(), client);
// 处理持久化消息
this.processPersistentQueue(topic, subject);
}
/* 处理持久化消息 */
private processPersistentQueue(topic: Topic, subject: Subject<any>): void {
let queue = this.persistentQueue.get(topic);
if (queue == null) {
return;
}
// 异步发送已持久化的消息
new Observable<boolean>((observer) => {
Promise.resolve().then(() => {
observer.next(true);
})
}).subscribe(() => {
if (queue == null) {
return;
}
for (let i = 0; i < queue.length; i++) {
switch (queue[i].type) {
case PersistentType.ON_ONCE_SUB:
subject.next(queue[i].data);
queue.splice(i, 1); // 将使后面的元素依次前移,数组长度减1
i--; // 如果不减,将漏掉一个元素
break;
case PersistentType.ON_EVERY_CLIENT_EVERY_SUB:
subject.next(queue[i].data);
break;
default:
break;
}
}
if (queue.length == 0) {
this.persistentQueue.delete(topic);
}
})
}
}
export enum PersistentType {
ON_ONCE_SUB, // 只进行一次缓存,一次sub后即删除
ON_EVERY_CLIENT_EVERY_SUB, // 持久化,对每个客户端的每一次该TOPIC的sub都发送信息
}
客户端设计
客户端的作用很单一,就是单纯地订阅消息,即通过Topic接收发布者给服务端发过来的消息。
先设计一个接口,有了接口,就可以通过不同的实现类,去做不同类型的客户端。interface与implements是Java中很常用的一组关键字。
每个客户端拥有自己的唯一ID,所以我将生成随机数的方法和它放在一起。
具体代码如下:rx/client/inner-mq.client.ts
export interface InnerMqClient {
getId(): string;
getSubject(topic: Topic): Subject<any> | undefined;
isDestroyed(): boolean;
sub(topic: Topic): Observable<any>;
destroy(): void;
}
/**
* Client的随机数ID生成方法
* 生成n位数字字母混合字符串
* */
export function generateMixed(n: number) {
let chars = [
'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '-', '=',
'~', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '_', '+', '*',
'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
];
let res = '';
for (let i = 0; i < n; i++) {
let id = Math.floor(Math.random() * chars.length);
res += chars[id];
}
return res;
}
接着就是通过一个类去实现该接口,实现一个具体的客户端。
具体代码如下:rx/client/impl/normal-inner-mq.client.ts
export class NormalInnerMqClient implements InnerMqClient {
private readonly id!: string;
private subjects = new Map<Topic, Subject<any>>(); // 实例
private destroyed = false;
constructor(
private callback: {
onSubscribe: (topic: Topic, subject: Subject<any>) => void
}
) {
this.id = generateMixed(20);
}
public getId(): string {
return this.id;
}
public getSubject(topic: Topic): Subject<any> | undefined {
return this.subjects.get(topic);
}
public isDestroyed(): boolean {
return this.destroyed;
}
/* 订阅 */
public sub(topic: Topic): Observable<any> {
let subject = this.subjects.get(topic);
if (subject == null) {
subject = new Subject<any>();
this.subjects.set(topic, subject);
}
this.callback.onSubscribe(topic, subject);
return subject;
}
/* 销毁 */
public destroy(): void {
this.destroyed = true;
for (let subject of this.subjects.values()) {
subject.unsubscribe();
}
this.subjects.clear();
}
}
其它
可以看到上图代码目录中,rx目录下有一个topic.ts文件。它的作用很简单,就是一个枚举,用来存放各个Topic。
export enum Topic {
// 在这里定义各个Topic
MY_TP_1,
MY_TP_2,
TEXT_TOPIC,
}
具体使用
现在基于Topic的发布订阅的服务端与客户端设计完成,我们就来开始使用它吧。
基础功能
首先是依赖注入。服务端是单例的,故将服务端注入到根组件中,也就是app.component.vue中。
Vue文档依赖注入传送门:组合式 API:依赖注入 | Vue.js (vuejs.org)
app.component.vue
,setup
部分
setup() {
const innerMqService = new InnerMqService();
provide('innerMqService', innerMqService);
return {
innerMqService,
}
}
此处的作用为提供InnerMqService,可以被后代组件注入。此部分完成后,就可以在各个组件中去使用了。这里我们需要实现test-a与test-b组件之间的通信,故在这两个组件中去使用它。
以test-a组件为例,将test-a组件当做一个客户端,给定一个Topic,来接收消息:
test-a.component.vue
export default defineComponent({
name: 'app-test-a',
setup() {
const innerMqService = inject('innerMqService') as InnerMqService;
let client!: InnerMqClient;
return {
innerMqService, client,
}
},
created() { // 在进入组件时,创建客户端
this.client = this.innerMqService.createClient();
this.client.sub(Topic.MY_TP_1).subscribe((res) => {
console.log(res);
});
},
beforeUnmount() { // 离开组件时,销毁客户端
this.innerMqService.destroyClient(this.client);
console.log('test-a销毁客户端');
},
});
inject
的作用即为注入一个由祖先组件或整个应用提供的值,也就是对应上述在根组件中的提供的innerMqServiceprovide('innerMqService', innerMqService)
。
现在test-a订阅了MY_TP_1
这个Topic,现在在test-b中放置一个按钮,使用该Topic给服务端发送消息。只发消息,不做订阅,就不需要客户端了,直接给服务端发送即可。这里同样要将InnerMqService注入到test-b组件:
test-b.component.vue
<template>
<div class="container-test-b">
<div class="name-test-b">
test-b component
</div>
<button class="send-btn" @click="send()">发送</button>
</div>
</template>
export default defineComponent({
name: 'app-test-b',
setup() {
const innerMqService = inject('innerMqService') as InnerMqService;
return {
innerMqService,
}
},
methods: {
send(): void {
this.innerMqService.pub(Topic.MY_TP_1, '来自test-b的消息');
}
}
});
可以看到,当我点击test-b中的发送按钮,在test-a组件中顺利收到消息了。如果你阅读了前一篇文章,你会发现:除了依赖注入方法不同,具体使用方法是一模一样的。
接下来做个复杂的,在sub-component中放置一个按钮,用于控制test-a的显示与隐藏,同时test-b中改为循环发送:
test-b.component.vue
export default defineComponent({
name: 'app-test-b',
setup() {
const innerMqService = inject('innerMqService') as InnerMqService;
return {
innerMqService,
}
},
created() {
setInterval(() => {
this.innerMqService.pub(Topic.MY_TP_1, '来自test-b的消息,循环发送');
}, 1000);
},
methods: {
send(): void {
this.innerMqService.pub(Topic.MY_TP_1, '来自test-b的消息');
}
}
});
可以看到,在test-a存在的时候,能接收到消息,在test-a从页面上移除时,在生命周期中将客户端销毁,于是也不会接收到消息了。
进阶使用
上文讲解了如何使用inner-mq,本小段中就不再放各个文件的详细代码了,我只描述场景与效果,以及关键代码点。
- 在test-a未存在页面上时,test-b发送消息。然后test-a组件才生成显示在页面上。这就是典型的订阅和发布之间时机错过的问题,一个发布发生在订阅之前的场景。这时候就需要使用之前提到的持久化功能:
test-b.component.vue
的send
方法
methods: {
send(): void {
// persistent: true 表示该消息要做持久化
// ON_ONCE_SUB 表示该消息只进行一次缓存,一次sub后即删除
this.innerMqService.pub(Topic.MY_TP_1, '来自test-b的消息', {
persistent: true,
type: PersistentType.ON_ONCE_SUB
});
}
}
可以看到,test-b发给test-a的消息,一个都没落下。
- 有时候需要某条消息在每一次订阅的时候发送一遍。同样持久化功能也实现了这一点:
test-b.component.vue
的send
方法
methods: {
send(): void {
// persistent: true 表示该消息要做持久化
// ON_EVERY_CLIENT_EVERY_SUB 表示对每个客户端的每一次该Topic的订阅都发送信息
this.innerMqService.pub(Topic.MY_TP_1, '来自test-b的消息', {
persistent: true,
type: PersistentType.ON_EVERY_CLIENT_EVERY_SUB
});
}
}
- 当然,基于Topic的发布订阅模式,肯定是支持一对多的,即一个发布者发送消息,所有订阅该Topic的客户端都会收到该条消息。我给每个组件都加上订阅该Topic的代码,可以看到点击test-b中的发送后,每个组件中的订阅都能正确收到消息:
如何在自己项目中使用
很简单,我写工具写插件一向不喜欢发npm,我就喜欢最简单直接的,给源码,自己放到项目中直接调用即可。我认为这样的好处就是不会影响现有依赖,以及我直接给你源码,你可以直接学习并使用,更可以自己定制。
和上次一样,本文给了git地址,直接拉取代码,将里面的rx文件夹拖到自己项目中,就可以愉快地使用了。
结束语
以上即是我使用了很久的跨组件通信的方法。也许不如现成插件那样成熟,但是我认为它降低了理解难度,也更加的解耦了。具体代码已经提交git,大家可以拉取下来自己玩耍,自己设计一些场景来试试。
下一篇我会在将本文代码直接搬到React中,在React中使用RxJS实现跨组件通信。
本文代码git地址:Crimson/rxjs-cross-component-communication (gitee.com)
转载自:https://juejin.cn/post/7211095650129412154