likes
comments
collection
share

我只用RxJS,却搞定了三大框架的跨组件通信,甚至还能适用于Java(一、Angular篇)

作者站长头像
站长
· 阅读数 32

开篇

本文代码git地址:Crimson/rxjs-cross-component-communication (gitee.com)

RxJS,一个大部分前端人看似熟悉但是又陌生的东西。的确,目前主流技术除了Vue就是React,只有Angular在大量使用RxJS。

我作为六年前从Java入行的程序员,做过后端,做过原生安卓,做过Java桌面,后来前后端分离流行了之后开始接触Angular直到现在。从RxJava用到RxJS,对这东西可以说是天天打交道,再熟悉不过了。

不过在这里我们不谈底层,不谈源码,也不谈RxJS一共有哪些API。我们只谈如何用RxJS去实现一个适用于前端三个框架的跨组件通信的功能。移动端开发常说,一份代码,全平台运行。在这里我们也试试看,一份代码,全框架通用

废话不多说。既然说到RxJS,那我们就先从Angular开始入手吧。毕竟RxJS就是Angular的核心插件之一了。当然,你不会Angular也完全没关系,先看看本文,了解思路,我会在接下来的两篇文章中分别在Vue与React中用RxJS实现跨组件通信的功能。对Java感兴趣的同学,我会在第四篇文章中用RxJava将本文代码复写一遍,在Swing桌面端与SpringBoot中去使用它。

我对发布订阅的理解

说到跨组件通信,那么自然就会想到发布订阅。一谈到发布订阅,我经常听到朋友说,“这东西很麻烦啊,设计起来会很复杂”、“异步满天飞,hook满天飞”等等。当然,我虽然会Vue和React,但是生产项目我只用Angular(就因为自己造了这个轮子,我连ngrx都没有去使用,也没用过Redux与Pinia)。我不知道在Vue和React中具体是怎么做跨组件通信的。但是读完本文你会发现,这东西远比你想象得要简单

作为Angular+Java全栈,我对面向对象编程是完全掌握的,所以接下来的内容全是基于面向对象去实现。我也希望还没有掌握面向对象编程的前端开发者们,好好学学面向对象,对开发真的很有帮助。另外说个题外话,作为全栈我是如何看待“前端已死”的:说点不好听的,如果你只是不会后端的纯前端,甚至没碰过数据库,不懂Linux,不懂网络,认为会了一两个前端框架就沾沾自喜,不愿跳出浏览器去看看真正的计算机世界,不是你死那谁死

说到发布订阅,我第一个想到的就是消息队列。我刚入行的时候,公司是做物联网项目的,大致业务就是设备通过消息队列上报位置到服务端,然后服务端通过websocket发送给前端页面在地图上显示。好巧不巧,我第二份工作依然是做这东西,业务也极其相似。于是就在最初的开发生涯中结识了两个消息队列:RabbitMQ与MQTT。他们有一个共同点:基于Topic的发布订阅模式。所以受到这个启发,我在第一次用Angular做项目的时候,用RxJS简单实现了该模式的功能。后来经过多个项目的迭代优化,于是就有了今天这篇文章,分享出来。

我只用RxJS,却搞定了三大框架的跨组件通信,甚至还能适用于Java(一、Angular篇)

结构设计

以基于Topic的发布订阅模式为例,我们照着它的功能去简单的实现(不一定与它完全相同,只参考大致的结构):

  1. 它有一个服务端
  2. 有很多设备去连接这个服务端
  3. 每个设备会订阅一个或多个Topic
  4. 给服务端根据Topic发送消息,服务端会将消息发送给订阅该Topic的设备

接下来使用简单的设计模式在代码中实现它(这也是为什么我要去使用面向对象):

  1. 服务端只有一个,它是单例的
  2. 每个设备看作为客户端,客户端有多个,它是多例的
  3. 使用依赖注入,让框架去管理服务端,这样你就获得了一个可以在各处调用的服务端单例了
  4. 把页面组件比作设备,相当于每个组件都是一个客户端,各自根据Topic去订阅消息

代码实现

废话不多说,直接在代码中展现出来吧!

项目搭建

简单搭建一个Angular项目,并准备好各个组件。创建过程就不赘述了,我们直接看代码目录和页面组件结构:

我只用RxJS,却搞定了三大框架的跨组件通信,甚至还能适用于Java(一、Angular篇)

我只用RxJS,却搞定了三大框架的跨组件通信,甚至还能适用于Java(一、Angular篇)

