likes
comments
collection
share

Spring Boot MQTT Too many publishes in progress错误的解决方案

作者站长头像
站长
· 阅读数 32

背景

最近项目中需要与andorid端进行交互,采用了MQTT消息进行通信,生产环境中偶尔会出现Too many publishesin progress(32202)的错误,严重的影响了正常功能的使用。

前言

原因分析

出现此问题的原因跟MQTT的Qos的设置有关,所以需要简单的介绍下Qos相关值的含义

-0 最多一次的传输

发布者发送消息到服务器,没有确认消息,也不知道对方是否收到。

  • 1 至少一次的传输    发布者发布消息保存消息,服务器(broker)接收到消息,服务器(broker)发送消息到订阅者,服务器(broker)回一个PUBACK信息到发布者让删除消息,然后订阅者接收消息后PUBACK给服务器让删除消息。如果失败了,在一段时间确认信息没有收到,发送方都会将消息头的DUP设置为1,然后再次发送消息,消息最少一次到达服务。例如网络延迟等问题,发布者重复发送消息,订阅者多次订阅会产生重复消息.

  • 2 只有一次的传输

  Qos为2只是在1的基础上做了改进,在发布者发送到服务器之后多了消息的确认以及多了消息msgID的缓存,重复信息的去重。在服务器发送到订阅者之后也多了消息的确认。

项目中使用了MQTT发送消息的地方比较多,且一般都是以Qos为0,那么为什么Qos为0,在并发量比较大的情况下就会出现Too many publishesin progress(32202)的错误,报错的内容的源码如下:

Spring Boot MQTT Too many publishes in progress错误的解决方案

当actualInFlight超出设置的maxInflight最大值时就会报此错误,那么具体是什么原因导致的呢?我们需要通过源码来分析此问题的原因。

源码分析

关于源码的阅读我们需要整理主线思路,MQTT发送消息主线分为消息push到缓存中和异步发送两部分。

MQTT的Push消息到缓存中时序图

Spring Boot MQTT Too many publishes in progress错误的解决方案

MqttPahoMessageHandler的publish方法

Spring Boot MQTT Too many publishes in progress错误的解决方案 说明:checkConection检查连接后,在发送消息。

MqttAsyncClient的publish方法

Spring Boot MQTT Too many publishes in progress错误的解决方案

ClientComms的internalSend方法

Spring Boot MQTT Too many publishes in progress错误的解决方案

ClientState的send方法

Spring Boot MQTT Too many publishes in progress错误的解决方案

MqttPublish消息类型,继承了父类MqttWireMessage,而在MqttWireMessage的构造方法中将消息id设置为0

Spring Boot MQTT Too many publishes in progress错误的解决方案

SaveToken的源码实现如下:

Spring Boot MQTT Too many publishes in progress错误的解决方案

通过前面这几步的操作,消息已经放入到HashTable缓存中,准备异步发送。

异步发送消息时序图

Spring Boot MQTT Too many publishes in progress错误的解决方案

说明:MqttAsyncClient的connect为客户端建立连接,兴趣的可以看下源码。

ClientComms的conncect方法

Spring Boot MQTT Too many publishes in progress错误的解决方案

ConnectBG的run方法

Spring Boot MQTT Too many publishes in progress错误的解决方案

CommsSender的run方法

Spring Boot MQTT Too many publishes in progress错误的解决方案

1.从clientState中获取消息 2.通过消息id去hashtable中获取缓存消息 3.消息不为空,执行消息发送 4.调用notifySent方法删除消息,且actualInFlight执行递减操作。

CommsSender的notifySent方法

Spring Boot MQTT Too many publishes in progress错误的解决方案

小结

在高并发的场景下,pendingMessage可能会添加多条数据,Qos设置为0的时候,存入tokens(Hashtable)中的key一直是0,当执行tokenStore.getToken在发送方法之后会remove所有数据,由于tokenStore中已经不存在值,因为已经被上一次已经全部remove了,当再次getToken的消息时获取会为空,不在发送信息,使得actualInFlight没有递减,所以才经过一段时间后actualInFlight就会超出设置最大值,从而报错。

//存放待发送消息的Vector数组
volatile private Vector pendingMessages;

解决方案

方案1:发送消息时设置为Qos=1

此方案虽然可以解决此问题,但存在如下的缺点:

  1. 网络延迟时会发送重复消息问题,导致消费者重复消费,关于重复的消息解决需要进行相关的幂等性操作,增加了修改的复杂度和成本。
  2. 发送消息需要进行消息确认,网络资源消耗过大。

方案2: 修改maxInflight的默认值,例如将其修改为50

```
  MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  mqttConnectOptions.setMaxInflight(50);
```

此方案虽然修改比较简单,但是并没有从根本上解决问题,只是缓解了出现错误的时间,如果项目中并发量比较低,可以采用此方案解决。

方案3:将消息配置为多客户端模式

由于mqttMessageHandler只会引用一个paho客户端,所以才会想到增加客户端模式来提高并发量.需要重写MqttPahoMessageHandler类的相关方法。虽然可以解决此问题,如果对MQTT的源码不是很了解,不建议采用此方案,不利于后续的版本升级。

方案4:升级mqtt版本为1.2.1

在1.2.1的版本中官方已经进行了相关的修改,当qos=0已经不存入tokenStore了,每次发送完之后就会删除掉token以及释放id,所以就不会出现Too many publishes in progress的问题。

引入1.2.1的版本会带来https验证问题,因为在Mqtt的1.2.1版本中,增加了https的验证需要添加相关配置,否则启动时会报安全认证错误。

解决方案:如果项目中没有开启https认证,需要设置HttpsHostnameVerificationEnabled为false即可。

mqttConnectOptions.setHttpsHostnameVerificationEnabled(false);

总结

本文通过定位MQTT错误,详细的讲解了MQtt消息的发送流程,解决的方案虽然有多种,我们需要结合实际的业务情况来解决问题,关于MQtt如果还有其他疑问,可以随时反馈,大家共同学习,共同进步。