likes
comments
collection
share

集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

作者站长头像
站长
· 阅读数 11

背景

前面我们已经实现了菜单权限控制,但是我们给某一用户改了权限后,用户在不刷新的情况下,还能操作没有权限的菜单。虽然我们后面会做接口权限控制,用户调用没有权限的接口时会报错,但是这样也不太好,所以我们在改用户权限时,希望后端给前端主动发一条通知,权限变更了,然后刷新一下页面才能继续使用。

后端消息推送方案分析

轮询

后端写一个接口,前端没隔几秒去调用一次,看看有没有新消息。这种方案在websocket没出来以前是最常用的方案。但是缺点也很明显,时间间隔设置太长,消息实时性变差,时间间隔设置太短,会不停的调用接口,服务器压力变大。

WebSocket

什么是WebSocket

下面是chatgpt给出的答案:

WebSocket是一种在Web浏览器和服务器之间进行全双工通信的协议,相比于传统的HTTP协议,它具有以下优势和用途:

  1. 实时性:WebSocket提供了实时的双向数据传输,能够实现高效的实时通信,而不需要通过轮询或长轮询等间接方式。

  2. 低延迟:由于WebSocket建立在TCP连接上,并且使用更轻量级的协议头部,因此可以减少数据传输的延迟,提供更快速的响应时间。

  3. 节省带宽:WebSocket使用较少的网络流量,因为它使用更紧凑和有效的数据帧格式,并且可以使用二进制数据传输,而不仅仅是文本数据。

  4. 更强大的功能:相比于HTTP请求-响应模型,WebSocket支持服务器主动推送数据到客户端,从而能够实现实时更新、即时聊天、多人协作和实时数据展示等功能。

  5. 兼容性:WebSocket协议被广泛支持,并且现代的Web浏览器都原生支持WebSocket,无需任何额外插件或库。

基于以上特点,WebSocket被广泛应用于实时数据传输、在线聊天、多人游戏、实时协作、股票行情、推送通知、实时监控等场景,为Web应用程序提供了更好的用户体验和功能扩展性。

总结

综上所述,后端消息推送使用websocket是最好的方案,除非要兼容很老的浏览器(IE8:你在说我?)。

如果想兼容老浏览器,可以使用socket.io这个库,当浏览器不支持WebSocket的时候会自动降级成轮询方案。我们系统不打算兼容很老的浏览器,所以暂时不使用socket.io库,后面有需要可以替换。

实现

前端实现

实现思路

  • 前端我们可以使用ahooks库里面的useWebSocket这个hooks,封装了一些常用的方法,并且支持断线重连,不过不支持心跳检测,这个后面我们自己实现。

  • 把后端推送过来的消息,使用zustand库把消息存到全局,在每个组件中都能使用最新的消息。

  • 单独写一个组件,根据消息类型处理后端推送过来的消息。

定义消息数据接口

前后端消息数据结构定义如下:

// src/socket/message.ts
export enum SocketMessageType {
  /**
   * 权限变更
   */
  PermissionChange = 'PermissionChange',
  /**
   * 密码重置
   */
  PasswordChange = 'PasswordChange',
  /**
   * token过期
   */
  TokenExpire = 'TokenExpire',
}

export class SockerMessage<T = any> {
  /**
   * 消息类型
   */
  type: SocketMessageType;
  /**
   * 消息内容
   */
  data?: T;
  constructor(type: SocketMessageType, data?: T) {
    this.type = type;
    this.data = data;
  }
}

使用useWebSocket

layout.tsx文件中引入useWebSocket并使用,useWebSocket使用请看官方文档。这里url注意一下,window.location.host必须要加上,后面的token是为了鉴权。

集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

当token变化时重新连接

集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

配置vite支持WebSocket代理

集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

添加全局store

// src/stores/global/message.ts
import {SocketMessageType} from '@/layouts/message-handle';
import {create} from 'zustand';
import {devtools} from 'zustand/middleware';

export interface SocketMessage {
  type: SocketMessageType;
  data: any;
}

