likes
comments
collection
share

RocketMQ消息消费-客户端为拉取消息所做的努力

作者站长头像
站长
· 阅读数 77

“我报名参加金石计划1期挑战——瓜分10万奖池,这是我的第2篇文章,点击查看活动详情

往期回顾

RocketMQ源码解析-消息是如何写入Broker服务器(客户端篇)

RocketMQ源码解析-消息是如何写入Broker服务器(Broker端篇)

RocketMQ源码解析-步步拆解ConsumeQueue的写入流程

RocketMQ消息写入-步步拆解IndexFile的写入流程

RocketMQ消息消费-客户端拉取消息前的准备工作

在上一篇章中,我们介绍了RocketMQ客户端在拉取消息前的准备工作。准备工作主要是在DefaultMQPushConsumerImpl中来完成的,这个类伴随着DefaultMQPushConsumer的创建而创建,代表者消费者组中的一个消费客户端。准备工作完成后,接下来我们分析一下RocketMQ客户端如何拉取消息以及在拉取消息的过程中,客户端付出了怎样的努力。

本期重点脉络梳理

本篇文件我们重点解析的 RebalanceService、PullMessageService、RebalanceImpl这几个核心类。这几个类相互配合、紧密联系,建立起了客户端拉取消息的基础。

客户端核心类交互流程图(虚线表示调用关系)

RocketMQ消息消费-客户端为拉取消息所做的努力

1.本示例图的开始流程由RebalanceService触发,默认20S会去触发一次RebalanceImpl这个类的doRebalance方法来进行一次针对订阅Topic主题的队列负载

2.RebalanceImpl的doRebalance方法会根据指定的负载策略以及消费者组内客户端的个数进行队列的分配。若本次分配的结果少于既往队列,则将少的那部分队列在本客户端取消拉取请求。若本次负载队列后有新的拉取请求需要添加,则RebalanceImpl会去调用PullMessageService提供的方法executePullRequestImmediately,将新的拉取请求放入PullMessageService的队列中。

3.PullMessageService继续调用DefaultMQPushConsumerImpl的消息拉取方法。

4.DefaultMQPushConsumerImpl调用pullAPIWrapper提供的pullKernelImpl方法进行消息的拉取。

5.pullAPIWrapper提供的pullKernelImpl方法调用底层交互API从Broker端获取要拉取的消息。

6.DefaultMQPushConsumerImpl调用用户注册的consumeMessageService进行消息的消费。

7.消息消费后的偏移量存储在OffsetStore中。

涉及源码解析

我们针对上面的流程来进行源码的解析,源码解析的顺序跟上面的流程是一致的,方便对照研究。

RebalanceService默认每个20S触发一次mqClientFactory.doRebalance(),本质上还是触发的RebalanceImpl这个类的doRebalance方法

RocketMQ消息消费-客户端为拉取消息所做的努力

RocketMQ消息消费-客户端为拉取消息所做的努力

RocketMQ消息消费-客户端为拉取消息所做的努力

RebalanceImpl这个类的doRebalance方法针对Topic进行重新负载分配队列

RocketMQ消息消费-客户端为拉取消息所做的努力

我们只需关注集群模式下的处理。

rebalanceByTopic的核心逻辑就是按照指定的分配队列策略以及消费组内客户端的个数进行队列的分配,将分配后的结果调用updateProcessQueueTableInRebalance进行处理 RocketMQ消息消费-客户端为拉取消息所做的努力

updateProcessQueueTableInRebalance的核心处理逻辑也就是我们上面说的将那些分配少的队列进行取消,不在进行本客户端的拉取。将新分配的队列建立ProcessQueue和MessageQueue的映射关系,然后新建拉取请求,调用dispatchPullRequest方法,放入PullMessageService队列中。 RocketMQ消息消费-客户端为拉取消息所做的努力

dispatchPullRequest方法最终调用逻辑。 RocketMQ消息消费-客户端为拉取消息所做的努力

PullMessageService线程不断的从pullRequestQueue队列中取出待拉取的消息,调用pullMessage方法进行处理,pullMessage方法内部本质上面还是调用DefaultMQPushConsumerImpl的pullMessage。

RocketMQ消息消费-客户端为拉取消息所做的努力

DefaultMQPushConsumerImpl的pullMessage方法篇幅较长,其实主要分为了三部分:

第一部分是校验部分,针对待拉取队列以及服务状态的校验。

第二部分构建了PullCallback回调对象,这个对象是针对本次消息拉取后的回调处理。

第三部分调用pullAPIWrapper类的pullKernelImpl方法进行拉取消息。

RocketMQ消息消费-客户端为拉取消息所做的努力

pullAPIWrapper类的pullKernelImpl方法主要是构建与远端Broker交互的数据,然后调用MQClientAPIImpl的pullMesssage方法向Broker发起请求拉取消息。 RocketMQ消息消费-客户端为拉取消息所做的努力

本篇总结

本篇文件介绍了RocketMQ消息拉取在客户端的处理流程,结合类图以及源码解析能够更加深入的熟悉这一流程,方便大家在工作与学习中遇到问题时能够从原理出发解决问题,更加熟练的运用这一款消息中间件。

接下来,我们将会梳理一下服务端Broker如何去处理客户端拉取消息的请求以及客户端在收到拉取下来的消息后是如何处理的,敬请期待,谢谢。

转载自:https://juejin.cn/post/7142094849389854728
评论
请登录