简单描述下:

  • 在创建好的Angular项目中,创建两个文件夹,一个是rx,一个是view
  • rx目录存放通过RxJS实现的发布订阅相关代码
  • view目录中为页面组件,也就是上图页面中所展示出来的各个组件的结构

基于此页面组件结构,接下来就来实现test-a组件与test-b组件之间的通信。

封装RxJS

本章节就来讲讲如何用RxJS封装一个基于Topic的发布订阅模式的工具。在这里我给它取名为inner-mq。本章节内容,对于RxJS,你不需要去全面了解,只需要知道它的subject与subscribe就行

服务端设计

服务端的作用就是对客户端进行管理,每个组件能够注册客户端,销毁客户端。以及将发布者发送的消息根据Topic转发给指定客户端。

除此之外我做了个简单的持久化,它的作用就是:

  • 如果发布发生在订阅之前,可以暂时存储消息,等待有订阅后再将消息发出
  • 可以实现某个消息,在每一次订阅的时候都去发送一次

这样一来,很好地解决了订阅和发布之间时机错过的问题,你就不用担心发布发生在订阅之前了。

具体代码如下:rx/service/inner-mq.service.ts

@Injectable()  // 用于angular的依赖注入,不要忘了该注解
export class InnerMqService {

   private clients = new Map<Topic, Map<string, InnerMqClient>>(); // 客户端
   private persistentQueue = new Map<any, Array<{ type: PersistentType, data: any }>>(); // 持久化队列

   constructor() {
   }

   /* 创建连接 */
   public createClient(): InnerMqClient {
      let client = new NormalInnerMqClient({
         onSubscribe: (topic, subject) => {
            this.clientSubscribeCallback(client, topic, subject);
         }
      });
      return client;
   }

   /* 销毁连接 */
   public destroyClient(client: InnerMqClient): void {
      for (let topic of this.clients.keys()) {
         this.clients.get(topic)?.delete(client.getId());
      }
      client.destroy();
   }

   /* 发布 */
   public pub(topic: Topic, msg: any, option?: { persistent: boolean, type: PersistentType }): void {
      let published = false;
      let clients = this.clients.get(topic);
      if (clients == null) {
         published = false;
      } else {
         for (let client of clients.values()) {
            if (client.isDestroyed()) {
               published = false;
            }
            let subject = client.getSubject(topic);
            if (subject != null && !subject.closed) {
               subject.next(msg);
               published = true;
            } else {
               published = false;
            }
         }
      }
      // 消息未发送,进行持久化存储
      if (!published && (option && option.persistent)) {
         if (this.persistentQueue.get(topic) == null) {
            this.persistentQueue.set(topic, []);
         }
         this.persistentQueue.get(topic)?.push({ type: option.type, data: msg });
      }
   }

   /* 客户端订阅回调 */
   private clientSubscribeCallback(client: InnerMqClient, topic: Topic, subject: Subject<any>): void {
      // 根据topic存储client
      if (this.clients.get(topic) == null) {
         this.clients.set(topic, new Map<string, InnerMqClient>);
      }
      this.clients.get(topic)?.set(client.getId(), client);
      // 处理持久化消息
      this.processPersistentQueue(topic, subject);
   }

   /* 处理持久化消息 */
   private processPersistentQueue(topic: Topic, subject: Subject<any>): void {
      let queue = this.persistentQueue.get(topic);
      if (queue == null) {
         return;
      }
      // 异步发送已持久化的消息
      new Observable<boolean>((observer) => {
         Promise.resolve().then(() => {
            observer.next(true);
         })
      }).subscribe(() => {
         if (queue == null) {
            return;
         }
         for (let i = 0; i < queue.length; i++) {
            switch (queue[i].type) {
               case PersistentType.ON_ONCE_SUB:
                  subject.next(queue[i].data);
                  queue.splice(i, 1); // 将使后面的元素依次前移,数组长度减1
                  i--; // 如果不减,将漏掉一个元素
                  break;
               case PersistentType.ON_EVERY_CLIENT_EVERY_SUB:
                  subject.next(queue[i].data);
                  break;
               default:
                  break;
            }
         }
         if (queue.length == 0) {
            this.persistentQueue.delete(topic);
         }
      })
   }

}

export enum PersistentType {
   ON_ONCE_SUB, // 只进行一次缓存,一次sub后即删除
   ON_EVERY_CLIENT_EVERY_SUB, // 持久化,对每个客户端的每一次该TOPIC的sub都发送信息
}

客户端设计

客户端的作用很单一,就是单纯地订阅消息,即通过Topic接收发布者给服务端发过来的消息。

