likes
comments
collection
share

微信小程序使用mqtt之避坑指南第二篇

作者站长头像
站长
· 阅读数 39

在笔者之前的文章《微信小程序使用mqtt之详细的避坑指南》中曾介绍了如何在微信小程序中使用mqtt和服务端建立连接,其中详细介绍了如何下载mqtt压缩文件并引入到本地,在连接服务端时需要注意小程序后台需要配置服务端域名以及连接url的协议等。本文接续上文,继续和大家分享在使用mqtt过程中遇到的三个问题,第一个是在消息发布的时候mqtt连接就断掉,另二个是如何在mqtt连接已经断掉的情况下发布消息,第三个是如何解决多页面连接mqtt冲突的问题。

发布消息导致mqtt断连

如果像如下代码这样直接发送经protobuf的encode处理后的消息(Unit8Array格式):

const pushMsgBuffer:any = protoRoot['PushMessage'].encode(pushMsg).finish();
client.publish(pubMessageTopic, pushMsgBuffer, { qos: 1 }, (err: any) => {
  if (!err) {
    console.log('发布成功');
  }
});

我们会发现,程序不会报错,但是也不会发送成功。经服务端的大佬帮忙调查,发现只要客户端一发布消息,那么客服端和服务端的mqtt就会断开连接。

笔者纳闷了好久,为什么一调用publish就出问题呢?于是大胆猜测:那必然是格式不对,导致服务端主动断开连接的吧?一翻搜索后查找到文末的参考资料【1】,于是npm install buffer安装了一下 Buffer(不了解如何使用在小程序中使用npm包的同学,可以阅读笔者的文章《格局打开——微信小程序使用npm》),把要发送的数据转换一下就成功了,如下图所示:

微信小程序使用mqtt之避坑指南第二篇

mqtt断连情况下发布消息

笔者已经写了mqtt对close事件监听的相关代码,但是发现总报下图所示的错误,还不触发close事件或者error事件:

微信小程序使用mqtt之避坑指南第二篇

这个错误一发生mqtt连接就断了,但是由于这个错误也不触发mqtt可以监听的那些事件,所以就会导致用户在不知道连接已经断的情况下发送消息从而导致发送的消息不能成功送达。

如果你的mqtt连接设置了自动重连的化,那么在下次连接成功后,由于这个错误导致发送失败消息也会被发送成功,但是这样并不能保证时效性。

在开发者社区中搜索到很多人都遇到此问题(参见:closeSocket:fail task not found错误该怎么解决?uni-app使用mqtt连接不上closeSocket:fail task not found?),但是并没有什么好的解决方案。笔者经过一番绞尽脑汁后,想到了一个“曲线救国”的方案:、

(1)设置一个变量 count,并且赋予其初始值为0;

(2)在调用client的publish方法之前先将count加1,并在publish的回调函数中将count减1。

(3)在调用client的publish方法之后设置一个倒计时setTimeout,并在其回调中检查count是否为0 。

(4)如果count当前值不为0则说明发生了错误导致publish方法的调用没有成功。

(5)将调用失败的方法放到待调用的任务队列中,并手动进行mqtt的连接

(6)在mqtt的监听的connet事件中检查任务对队列中是否有待调用任务,如果有则循环取出并调用

代码如下:

sendMessage() {
  this.globalData.pubSub.on('sendData', (param: any) => {
    const that = this
    const publishFunc = () => {
      const buffer = new Buffer(param[0])
      this.globalData.client.publish(that.globalData.pubMessageTopic, buffer, { qos: 1 }, (err: any) => {
        this.globalData.count--;
        if (!err) {
          console.log('发布成功');
        }
      });
    }
    this.globalData.count++;
    setTimeout(async () => {
      if (this.globalData.count) {
        this.globalData.waitToInvoke.push(publishFunc)
        this.connnectMqtt();
        this.mqttOnConnect();
        this.mqttOnReconnect()
        this.mqttSubscribe()
        this.mqttOnMessage()
      }
    }, 2000)
    publishFunc();
  })
}
mqttOnConnect() {
  this.globalData.client.on('connect', () => {
    while(this.globalData.waitToInvoke.length) {
      const func = this.globalData.waitToInvoke.shift()
      func()
    }
  });
},

