likes
comments
collection
share

Vue3集成mqtt支持动态切换ws和wss协议

作者站长头像
站长
· 阅读数 47

继上一篇vue2集成mqtt之后,我们的需求又进行了升级,需要使用vue3和pinia来集成mqtt,并且支持开发环境使用ws协议,而生产环境使用wss协议。

具体思路如下:

  • 在pinia中增加mqtt模块,统一管理前端的mqtt客户端连接、主题订阅以及业务回调
  • 提供mqtt-subscribe-client组件,在mounted方法中,订阅传入的主题
  • app启动时,调用客户端的连接方法,创建mqtt客户端实例
  • 开启emqx服务器的wss端口监听,并配置证书
  • 在服务端配置nginx代理,将前端的wss协议报文,转发给emqx服务器
  • 在index.html页面增加CSP(Content-Security-Policy)标签,并在环境变量中配置CSP的值

环境准备

# 前端
"vite": "5.0.4",
"vue": "3.3.9",
"pinia": "2.1.7",
"mqtt.js": "5.6.0"

# 服务端
nginx
ssl证书

话不多说,上代码

mqtt前端代码

mqtt.js

/store/modules目录下创建mqtt.js文件

import { guid } from '@/utils/common'
import mqtt from "mqtt"

const useMqttStore = defineStore(
  'mqtt',
  {
    state: () => ({
      url: undefined,
      client: undefined,
      // 订阅主题的集合,key为topic, value为接收到该topic时需要执行的回调
      subscribeMembers: {}
    }),
    actions: {
      // 创建mqtt连接
      connect(url) {
        let client = mqtt.connect(url, clientOptions)
        client.on("connect", onConnect)
        client.on("reconnect", onReconnect)
        client.on("error", onError)
        client.on("message", onMessage)
        this.client = client
        this.url = url
      },

      disconnect() {
        this.client.end()
        this.client = undefined
        this.subscribeMembers = {}
        console.log(`服务器已断开连接!`)
      },

      /**
       *  订阅
       * @param topic            消息主题
       * @param subscribeOption  订阅设置
       * @param callback         接收消息的回调
       */
      subscribe({topic, callback, subscribeOption}) {
        if (!this.client && this.url) {
          this.connect(this.url)
        }
        this.client.subscribe(topic,
          subscribeOption || {qos: clientOptions.qos},
          (error, granted) => {
            if (error) {
              console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}失败: `,
                error)
            } else {
              console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}成功`)
            }
          })
        this.subscribeMembers[topic] = callback
      },

      /**
       *  取消订阅
       * @param topic  消息主题
       */
      unsubscribe(topic) {
        if (!this.client) {
          return
        }
        this.client.unsubscribe(topic,
          {},
          (error, granted) => {
            if (error) {
              console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败: `,
                error)
            } else {
              console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功`)
            }
          })
        this.subscribeMembers[topic] = undefined
      },

      publish({ topic, message }) {
        if ((!this.client || !this.client.connected) && this.url) {
          this.connect(this.url)
        }
        this.client.publish(topic, message, {qos: clientOptions.qos}, (e) => {
          if(e) {
            console.log(`客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败: `, e)
          }
        })
      }
    }
  })


const clientOptions = {
  clean: true, // true: 清除会话, false: 保留会话
  connectTimeout: 4000, // 超时时间
  keepAlive:60,
  clientId: 'pc_mqtt_client',
  // 认证信息
  username: 'yourUserName',
  password: 'yourPassword',
  // 开启log后,控制台会答应mqtt的调试日志
  // log: console.log.bind(console),
  qos: 1,
  protocolVersion: 5,
  rejectUnauthorized: false
}

const onConnect = (e) => {
  console.log(`客户端: ${clientOptions.clientId}, 连接emqx服务器成功:`, e)
}
const onReconnect = (error) => {
  console.log(`客户端: ${clientOptions.clientId}, 正在重连:`, error)
}
const onError = (error) => {
  console.log(`客户端: ${clientOptions.clientId}, 连接失败:`, error)
}
const onMessage = (topic, message) => {
  console.log(`客户端: ${clientOptions.clientId}, 接收到来自主题: ${topic} 的消息: `, message?.toString())
  let callback = useMqttStore().subscribeMembers?.[topic]
  callback?.(topic, message?.toString())
}

export default useMqttStore

订阅组件封装

订阅组件与上一篇vue2集成mqtt中的基本一致

<template>
  <div></div>
