likes
comments
collection
share

RocketMQ事务消息-解密事务消息实现原理(下篇)

作者站长头像
站长
· 阅读数 13

往期回顾

RocketMQ事务消息-解密事务消息实现原理(上篇)

基于V4.9.4版本的RocketMQ源码解析专栏

RocketMQ实战专栏

本期重点内容

我们在上一篇文章中详细的解析了事务消息的发送的过程,包含了客户端的发送流程以及服务端的处理流程。本篇文章将会详细的解析一下,服务端是如何处理那些在指定的事务超时时间内没有处理完成的预消息的,也就是服务端是如何回查客户端的事务状态。

事务消息总体脉络

RocketMQ事务消息-解密事务消息实现原理(下篇)

我们在上一篇文章,也就是RocketMQ事务消息-解密事务消息实现原理(上篇) 中,重点梳理了图示的:

1、事务开始,写入预消息

2、执行本地事务逻辑

3、结束事务,预消息最终的处理结果

本篇文章,我们重点梳理一下 TransactionalMessageCheckService 回查事务执行状态的流程。

重点代码解析

服务端TransactionalMessageCheckService代码解析

TransactionalMessageCheckService是一个线程服务类,每隔60S去执行一次onWaitEnd方法的逻辑。onWaitEnd方法是事务状态校验的入口方法,定义了客户端事务执行的超时时间是6S,事务执行最大校验次数是15次。

RocketMQ事务消息-解密事务消息实现原理(下篇)

定时执行TransactionalMessageService的check方法,check方法主要内容可以分为三个部分:

1、根据 half_topic以及op_half_topic关联队列的消息进度,获取已经完成的预消息或者本次要处理的已经完成的预消息。

2、去处理已经完成的预消息并且获取未处理的预消息已经判断是否需要回查客户端。

3、更新half_topic以及op_half_topic关联队列的消息进度。

这个代码片段就是第一部分,获取已经完成的预消息以及本次将要处理的已经完成的预消息 RocketMQ事务消息-解密事务消息实现原理(下篇)

fillOpRemoveMap方法就是填充 doneOpOffset 以及 removeMap这两个数据结构,为后续的处理做准备 RocketMQ事务消息-解密事务消息实现原理(下篇)

第二部分一方面是处理已经完成的预消息,另一方面就是判断当前的这条消息是否应该执行回查或者丢弃。

在回查事务消息之前,需要将此条消息重新写入到half_topic中,为下一次回查校验做准备。

RocketMQ事务消息-解密事务消息实现原理(下篇)

调用listener.resolveHalfMsg(msgExt)方法进行消息的回查,放入独立的线程池中执行。 RocketMQ事务消息-解密事务消息实现原理(下篇)

第三部分就是更新half_topic以及op_half_topic关联队列的消费进度,说明此消费进度之前的消息已经处理完成。

RocketMQ事务消息-解密事务消息实现原理(下篇)

客户端checkTransactionState代码解析

服务端调用客户端事务状态回查的入口方法

RocketMQ事务消息-解密事务消息实现原理(下篇)

根据该条消息所属的生产者组名获取对应的生产者实例,调用producer.checkTransactionState(addr, messageExt, requestHeader) 回查本地事务状态

RocketMQ事务消息-解密事务消息实现原理(下篇)

客户端回查事务执行结果的核心方法。主要分为两个部分,一部分就是调用生产者实现的事务监听器的指定方法 transactionListener.checkLocalTransaction来回查本地事务执行的结果,另一部分就是将查到的结果构建服务端请求头,来向服务端发起请求,告知服务端事务执行结果。

RocketMQ事务消息-解密事务消息实现原理(下篇)

本期总结

通过上下两篇,我们以图示和源码解析的方式,梳理了一遍RocketMQ事务消息的实现原理,希望通过这两篇文章,能够帮助广大开发者更好的理解以及使用RocketMQ事务消息。