likes
comments
collection
share

前端WebSocket封装

作者站长头像
站长
· 阅读数 26
  1. 此处使用store作为QqqqWs类的单例实例
  2. 此Demo假设websocket服务可以订阅多种类型消息

一、工具类

/**
 * Websocket消息回调
 * @param data 消息数据,优化后明确数据类型,提高类型安全性
 */
export type QqqqWsMessageListener = (data: any) => void
export type QqqqWsConnectedListener = () => void

/**
 * <p>
 * Websocket类封装
 * </p>
 *
 * @author zhangqin
 * @since 2024/5/10
 */
export default class QqqqWs {
  // 用于存储实例化后的websocket
  websocket: WebSocket | undefined
  // 用于存储实例化后的websocket URL
  url: string
  // 用于存储实例化后的websocket消息监听器
  initListener: QqqqWsConnectedListener | undefined
  messageListeners: QqqqWsMessageListener[] = []
  // 连接标识,避免重复连接
  isConnected = false
  // 标识主动断开连接
  userClosed = false
  // 用于存储实例化后的心跳定时器
  heartInterval: NodeJS.Timeout | undefined
  // 断线重连后的延迟定时器
  reConnectTimeoutId: NodeJS.Timeout | undefined

  constructor(url: string) {
    this.url = url
    this.creatWebSocket()
  }

  addConnectedListener(listener: QqqqWsConnectedListener) {
    if (typeof listener === "function") {
      this.initListener = listener
      if (this.isConnected) {
        listener()
      }
    } else {
      console.warn("侦听器不是函数")
    }
  }

  addMessageListener(listener: QqqqWsMessageListener) {
    if (typeof listener === "function") {
      this.messageListeners.push(listener)
    } else {
      console.warn("侦听器不是函数")
    }
  }

  removeMessageListener(listener: QqqqWsMessageListener) {
    const index = this.messageListeners.findIndex((value) => value === listener)
    if (index > -1) {
      this.messageListeners.splice(index, 1)
    } else {
      console.warn("未找到侦听器", listener)
    }
  }

  // 关闭
  close() {
    this.userClosed = true
    if (this.websocket) {
      this.websocket.close()
    }
  }

  // 发送
  sendMessage(data: string | ArrayBufferLike | Blob | ArrayBufferView) {
    console.log("发送的数据", data)
    if (data && this.websocket && this.websocket.readyState === WebSocket.OPEN) {
      this.websocket.send(data)
    }
  }

  // 创建websocket
  private creatWebSocket() {
    console.log("websocket==================")
    // 判断当前浏览器是否支持WebSocket
    if ("WebSocket" in window) {
      console.log("当前浏览器支持 WebSocket")
    } else {
      console.log("当前浏览器不支持 WebSocket")
    }
    try {
      this.initWebSocket()
    } catch (e) {
      console.log("尝试创建连接失败")
      // 如果无法连接上 webSocket 那么重新连接!可能会因为服务器重新部署,或者短暂断网等导致无法创建连接
      this.reConnect()
    }
  }

  // 定义重连函数
  private reConnect() {
    // eslint-disable-next-line @typescript-eslint/no-this-alias
    const self = this
    console.log("尝试重新连接")
    if (this.isConnected || this.userClosed) return // 如果已经连上就不在重连了
    this.reConnectTimeoutId && clearTimeout(this.reConnectTimeoutId)
    this.reConnectTimeoutId = setTimeout(() => self.initWebSocket(), 5000)
  }

  // 初始化websocket
  private initWebSocket() {
    // eslint-disable-next-line @typescript-eslint/no-this-alias
    const self = this
    this.websocket = new WebSocket(this.url)
    this.websocket.onopen = (e: Event) => {
      console.log("websocket.onopen", e)
      // 发送心跳包
      self.heartInterval = setInterval(() => self.sendHeartbeat(), 30000)
      self.isConnected = true
      if (this.initListener) {
        this.initListener()
      }
    }

    this.websocket.onmessage = (e: MessageEvent) => {
      // 回传给页面
      self.messageListeners.forEach((value) => value(e.data))
    }

    this.websocket.onerror = (e: Event) => {
      console.log("websocket.onerror", e)
      self.handleConnectionError()
    }

    this.websocket.onclose = (e: Event) => {
      console.log("websocket.onclose", e)
      self.heartInterval && clearInterval(this.heartInterval)
      self.isConnected = false
      self.websocket = undefined
      if (!self.userClosed) {
        self.reConnect()
      }
    }
  }

