likes
comments
collection
share

RocketMQ消费者消费消息核心原理(含长轮询机制)

作者站长头像
站长
· 阅读数 6

前言

在消息系统中,消费者消费消息有拉和推消息两种实现方式,拉消息消费者主动向消息服务器发送拉消息请求,消息服务器将消息返回给消费者,而推消息消息服务器主动向消费者推送消息的形式,这两种消息消费实现各有各的优势和劣势。

在RocketMQ中,有两种消费者客户端,一种是Push模式消费者, 一种是Pull模式消费者,这两个其实都是表现,在RocketMQ底层实现中采用长轮询的机制来实现消息拉取消费功能。

RocketMQ消费者消费消息核心原理(含长轮询机制)

长轮询模式兼顾了拉和推消息的优势。本文将从源码层面分析RocketMQ的消费者角色的工作原理。从整体看下org.apache.rocketmq.client.consumer.DefaultMQPushConsumer消费者实现类在消费客户端的启动流程,RocketMQ的消费者客户端即支持拉又支持推,但是底层都是基于长轮询的实现。

既然内核底层是长轮训模式,为什么RocketMQ会有Push,Pull客户端呢?

RocketMQ消费者消费消息核心原理(含长轮询机制)

Push只是在客户端,RocketMQ客户端组件将消息拉取到本地后,自动回调我们业务方的业务逻辑执行消费。

Pull模式,其实也是Rocketmq会将消息拉取到本地,我们的业务代码主动从本地读消息进行消费。

这两种客户端对我们开发人员更加友好,可以自己选择最合适的消费方式。

Push消费者启动流程

看源码前,我们可以思考下,消费者能够从broker拉取到消息主要需要做哪些事情?

RocketMQ消费者消费消息核心原理(含长轮询机制)

1、首先肯定需要和Broker创建连接,RocketMQ底层使用Netty组件通信,肯定会使用Netty和Broker建立连接。

2、和Broker建立连接之后,需要告诉Broker拉取Topic下哪个队列的消息,这里就需要用到消费者负载均衡机制了。

3、向Broker拉取消息,还有一个关键的属性,那就是消息拉取偏移量offset, 消费者需要知道下次从哪个偏移量开始拉取消息。在RocketMQ中广播消息模式是将偏移量存储在消费者本地, 集群消息模式是将消息偏移量存储在Broker。因此消费者启动时还需要加载好各个topic的偏移量

分析了上面主线逻辑之后,我们可以看看源码是如何做的。

消费者启动流程在源码 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

加载偏移量存储服务。

RocketMQ消费者消费消息核心原理(含长轮询机制)

偏移量根据消费模式有两个策略,如果是广播消息偏移量存储在本地,如果是集群消息则存储在broker

消费者调用业务逻辑执行消费逻辑

RocketMQ消费者消费消息核心原理(含长轮询机制)

消费者实例启动,里面的主逻辑非常清晰。

org.apache.rocketmq.client.impl.factory.MQClientInstance#start

public void start() throws MQClientException {

                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // 启动netty客户端,和Broker建立通信连接
                    this.mQClientAPIImpl.start();
                    // 定时一些任务
                    this.startScheduledTask();
                    // 启动拉消息的服务,通过线程异步拉取
                    this.pullMessageService.start();
                    // 启动负载均衡服务
                    this.rebalanceService.start();
                    // 启动内置消息发送服务 this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    //更新消费者启动状态
                    this.serviceState = ServiceState.RUNNING;
                    
}

通过上面的步骤消费者实例就启动完成了。

我们发现,消费者客户端主要包含了几个核心组件服务

1、偏移量加载服务 => 记录消息消费进度

2、消息拉取服务 => 建立连接,拉取消息

3、负载均衡服务 => 分配消费队列

4、消息消费服务 => 回调消费消息业务逻辑

和我们一开始的想的没有很大差别。 这里体现了单一职责设计原则的设计思想,不同的类专门负责各自不同的职责。

RocketMQ消费者消费消息核心原理(含长轮询机制)

看到这里,我们就把消费者客户端启动过程的主线捋清楚了。

客户端拉取消息请求发起

我们接着看最主要的流程,拉取消息服务PullMessageService, 看看他做了哪些事情。

点进去看发现他是一个线程任务。自然我们就会想到去看他的run方法实现,因此我们可以猜测,他是通过异步拉取消息的。

RocketMQ消费者消费消息核心原理(含长轮询机制)

@Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

从阻塞队列拿拉数据请求

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

每个拉取请求里包含了对应的消费者组,消息队列,拉取偏移量,从这些信息就可以知道要拉取哪个topic下面哪个队列哪个偏移量开始的消息了。