interface State {
  latestMessage?: SocketMessage | null;
}

interface Action {
  setLatestMessage: (latestMessage: State['latestMessage']) => void;
}

export const useMessageStore = create<State & Action>()(
  devtools(
    (set) => {
      return {
        latestMessage: null,
        setLatestMessage: (latestMessage: State['latestMessage']) =>
          set({
            latestMessage,
          }),
      };
    },
    {
      name: 'messageStore',
    }
  )
);

监听消息

如果latestMessage变化,说明有新消息,把新消息存到全局store中。

集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

封装消息处理组件

这个组件用来监听消息变化,以及处理消息。如果用户权限变更,刷新页面。如果密码修改或token失效退到登录页面。

这里用到了策略模式,可以减少if/else的使用。

// src/layouts/message-handle/index.tsx
import loginService from '@/pages/login/service';
import { toLoginPage } from '@/router';
import { useGlobalStore } from '@/stores/global';
import { useMessageStore } from '@/stores/global/message';
import { antdUtils } from '@/utils/antd';
import { useEffect } from 'react';

export enum SocketMessageType {
   PermissionChange = 'PermissionChange',
   PasswordChange = 'PasswordChange',
   TokenExpire = 'TokenExpire',
}

const MessageHandle = () => {

   const { latestMessage } = useMessageStore();
   const { refreshToken, setToken } = useGlobalStore();

   const messageHandleMap = {
      [SocketMessageType.PermissionChange]: () => {
         antdUtils.modal?.warning({
            title: '权限变更',
            content: '由于你的权限已经变更,需要重新刷新页面。',
            onOk: () => {
               window.location.reload();
            },
         })
      },
      [SocketMessageType.PasswordChange]: () => {
         const hiddenModal = antdUtils.modal?.warning({
            title: '密码重置',
            content: '密码已经重置,需要重新登录。',
            onOk: () => {
               toLoginPage();
               if (hiddenModal) {
                  hiddenModal.destroy();
               }
            },
         })
      },
      [SocketMessageType.TokenExpire]: async () => {
         // token失效调用刷新token,外面token变化,会自动重连。
         const [error, data] = await loginService.rerefshToken(refreshToken);
         if (!error) {
            setToken(data.token)
         } else {
            toLoginPage();
         }
      },
   }

   useEffect(() => {
      if (latestMessage?.type && messageHandleMap[latestMessage?.type]) {
         messageHandleMap[latestMessage?.type]();
      }
   }, [latestMessage])

   return null;
}

export default MessageHandle;

在layout中使用这个组件

集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

后端实现

前言

后端midway框架已经集成WebSocket组件,同时也支持socket.io组件,我们常用的功能midway基本都是内置了,用起来很方便。这里向大家再次推荐midway框架,除了star数量比nest少,开发体验和nest差不多,甚至有些地方比nest更好(后面打算出一篇两个框架开发对比文章),框架作者在群里也会积极的回答大家问题,基本上是有求必应。

安装ws依赖

pnpm i @midwayjs/ws@3 --save
pnpm i @types/ws --save-dev

开启组件

在imports导入

// src/configuration.ts
import { Configuration } from '@midwayjs/core';
import * as ws from '@midwayjs/ws';

@Configuration({
  imports: [ws],
  // ...
})
export class MainConfiguration {
  async onReady() {
        // ...
  }
}

封装公共SocketService

封装一个单例SocketService,每个用户连接都保存在connects里面,方便后面发送消息。

// src/socket/service.ts
import { Singleton } from '@midwayjs/core';
import { Context } from '@midwayjs/ws';
import { SocketMessage } from './message';

@Singleton()
export class SocketService {
  connects = new Map<string, Context[]>();

  /**
   * 添加连接
   * @param userId 用户id
   * @param connect 用户socket连接
   */
  addConnect(userId: string, connect: Context) {
    const curConnects = this.connects.get(userId);
    if (curConnects) {
      curConnects.push(connect);
    } else {
      this.connects.set(userId, [connect]);
    }
  }

