前端WebSocket封装
- 此处使用store作为QqqqWs类的单例实例
- 此Demo假设websocket服务可以订阅多种类型消息
一、工具类
/**
* Websocket消息回调
* @param data 消息数据,优化后明确数据类型,提高类型安全性
*/
export type QqqqWsMessageListener = (data: any) => void
export type QqqqWsConnectedListener = () => void
/**
* <p>
* Websocket类封装
* </p>
*
* @author zhangqin
* @since 2024/5/10
*/
export default class QqqqWs {
// 用于存储实例化后的websocket
websocket: WebSocket | undefined
// 用于存储实例化后的websocket URL
url: string
// 用于存储实例化后的websocket消息监听器
initListener: QqqqWsConnectedListener | undefined
messageListeners: QqqqWsMessageListener[] = []
// 连接标识,避免重复连接
isConnected = false
// 标识主动断开连接
userClosed = false
// 用于存储实例化后的心跳定时器
heartInterval: NodeJS.Timeout | undefined
// 断线重连后的延迟定时器
reConnectTimeoutId: NodeJS.Timeout | undefined
constructor(url: string) {
this.url = url
this.creatWebSocket()
}
addConnectedListener(listener: QqqqWsConnectedListener) {
if (typeof listener === "function") {
this.initListener = listener
if (this.isConnected) {
listener()
}
} else {
console.warn("侦听器不是函数")
}
}
addMessageListener(listener: QqqqWsMessageListener) {
if (typeof listener === "function") {
this.messageListeners.push(listener)
} else {
console.warn("侦听器不是函数")
}
}
removeMessageListener(listener: QqqqWsMessageListener) {
const index = this.messageListeners.findIndex((value) => value === listener)
if (index > -1) {
this.messageListeners.splice(index, 1)
} else {
console.warn("未找到侦听器", listener)
}
}
// 关闭
close() {
this.userClosed = true
if (this.websocket) {
this.websocket.close()
}
}
// 发送
sendMessage(data: string | ArrayBufferLike | Blob | ArrayBufferView) {
console.log("发送的数据", data)
if (data && this.websocket && this.websocket.readyState === WebSocket.OPEN) {
this.websocket.send(data)
}
}
// 创建websocket
private creatWebSocket() {
console.log("websocket==================")
// 判断当前浏览器是否支持WebSocket
if ("WebSocket" in window) {
console.log("当前浏览器支持 WebSocket")
} else {
console.log("当前浏览器不支持 WebSocket")
}
try {
this.initWebSocket()
} catch (e) {
console.log("尝试创建连接失败")
// 如果无法连接上 webSocket 那么重新连接!可能会因为服务器重新部署,或者短暂断网等导致无法创建连接
this.reConnect()
}
}
// 定义重连函数
private reConnect() {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this
console.log("尝试重新连接")
if (this.isConnected || this.userClosed) return // 如果已经连上就不在重连了
this.reConnectTimeoutId && clearTimeout(this.reConnectTimeoutId)
this.reConnectTimeoutId = setTimeout(() => self.initWebSocket(), 5000)
}
// 初始化websocket
private initWebSocket() {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this
this.websocket = new WebSocket(this.url)
this.websocket.onopen = (e: Event) => {
console.log("websocket.onopen", e)
// 发送心跳包
self.heartInterval = setInterval(() => self.sendHeartbeat(), 30000)
self.isConnected = true
if (this.initListener) {
this.initListener()
}
}
this.websocket.onmessage = (e: MessageEvent) => {
// 回传给页面
self.messageListeners.forEach((value) => value(e.data))
}
this.websocket.onerror = (e: Event) => {
console.log("websocket.onerror", e)
self.handleConnectionError()
}
this.websocket.onclose = (e: Event) => {
console.log("websocket.onclose", e)
self.heartInterval && clearInterval(this.heartInterval)
self.isConnected = false
self.websocket = undefined
if (!self.userClosed) {
self.reConnect()
}
}
}
// 发送心跳包
private sendHeartbeat() {
this.sendMessage(
JSON.stringify({
heartBeatCmd: {
cmdId: 997
}
})
)
}
// 处理连接错误
private handleConnectionError() {
this.heartInterval && clearInterval(this.heartInterval)
this.isConnected = false
this.reConnect()
}
}
二、store
应用启动时调用useIotStore().initialize()初始化,这里是因为订阅了一个全局的数据
import { defineStore } from "pinia"
import { ref } from "vue"
import QqqqWs from "@/utils/QqqqWs"
import { jsonStrToObj } from "@/utils/jsonUtils"
export const useIotStore = defineStore("iotStore", () => {
// ws实例类
const qqqqWs = new QqqqWs(`ws://${location.host}/ws`)
// 常驻内存的数据
const xxxData = ref<any>({})
// 初始化方法
const initialize = () => {
// 连接成功回调
qqqqWs.addConnectedListener(() => {
// 订阅xxx数据
qqqqWs.sendMessage(
JSON.stringify({
connectorStatusCmd: {
cmdId: 996
}
})
)
})
// 注册消息侦听器
qqqqWs.addMessageListener((data) => {
if (typeof data === "string") {
const obj: any = jsonStrToObj(data as string)
if (obj?.cmdId === 996 && obj?.data) {
xxxData.value = obj.data
}
}
})
}
return {
qqqqWs,
xxxData,
initialize
}
})
三、hooks(vue3)
import { QqqqWsMessageListener } from "@/utils/QqqqWs"
import { onMounted, onUnmounted } from "vue"
import { useIotStore } from "@/store/modules/iot"
export function useQqqqWebSockets(listener: QqqqWsMessageListener) {
let lastxxxxId = ""
onMounted(() => {
// 注册消息侦听器
useIotStore().qqqqWs.addMessageListener(listener)
})
/**
* 发送业务订阅相关消息cmd
* @param xxxxId 消息内容
* @returns cmdId
*/
const sendMessage = (xxxxId: string): number => {
const cmd = {
tskvCmds: [
{
cmdId: 1,
unsubscribe: false,
entityId: xxxxId
}
]
}
if (lastxxxxId) {
cmd.tskvCmds.push({
cmdId: 998,
unsubscribe: true,
entityId: lastxxxxId
})
}
// 发送消息
useIotStore().qqqqWs.sendMessage(JSON.stringify(cmd))
lastxxxxId = xxxxId
return 1
}
onUnmounted(() => {
// 取消业务订阅
if (lastxxxxId) {
useIotStore().qqqqWs.sendMessage(
JSON.stringify({
tskvCmds: [
{
cmdId: 998,
unsubscribe: false,
entityId: lastxxxxId
}
]
})
)
}
// 取消注册消息侦听器
useIotStore().qqqqWs.removeMessageListener(listener)
})
return { sendMessage }
}
四、页面使用
// allData 存储全部数据
let wsCmdId = 1
const { sendMessage } = useQqqqWebSockets((res) => {
// 收到消息
if (typeof res === "string") {
const obj: any = jsonStrToObj(res as string)
if (obj?.cmdId === wsCmdId && obj?.data) {
const t = allData.value
Object.keys(obj.data).forEach((key) => {
if (!t[key]) {
t[key] = []
}
// 合并数据
t[key] = obj.data[key].reduce((acc: any, item: any) => {
const existingItemIndex = acc.findIndex((i: any) => i.id === item.id)
if (existingItemIndex !== -1) {
// 如果元素已存在,则更新它
acc.splice(existingItemIndex, 1, { ...acc[existingItemIndex], ...item })
} else {
// 否则,添加新元素
acc.push(item)
}
return acc
}, t[key])
})
allData.value = t
}
}
})
watch(
() => xxxxId,
(val: string) => {
if (val) {
// 发送业务消息
wsCmdId = sendMessage(val)
}
}
)
转载自:https://juejin.cn/post/7367335167395250215