当然我们刚才说过,如果你的mqtt连接设置了自动重连的化,那么在下次连接成功后,由于这个错误导致发送失败消息也会被发送成功。也就是说发生了“closeSocket:fail task not found”的这个错误后我们发送的消息会被mqtt发送两次。解决这个问题需要服务端配合一下,例如可以给每一个消息设置一个消息id,如果服务端收到相同消息id的消息则不做处理。

多页面连接mqtt冲突

笔者开发时遇到的场景是小程序的多个页面都需要使用mqtt连接。为了保证能够成功收到消息,同一时刻小程序应该只有一个mqtt实例。笔者一开始想到的方法是在当前某个页面如果建立了mqtt连接,那么在页面跳转的时候则使用client的end方法来关闭当前连接,然后在新页面再建立一个新的连接。然而经测试,这样并不能保证上一个页面的mqtt连接被成功关闭。查看了开发者社区的提问,也有人遇到同样的问题(参见:微信小程序不同页面使用mqtt报错如何解决?小程序MQTT 多个页面需要数据的情况下应该如何使用? )。容易想到的是我们可以在全局就使用一个mqtt连接,像发送和接收消息都可以放在全局的app.js(或者app.ts)中,然后其他页面可以和全局进行通讯(例如使用发布订阅模式)。代码参照如下:

// app.ts
import PubSub from './utils/pubsub'

App({
  globalData: {
    client: <any>null,
    pubSub: new PubSub(),
  },
  onLaunch() {
    this.globalData.pubSub.on('needConnect', (args: any) => {
      this.handleMqtt()
      // 上文已经贴出来了
      this.sendMessage()
    })
  },
  handleMqtt() {
    this.connnectMqtt()
    this.mqttOnConnect()
    this.mqttOnReconnect()
    this.mqttSubscribe()
    this.mqttOnMessage()
  },
  ttOnMessage() {
    this.globalData.client.on('message', (topic: any, message: any) => {
      const pages = getCurrentPages()
      const route = pages[pages.length - 1]['__route__']
      // 消息转发到不同页面
      if (route === 'pages/index/index') {
        this.globalData.pubSub.emit('indexNotice', message)
      } else if (route === 'pages/service/service') {
        this.globalData.pubSub.emit('serviceNotice', message)
      } else if (route === 'pages/serviceDetail/serviceDetail') {
        this.globalData.pubSub.emit('serviceDetailNotice', message)
      }
    });
  },
})
// 其他页面
var app = getApp()
const pubSub = app.globalData.pubSub

const param = encodeMessage(url, 3, [{
  clientKey: this.data.currentFriendInfo.clientKey,
  userId: this.data.currentFriendInfo.userId
}])
pubSub.emit('sendData', param)
handleMessage() {
  pubSub.on('serviceDetailNotice',(param:any) => {
    // 做点儿事情
  })
},

总结

本文详细介绍了微信小程序中使用mqtt时遇到的三个问题以及解决方法:

第一个问题是在消息发布的时候mqtt连接就断掉,解决办法是需要将使用的数据格式从Unit8Array改为buffer, 需要引入Node.js的buffer模块。

第二个问题是如何在mqtt连接已经断掉且无法检测的情况下发布消息,笔者采用了一个将全局变量和定时器结合的办法来解决这个问题。

第三个问题是如何解决多页面连接mqtt冲突的问题,解决办法是全局就使用一个mqtt连接,mqtt发送和接收消息的代码都写在app.ts(js)文件中,其他页面和app.ts使用发布订阅模式进行通讯。

参考资料

【1】微信小程序调用MQTT发布16进制HEX,发送buff和Uint8Array断开

【2】Nodejs中的buffer模块怎么使用