  /**
   * 删除连接
   * @param connect 用户socket连接
   */
  deleteConnect(connect: Context) {
    const connects = [...this.connects.values()];

    for (let i = 0; i < connects.length; i += 1) {
      const sockets = connects[i];
      const index = sockets.indexOf(connect);
      if (index >= 0) {
        sockets.splice(index, 1);
        break;
      }
    }
  }

  /**
   * 给指定用户发消息
   * @param userId 用户id
   * @param data 数据
   */
  sendMessage<T>(userId: string, data: SocketMessage<T>) {
    const clients = this.connects.get(userId);

    if (clients?.length) {
      clients.forEach(client => {
        client.send(JSON.stringify(data));
      });
    }
  }
}

消息对象

export enum SocketMessageType {
  /**
   * 权限变更
   */
  PermissionChange = 'PermissionChange',
  /**
   * 密码重置
   */
  PasswordChange = 'PasswordChange',
  /**
   * token过期
   */
  TokenExpire = 'TokenExpire',
}

export class SocketMessage<T = any> {
  /**
   * 消息类型
   */
  type: SocketMessageType;
  /**
   * 消息内容
   */
  data?: T;
  constructor(type: SocketMessageType, data?: T) {
    this.type = type;
    this.data = data;
  }
}

SocketController

// src/socket/controller.ts
import {
  WSController,
  OnWSConnection,
  Inject,
  OnWSMessage,
  OnWSDisConnection,
} from '@midwayjs/core';
import { RedisService } from '@midwayjs/redis';
import { Context } from '@midwayjs/ws';
import * as http from 'http';
import { SocketService } from './service';
import { SocketMessageType, SocketMessage } from './message';

@WSController()
export class SocketConnectController {
  @Inject()
  ctx: Context;
  @Inject()
  redisService: RedisService;
  @Inject()
  socketService: SocketService;

  @OnWSConnection()
  async onConnectionMethod(socket: Context, request: http.IncomingMessage) {
    // 获取url上token参数
    const token = new URLSearchParams(request.url.slice(1)).get('token');

    if (!token) {
      socket.close();
      return;
    }

    const userInfoStr = await this.redisService.get(`token:${token}`);
    if (!userInfoStr) {
      socket.send(
        JSON.stringify({
          type: SocketMessageType.TokenExpire,
        })
      );
      socket.close();
      return;
    }

    const userInfo = JSON.parse(userInfoStr);
    this.socketService.addConnect(userInfo.userId, socket);
  }

  @OnWSMessage('message')
  async gotMessage(data: Buffer) {
    // 接受前端发送过来的消息
    try {
      const message = JSON.parse(data.toString()) as SocketMessage;
      // 如果前端发送过来的消息时ping,那么就返回pong给前端
      if (message.type === SocketMessageType.Ping) {
        return {
          type: SocketMessageType.Pong,
        };
      }
    } catch {
      console.error('json parse error');
    }
  }

  @OnWSDisConnection()
  async disconnect() {
    // 客户端断开连接后,从全局connects移除
    this.socketService.deleteConnect(this.ctx);
  }
}

权限变更通知

用户权限变更目前有两个地方,第一个是用户更新角色接口,第二个是角色更新菜单接口。

  1. 改造更新用户接口

集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

  1. 改造角色更新接口

集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

密码变更通知

改造AuthServiceresetPassword方法,密码变更成功后,给对应用户发消息重新登录。 集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

优化

说明

虽然ahooks中封装的useWebSocket已经做了断线重连,但是它只监听了onClose事件,但是有些情况,比如服务器挂了,可能来不及给前端发断开消息,后面服务重启了,但是前端不知道,就没办法自动重连。

为了解决这个问题,心跳检测方案就出来了。就是前端一直给后端发消息,后端接受到消息后,响应一个消息,如果一段时间前端接收不到后端响应,说明后端服务可能挂了,这时候我们再去重连,如果重连失败,我们隔几秒去重连一次,后面服务启动成功了,就能自动连接上了。