</template>

<script>
import useUserStore from '@/store/modules/user'
import useMqttStore from '@/store/modules/mqtt'

export default {
  name: 'MqttSubscribeClient',
  props: {
    topic: {
      type: String,
      required: true
    },
    global: {
      type: Boolean,
      default: false
    }
  },
  setup() {
    const mqttStore = useMqttStore()
    const userStore = useUserStore()
    return {
      mqttStore,
      userStore
    }
  },
  computed: {
    subscribeTopic() {
      return this.global ? this.topic : `${this.topic}/${this.userStore.name}`
    }
  },
  mounted() {
    this.mqttStore.subscribe({topic: this.subscribeTopic, callback: this.subscribeCallback})
  },
  beforeDestroy() {
    this.mqttStore.unsubscribe(this.subscribeTopic)
  },
  data() {
    return {

    }
  },
  methods: {
    subscribeCallback(topic, message) {
      this.$emit("receive", topic, message ? JSON.parse(message) : message)
    }
  }
}
</script>

创建连接

App.vue中,即程序的入口中,统一管理mqtt连接的创建和销毁。


<script setup>
import useMqttStore from '@/store/modules/mqtt'

// 创建mqtt连接
const mqttStore = useMqttStore()
mqttStore.connect(import.meta.env.VITE_BASE_EMQX_SERVER_HOST)

onUnmounted(() => {
  mqttStore.disconnect()
})

</script>

由于使用VITE打包,所以之前的配置VUE_APP_BASE_EMQX_SERVER_HOST需要改成VITE_BASE_EMQX_SERVER_HOST,至于为什么需要改成VITE_开头的配置,别问我为什么知道的😂,自行百度。

使用订阅组件

<template>
    <mqtt-subscribe-client :topic="'yourTopic'" @receive="receive"
    />
</template>


<script>
receive(topic, message) {
  // 处理mqtt消息
}
</script>

至此,vue3集成mqtt客户端就完成了。

emqx服务器配置

emqx服务器的配置中需要增加wss协议的配置

listeners {
	tcp.default {
		enabled = true
		bind = "0.0.0.0:63081"
	}
	ssl.default {
		enabled = true
		bind = "0.0.0.0:63082"
		ssl_options {
			keyfile = "etc/certs/your.key"
			certfile = "etc/certs/your.pem"
			cacertfile = "etc/certs/your_root.crt"
			# 开启对端验证
			verify = verify_peer
			# 强制开启双向认证,如果客户端无法提供证书,则 SSL/TLS 连接将被拒绝
			fail_if_no_peer_cert = true
		}
	}
	ws.default {
		enabled = true
		bind = "0.0.0.0:63084"
	}
	wss.default {
		enabled = true
		bind = "0.0.0.0:63085"
		ssl_options {
			keyfile = "etc/certs/your.key"
			certfile = "etc/certs/your.pem"
			cacertfile = "etc/certs/your_root.crt"
			# 开启对端验证
			verify = verify_peer
			# 强制开启双向认证,如果客户端无法提供证书,则 SSL/TLS 连接将被拒绝
			fail_if_no_peer_cert = false
		}
	}
}

配置中需要注意的有以下几点:

  • 端口号:注意各个协议绑定的端口号要不一致,否则启动会有问题
  • 证书信息:key为密钥文件,certfile为签发的证书文件,cacaertfile为根证书,证书绑定的ip或者域名需要与emqx部署的服务器一致。
  • verify:是否开启对端验证,verify_peer为开启,verify_none为不开启
  • fail_if_no_peer_cert:true则客户端必须提供证书信息,false则客户端无需提供证书,此处ssl协议和wss协议分别配置成不同的值,方便测试

nginx配置

现代浏览器如果开启了https协议,那么都会强制要求开启wss协议。不过庆幸的是,他们都可以通过nginx代理,方便的进行证书配置。