先设计一个接口,有了接口,就可以通过不同的实现类,去做不同类型的客户端。interface与implements是Java中很常用的一组关键字。

每个客户端拥有自己的唯一ID,所以我将生成随机数的方法和它放在一起。

具体代码如下:rx/client/inner-mq.client.ts

export interface InnerMqClient {

   getId(): string;

   getSubject(topic: Topic): Subject<any> | undefined;

   isDestroyed(): boolean;

   sub(topic: Topic): Observable<any>;

   destroy(): void;

}

/**
 * Client的随机数ID生成方法
 * 生成n位数字字母混合字符串
 * */
export function generateMixed(n: number) {
   let chars = [
      '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '-', '=',
      '~', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '_', '+', '*',
      'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
      'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
      'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
      'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
   ];
   let res = '';
   for (let i = 0; i < n; i++) {
      let id = Math.floor(Math.random() * chars.length);
      res += chars[id];
   }
   return res;
}

接着就是通过一个类去实现该接口,实现一个具体的客户端。

具体代码如下:rx/client/impl/normal-inner-mq.client.ts

export class NormalInnerMqClient implements InnerMqClient {

   private readonly id!: string;
   private subjects = new Map<Topic, Subject<any>>(); // 实例
   private destroyed = false;

   constructor(
      private callback: {
         onSubscribe: (topic: Topic, subject: Subject<any>) => void
      }
   ) {
      this.id = generateMixed(20);
   }

   public getId(): string {
      return this.id;
   }

   public getSubject(topic: Topic): Subject<any> | undefined {
      return this.subjects.get(topic);
   }

   public isDestroyed(): boolean {
      return this.destroyed;
   }

   /* 订阅 */
   public sub(topic: Topic): Observable<any> {
      let subject = this.subjects.get(topic);
      if (subject == null) {
         subject = new Subject<any>();
         this.subjects.set(topic, subject);
      }
      this.callback.onSubscribe(topic, subject);
      return subject;
   }

   /* 销毁 */
   public destroy(): void {
      this.destroyed = true;
      for (let subject of this.subjects.values()) {
         subject.unsubscribe();
      }
      this.subjects.clear();
   }

}

其它

可以看到上图代码目录中,rx目录下有一个topic.ts文件。它的作用很简单,就是一个枚举,用来存放各个Topic。

export enum Topic {
   // 在这里定义各个Topic
   MY_TP_1,
   MY_TP_2,
   TEXT_TOPIC,
}

具体使用

现在基于Topic的发布订阅的服务端与客户端设计完成,我们就来开始使用它吧。

基础功能

首先是依赖注入。服务端是单例的,故将服务端注入到根模块中,也就是app.module.ts中。

我只用RxJS,却搞定了三大框架的跨组件通信,甚至还能适用于Java(一、Angular篇)

注入完成后,就可以在各个组件中去使用了。这里我们需要实现test-a与test-b组件之间的通信,故在这两个组件中去使用它。

以test-a组件为例,将test-a组件当做一个客户端,给定一个Topic,来接收消息:

test-a.component.ts

@Component({
   selector: 'app-test-a',
   templateUrl: 'test-a.component.html',
   styleUrls: ['test-a.component.scss'],
})
export class TestAComponent implements OnInit, OnDestroy {

   private client!: InnerMqClient;

   constructor(
      private innerMqService: InnerMqService,
   ) {
   }

   ngOnInit(): void {
      this.client = this.innerMqService.createClient();
      this.client.sub(Topic.MY_TP_1).subscribe((res) => {
         console.log(res);
      });
   }

   ngOnDestroy(): void {
      this.innerMqService.destroyClient(this.client);
      console.log('test-a销毁客户端');
   }

}

test-a订阅了MY_TP_1这个Topic,现在在test-b中放置一个按钮,使用该Topic给服务端发送消息。只发消息,不做订阅,就不需要客户端了,直接给服务端发送即可:

test-b.component.html

<div class="container">
   <div class="name">
      test-b component
   </div>
   <button class="send-btn" (click)="send()">发送</button>
</div>

test-b.component.ts

@Component({
   selector: 'app-test-b',
   templateUrl: 'test-b.component.html',
   styleUrls: ['test-b.component.scss'],
})
export class TestBComponent {

   constructor(
      private innerMqService: InnerMqService,
   ) {
   }

   send(): void {
      this.innerMqService.pub(Topic.MY_TP_1, '来自test-b的消息');
   }

}

我只用RxJS,却搞定了三大框架的跨组件通信,甚至还能适用于Java(一、Angular篇)