public class PullRequest {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private long nextOffset;
    private boolean lockedFirst = false;

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl中会发起拉消息请求,将消费者组,topic,队列号,偏移量传给broker。

RocketMQ消费者消费消息核心原理(含长轮询机制)

我们可以注意到,上面还有两个重要的参数brokerSuspendMaxTimeMillis,timeoutMillis

这两个个暂停时间参数就和大名鼎鼎的长轮询机制有着密切的关系。

brokerSuspendMaxTimeMillis参数表示broker在没有消息拉取的时候会暂停等待多久返回给消费者客户端,

timeoutMillis则是表示消费者客户端最长等待多久broker返回消息的。

在消费者客户端,我们可以看到timeoutMillis的使用,控制了请求超时时间。

RocketMQ消费者消费消息核心原理(含长轮询机制)

看看默认的超时时间配置

brokerSuspendMaxTimeMillis 默认15s, 也就是broker会在没有消息拉取停15s,再返回客户端

timeoutMillis 默认30s,也就是消费者客户端会等待broker30s。

RocketMQ消费者消费消息核心原理(含长轮询机制)

下文我们还可以看到broker端对拉取消息的处理时对参数brokerSuspendMaxTimeMillis的应用。

消费者实例已经发起拉取消息请求了,broker把消息返回后,消费者做什么呢?

消费者需要对消息内容解码,然后将消息分发给我们写的业务代码执行。

RocketMQ消费者消费消息核心原理(含长轮询机制)

这里就会把拉取到消息,通过任务的形式投递给消息消费处理线程池,线程池把消息转交给业务类处理。

RocketMQ消费者消费消息核心原理(含长轮询机制)

我们经常会写个Listerner来监听RocketMQ消息,这种消费消息的方式,RocketMQ就是在Push模式中实现的。

消费完消息,还需要更新最新的消息消费偏移量,在这里RocketMQ会同步更新内存中的偏移量,不会直接远程调用Broker。

RocketMQ消费者消费消息核心原理(含长轮询机制)

RocketMQ消费者消费消息核心原理(含长轮询机制)

看到这几个点,RocketMQ中,拉取消息支持异步消费消息异步,还有偏移量更新到Brocker是异步,都是异步的,这种就像io线程和业务线程做了隔离的设计思想,如果要说RocketMQ性能高的原因,这个点也可以说。

看完上面的主线逻辑,我们其实已经能够说明白RocketMQ消费者端的主流程了。

下面看看Broker端处理拉取消息请求的流程

Broker端处理拉取消息

Broker端有个专门处理类处理拉取消息请求,PullMessageProcessor

RocketMQ消费者消费消息核心原理(含长轮询机制)

我们也可以想下,RocketMQ拉取消息时需要做哪些事情?

1、因为消息是存储在CommitLog中,CommitLog对应的是一个一个的文件,怎么定位到具体的文件呢?也就是怎么定位到具体的CommitLog?

2、我们订阅消息消息,可能会使用不同的tag,tag会传到Broker端,因此Broker端需要做tag过滤

RocketMQ消费者消费消息核心原理(含长轮询机制)

RocketMQ如何定位CommitLog?

在Broker端,会先根据topic和队列号定位到ConsumeQueue

RocketMQ消费者消费消息核心原理(含长轮询机制)

ConsumeQueue是Commitlog文件的索引,里面缓存了所有的Commitlog文件信息和物理偏移量信息

RocketMQ消费者消费消息核心原理(含长轮询机制)

通过ConsumeQueue就可以知道的CommitLog的物理偏移量了。

RocketMQ消费者消费消息核心原理(含长轮询机制)

消息查询前会根据tag的hash值进行过滤,这里是不是经常问消息tag是在哪过滤的? 消息的tag的会以hash值形式存储在ConsumeQueue,由于ConsumeQueue是索引,所以Broker端查询消息前会进行过滤。

RocketMQ消费者消费消息核心原理(含长轮询机制)

RocketMQ消费者消费消息核心原理(含长轮询机制)

执行完消息过滤,就可以真正的查询消息啦。查询消息就是返回了ByteBuffer

RocketMQ消费者消费消息核心原理(含长轮询机制)

到这里,消息查询出来了,但是长轮询处理在哪呢?在拉取结果处理的地方。

RocketMQ消费者消费消息核心原理(含长轮询机制)

Broker将当前请求加入等待队列,response置为null, 这样不会立即把响应写到消费者客户端。

RocketMQ消费者消费消息核心原理(含长轮询机制)

长轮询机制的核心实现马上就要揭开神秘面纱了。 Broker通过将当前请求的快照信息存储下来。

RocketMQ消费者消费消息核心原理(含长轮询机制)

每5秒检查一次每个等待的请求队列里是否有新消息,有新消息就会返回消息给消费者客户端,或者超时了也不会再等待。 RocketMQ消费者消费消息核心原理(含长轮询机制)

看到这里,Broker处理消息拉取逻辑也清晰了。

RocketMQ消费者消费消息核心原理(含长轮询机制)

总结

本文分析了RocketMQ消费消息逻辑的实现原理,我们平时写消费者逻辑应该是比较多的,消费消息本质上是我们应用程序和Broker服务进行rpc通信,交换业务请求和消息的过程。

我们发现RocketMQ在代码设计上有几个比较突出的优点,值得我们学习和借鉴

1、代码的单一职责化很明显,各个类都各司其职,专门处理不同领域的逻辑。

2、线程池的使用,将消息拉取,消息偏移量同步,消息长轮询机制等都通过线程池,异步处理,同时也把消息拉取和消息消费进行了解耦,确实非常妙。