# 后端服务转发
server {
       listen       60081 ssl;
	   server_name  your.domain.com;
	   ssl_certificate path/to/your.pem;
       ssl_certificate_key path/to/your.key;
       ssl_session_timeout 5m;
       ssl_protocols TLSv1 TLSv1.1 TLSv1.2 TLSv1.3;
       ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:HIGH:!aNULL:!MD5:!RC4:!DHE;
       ssl_prefer_server_ciphers on;
       root  /path/to/your/dist;
       index  index.html index.htm;
       # 文件上传405问题处理
       error_page  405     =200 $uri;
       location / {
	      add_header Access-Control-Allow-Origin *;
          add_header Access-Control-Allow-Methods 'GET, POST, OPTIONS';
          add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization';

          if ($request_method = 'OPTIONS') {
            return 204;
          }
       # 此处的 @router 实际上是引用下面的转发,否则在 Vue 路由刷新时可能会抛出 404
          try_files $uri $uri/ @router;
          # 请求指向的首页
          index index.html;
       }
       # 由于路由的资源不一定是真实的路径,无法找到具体文件
       # 所以需要将请求重写到 index.html 中,然后交给真正的 Vue 路由处理请求资源
       location @router {
          rewrite ^.*$ /index.html last;
       }
       location /prod-api/ {
            proxy_pass https://localhost:18080/;
            proxy_set_header x-forwarded-for $remote_addr;
       }
    }
    
    
	# mqtt wss协议转发,前端使用63083端口,nginx转发到emqx服务器的wss协议绑定的63085端口
	server {
       listen       63083 ssl;
	    server_name  your.domain.com;
	   ssl_certificate path/to/your.pem;
       ssl_certificate_key path/to/your.key;
       ssl_session_timeout 5m;
       ssl_protocols TLSv1 TLSv1.1 TLSv1.2 TLSv1.3;
       ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:HIGH:!aNULL:!MD5:!RC4:!DHE;
       ssl_prefer_server_ciphers on;
	   location /mqtt {
            proxy_pass https://localhost:63085; 
			proxy_set_header   Host             $host;
			proxy_set_header   X-Real-IP        $remote_addr;
			proxy_set_header   X-Forwarded-For  $remote_addr;
			
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
			# 默认60s断开连接
			proxy_read_timeout 60s;
        }
	}

配置中需要注意的是:

  • listen 端口后面需要增加ssl
  • server_name:需要使用与证书绑定的ip或者域名
  • mqtt的wss转发配置监听63083端口,转发到emqx服务器wss协议监听的63085端口,proxy_pass需要使用https协议。

会有人问为什么需要使用nginx来转发,而不是使用前端直连emqx的63085端口,个人猜测是因为我们的客户端是浏览器,mqtt.js库对于浏览器做了特殊处理,丢弃了证书相关的配置内容。 其实也很好理解,如果浏览器端直连,那么首先用户需要将证书下载到本地,并且每次连接时都需要从本地系统读取证书,让浏览器有自动读取本地文件的权限,很显然是不被允许的。所以只能在服务端做这件事情。nginx就是很好的实现手段。

前端动态配置协议

进入下一个环节,根据环境变量动态切换ws和wss协议。ws和wss的关系类似于http和https。前端开启安全协议的方式是在根目录index.html文件中增加meta信息:

<meta http-equiv="Content-Security-Policy" content="upgrade-insecure-requests;" />

为了不影响开发环境,所以我们通过加载环境变量的方式,动态切换是否开启安全协议。

首先在env文件中增加相应的配置

env.development

# 开发环境配置
ENV = 'development'
VITE_BASE_EMQX_SERVER_HOST='ws://your.domain.com:63084/mqtt'
VITE_BASE_CSP = ""

env.production

# 开发环境配置
ENV = 'production'
VITE_BASE_EMQX_SERVER_HOST='wss://your.domain.com:63083/mqtt'
VITE_BASE_CSP = "upgrade-insecure-requests;"

注意2个文件中的区别:

  • 首先是协议:dev环境使用ws,prod环境使用wss
  • 其次是端口号,dev使用emqx服务器ws协议绑定的端口号63084,但prod使用的并不是emqx服务器wss协议绑定的端口号63085,因为需要通过nginx转发来携带证书信息,此处是重点,重点,重点!!!(手动加粗)

然后将index.html中的CSP meta信息修改一下:

<meta http-equiv="Content-Security-Policy" content="%VITE_BASE_CSP%" />

至此,大功告成。

总结

经历了几天的折腾,总算把整个流程打通,网上关于mqtt over wss的文章很少,有的也是只言片语,mqtt.js源码中提供的示例也是让人摸不着头脑,官方文档也没有解释为什么浏览器端无法传递证书信息(可能是觉得这是基础知识,或者觉得这是服务端的事,关我一个前端库屁事)。写下这篇文章,希望更多的人在使用mqtt时,可以少踩一些坑。

转载自:https://juejin.cn/post/7369150287654338586
评论
请登录