可以看到,当我点击test-b中的发送按钮,在test-a组件中顺利收到消息了。

接下来做个复杂的,在sub-component中放置一个按钮,用于控制test-a的显示与隐藏,同时test-b中改为循环发送:

sub.component.html

<div class="container">
   <div class="name">
      sub component
   </div>
   <app-test-a *ngIf="isShow" class="comp-test-a"></app-test-a>
   <button class="control-btn" (click)="showAndHide()">test-a的显示与隐藏</button>
</div>

sub.component.ts

@Component({
   selector: 'app-sub',
   templateUrl: 'sub.component.html',
   styleUrls: ['sub.component.scss'],
})
export class SubComponent {

   isShow = true;

   showAndHide(): void {
      this.isShow = !this.isShow;
   }

}

test-b.component.ts

@Component({
   selector: 'app-test-b',
   templateUrl: 'test-b.component.html',
   styleUrls: ['test-b.component.scss'],
})
export class TestBComponent implements OnInit{

   constructor(
      private innerMqService: InnerMqService,
   ) {
   }

   ngOnInit(): void {
      setInterval(() => {
         this.innerMqService.pub(Topic.MY_TP_1, '来自test-b的消息,循环发送');
      }, 1000);
   }

   send(): void {
      this.innerMqService.pub(Topic.MY_TP_1, '来自test-b的消息');
   }

}

我只用RxJS,却搞定了三大框架的跨组件通信,甚至还能适用于Java(一、Angular篇)

可以看到,在test-a存在的时候,能接收到消息,在test-a从页面上移除时,在生命周期中将客户端销毁,于是也不会接收到消息了。

进阶使用

上文讲解了如何使用inner-mq,本小段中就不再放各个文件的详细代码了,我只描述场景与效果,以及关键代码点。

  1. 在test-a未存在页面上时,test-b发送消息。然后test-a组件才生成显示在页面上。这就是典型的订阅和发布之间时机错过的问题,一个发布发生在订阅之前的场景。这时候就需要使用之前提到的持久化功能:

test-b.component.tssend方法

send(): void {
   // persistent: true 表示该消息要做持久化
   // ON_ONCE_SUB 表示该消息只进行一次缓存,一次sub后即删除
   this.innerMqService.pub(Topic.MY_TP_1, '来自test-b的消息', {
      persistent: true,
      type: PersistentType.ON_ONCE_SUB
   });
}

我只用RxJS,却搞定了三大框架的跨组件通信,甚至还能适用于Java(一、Angular篇)

可以看到,test-b发给test-a的消息,一个都没落下。

  1. 有时候需要某条消息在每一次订阅的时候发送一遍。同样持久化功能也实现了这一点:

test-b.component.tssend方法

send(): void {
   // persistent: true 表示该消息要做持久化
   // ON_EVERY_CLIENT_EVERY_SUB 表示对每个客户端的每一次该Topic的订阅都发送信息
   this.innerMqService.pub(Topic.MY_TP_1, '来自test-b的消息', {
      persistent: true,
      type: PersistentType.ON_EVERY_CLIENT_EVERY_SUB
   });
}

我只用RxJS,却搞定了三大框架的跨组件通信,甚至还能适用于Java(一、Angular篇)

  1. 当然,基于Topic的发布订阅模式,肯定是支持一对多的,即一个发布者发送消息,所有订阅该Topic的客户端都会收到该条消息。我给每个组件都加上订阅该Topic的代码,可以看到点击test-b中的发送后,每个组件中的订阅都能正确收到消息:

我只用RxJS,却搞定了三大框架的跨组件通信,甚至还能适用于Java(一、Angular篇)

如何在自己项目中使用

很简单,我写工具写插件一向不喜欢发npm,我就喜欢最简单直接的,给源码,自己放到项目中直接调用即可。我认为这样的好处就是不会影响现有依赖,以及我直接给你源码,你可以直接学习并使用,更可以自己定制。

本文给了git地址,直接拉取代码,将里面的rx文件夹拖到自己项目中,就可以愉快地使用了。

结束语

以上即是我使用了很久的跨组件通信的方法。也许不如现成插件那样成熟,但是我认为它降低了理解难度,也更加的解耦了。具体代码已经提交git,大家可以拉取下来自己玩耍,自己设计一些场景来试试。

下一篇我会在将本文代码直接搬到Vue中,在Vue中使用RxJS实现跨组件通信

本文代码git地址:Crimson/rxjs-cross-component-communication (gitee.com)