心跳检测还有一个作用,后面我们用nginx做websocket代理,nginx针对websocket代理有个优化,前后端一段时间(这个时间可以自己设置)没有发送消息,就会断开,有了心跳检测,相当于前后端一直在交互,就不会断开了。

实现思路

因为useWebSocket不支持心跳检测,所以我在useWebSocket上面加了一层来实现心跳检测。

websocket连接三秒后发送一个ping类型消息给后端,再加一个定时器,3秒后websocket重连。这时候如果我们接收到了消息,就把定时器给清除掉,也就不会去重连了。useWebSocket内部已经实现了,如果重连失败,会自动重连的,reconnectLimit重连的次数,reconnectInterval重连的时间间隔,我这里设置了6秒一次,最大30次,也就是说三分钟连不上就不再连了。

源码

前端实现

import {useWebSocket} from 'ahooks';
import type {Options, Result} from 'ahooks/lib/useWebSocket';
import {useRef} from 'react';

export function useWebSocketMessage(
  socketUrl: string,
  options?: Options
): Result {
  const timerRef = useRef<number>();

  const {
    latestMessage,
    sendMessage,
    connect,
    disconnect,
    readyState,
    webSocketIns,
  } = useWebSocket(socketUrl, {
    ...options,
    reconnectLimit: 30,
    reconnectInterval: 6000,
    onOpen: (event: Event, instance: WebSocket) => {
      sendHeartbeat();

      options?.onOpen && options.onOpen(event, instance);
    },
    onMessage: (message: MessageEvent<any>, instance: WebSocket) => {
      // 再次发送心跳消息
      sendHeartbeat();

      options?.onMessage && options.onMessage(message, instance);
    },
    onClose(event, instance) {
      resetHeartbeat();

      options?.onClose && options.onClose(event, instance);
    },
    onError(event, instance) {
      resetHeartbeat();

      options?.onError && options.onError(event, instance);
    },
  });

  // 清除重连的定时器
  function resetHeartbeat() {
    if (timerRef.current) {
      window.clearTimeout(timerRef.current);
    }
  }

  // 发送心跳消息
  function sendHeartbeat() {
    resetHeartbeat();

    // 三秒之后发送一次心跳消息
    setTimeout(() => {
      sendMessage && sendMessage(JSON.stringify({type: 'Ping'}));
      // 心跳消息发送3s后,还没得到服务器响应,说明服务器可能挂了,需要自动重连。
      timerRef.current = window.setTimeout(() => {
        disconnect && disconnect();
        connect && connect();
      }, 3000);
    }, 3000);
  }

  return {
    latestMessage,
    connect,
    sendMessage,
    disconnect,
    readyState,
    webSocketIns,
  };
}

后端实现

集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

nginx代理websocket配置

集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

上面必须加上这段map配置,不然启动会报错,因为用到了connection_upgrade变量。

集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

这里遇到一个小坑,正常代理这样配就行了,然后线上怎么都连不上,最后发现给连接的url ws后面加上/就行了,至于为啥加/才可以,我还没找到原因,如果有人知道,可以告诉我一下。

集成WebSocket实现用户权限变更消息推送,自动刷新。——从零开始搭建一个高颜值后台管理系统全栈框架(十)

留个坑

本地开发完,发布到线上,发现有时候消息能正常发送,有时候不行,调试了一段时间后,发现线上是用pm2启动的,启动了4个进程,连接被分散在各个进程上,有时候某个进程没有某个用户的连接,这时候修改密码或修改权限调了这个进程的接口,就会导致消息发不出去。这个处理起来比较麻烦,需要用到redis消息广播,这个我们下篇文章说。暂时把pm2启动进程改为1。

总结

websocket除了做后端消息推送功能,还可以做很多功能,比如在线聊天,在线客服等。所以作为前端还是有必要了解一下的。

项目体验地址:fluxyadmin.cn/user/login

前端仓库地址:github.com/dbfu/fluxy-…

后端仓库地址:github.com/dbfu/fluxy-…