  // 发送心跳包
  private sendHeartbeat() {
    this.sendMessage(
      JSON.stringify({
        heartBeatCmd: {
          cmdId: 997
        }
      })
    )
  }

  // 处理连接错误
  private handleConnectionError() {
    this.heartInterval && clearInterval(this.heartInterval)
    this.isConnected = false
    this.reConnect()
  }
}

二、store

应用启动时调用useIotStore().initialize()初始化,这里是因为订阅了一个全局的数据

import { defineStore } from "pinia"
import { ref } from "vue"
import QqqqWs from "@/utils/QqqqWs"
import { jsonStrToObj } from "@/utils/jsonUtils"

export const useIotStore = defineStore("iotStore", () => {
  // ws实例类
  const qqqqWs = new QqqqWs(`ws://${location.host}/ws`)
  // 常驻内存的数据
  const xxxData = ref<any>({})

  // 初始化方法
  const initialize = () => {
    // 连接成功回调
    qqqqWs.addConnectedListener(() => {
      // 订阅xxx数据
      qqqqWs.sendMessage(
        JSON.stringify({
          connectorStatusCmd: {
            cmdId: 996
          }
        })
      )
    })
    // 注册消息侦听器
    qqqqWs.addMessageListener((data) => {
      if (typeof data === "string") {
        const obj: any = jsonStrToObj(data as string)
        if (obj?.cmdId === 996 && obj?.data) {
          xxxData.value = obj.data
        }
      }
    })
  }

  return {
    qqqqWs,
    xxxData,
    initialize
  }
})

三、hooks(vue3)

import { QqqqWsMessageListener } from "@/utils/QqqqWs"
import { onMounted, onUnmounted } from "vue"
import { useIotStore } from "@/store/modules/iot"

export function useQqqqWebSockets(listener: QqqqWsMessageListener) {
  let lastxxxxId = ""

  onMounted(() => {
    // 注册消息侦听器
    useIotStore().qqqqWs.addMessageListener(listener)
  })

  /**
   * 发送业务订阅相关消息cmd
   * @param xxxxId 消息内容
   * @returns cmdId
   */
  const sendMessage = (xxxxId: string): number => {
    const cmd = {
      tskvCmds: [
        {
          cmdId: 1,
          unsubscribe: false,
          entityId: xxxxId
        }
      ]
    }

    if (lastxxxxId) {
      cmd.tskvCmds.push({
        cmdId: 998,
        unsubscribe: true,
        entityId: lastxxxxId
      })
    }

    // 发送消息
    useIotStore().qqqqWs.sendMessage(JSON.stringify(cmd))
    lastxxxxId = xxxxId
    return 1
  }

  onUnmounted(() => {
    // 取消业务订阅
    if (lastxxxxId) {
      useIotStore().qqqqWs.sendMessage(
        JSON.stringify({
          tskvCmds: [
            {
              cmdId: 998,
              unsubscribe: false,
              entityId: lastxxxxId
            }
          ]
        })
      )
    }
    // 取消注册消息侦听器
    useIotStore().qqqqWs.removeMessageListener(listener)
  })

  return { sendMessage }
}

四、页面使用

// allData 存储全部数据
let wsCmdId = 1
const { sendMessage } = useQqqqWebSockets((res) => {
    // 收到消息
    if (typeof res === "string") {
      const obj: any = jsonStrToObj(res as string)
      if (obj?.cmdId === wsCmdId && obj?.data) {
        const t = allData.value
        Object.keys(obj.data).forEach((key) => {
          if (!t[key]) {
            t[key] = []
          }
          // 合并数据
          t[key] = obj.data[key].reduce((acc: any, item: any) => {
            const existingItemIndex = acc.findIndex((i: any) => i.id === item.id)
            if (existingItemIndex !== -1) {
              // 如果元素已存在,则更新它
              acc.splice(existingItemIndex, 1, { ...acc[existingItemIndex], ...item })
            } else {
              // 否则,添加新元素
              acc.push(item)
            }
            return acc
          }, t[key])
        })
        allData.value = t
      }
    }
  })
  
  watch(
    () => xxxxId,
    (val: string) => {
      if (val) {
        // 发送业务消息
        wsCmdId = sendMessage(val)
      }
    }
  )
转载自:https://juejin.cn/post/7367335167395250215
评论
请登录