likes
comments
collection
share

RocketMQ 源码探究 -- 动态重平衡实现

作者站长头像
站长
· 阅读数 31

简介

动态重平衡是为了匹配消费者组中消费者和 Topic 中队列之间的关系,提升消息的并行处理能力。

如下图所示,Topic A 由四个队列组成,由 Consumer Group A 中的三个消费者消费,其中 Consumer 1 负责 Queue 1 和 Queue 4 的消费。

RocketMQ 源码探究 -- 动态重平衡实现扩容前

当我们新添加一个 Consumer 4。消费关系变成下图所示,此时我们就认为发生了动态重平衡。

RocketMQ 源码探究 -- 动态重平衡实现

扩容后

when

消费者和队列的变化都会触发重平衡的操作。

消费者变化

当我们日常发布、对消费节点扩缩容时,都会触发消费者数量的变更操作。

我们以 pull 模式的实现:DefaultLitePullConsumer 为例,介绍当消费者变化时,如何触发重平衡操作。

下面是拉模式的 Consumer 使用方法。

public static void main(String[] args) throws Exception {
    DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("litepull");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.setAutoCommit(true);
    consumer.start();
    consumer.subscribe("TestPull""*");
    while (true) {
        List<MessageExt> messages = consumer.poll(1000);
        for (MessageExt message : messages) {
            System.out.println(message);
        }
        TimeUnit.SECONDS.sleep(1);
    }
}

Consumer 在启动时,会默认分配一个唯一标识(ip + pid + 当前纳秒)。当调用 consumer#subscribe 时,consumer 会立马向所有 broker 发送心跳包,包含当前 consumer 的唯一标识、所属消费者组名称以及订阅的 topic 信息。

broker 在收到心跳请求后。判断 Netty 的 channel 是否为新创建、当前 consumer 订阅的 Topic 是否有变更,两者只要有一者满足条件,则通知消费者触发重平衡操作。

DefaultConsumerIdsChangeListener#handler 处理 CHANGE 事件,向连接中的 channel 发送重平衡请求。

根据调用关系我们可以看出当连接断开、consumer 取消订阅时都会触发重平衡操作。

RocketMQ 源码探究 -- 动态重平衡实现

触发时间点

队列变化

broker 下线和 sh mqadmin updateTopic -r ${queueNumber} 命令都会触发队列数量的变化。由于 broker 中队列数据是维护到 name server 的,所以 broker 不会直接通知到 consumer。那 consumer 如何触发重平衡呢?

Consumer 在启动后会设置一个每隔 20s 触发的定时器,不断地根据消费者数据(从 broker 处获得)和队列数据(从 name server 处获得)进行逻辑判断,如果满足条件,则触发重平衡。

小结

  • 消费者变化和队列变化都会触发重平衡操作。
  • 触发方式包括 broker 的主动通知和 consumer 的轮询。

how

消费者是互相独立的线程或进程,彼此无法通信。

思考这个问题:如何保证不同消费者 Rebalance 的结果一致。

数据获取

Rebalance 需要当前消费者组下所有的消费者数据和订阅主题下所有的队列数据。

broker 通过心跳保存了所有的消费者数据,consumer 通过 RPC 传入消费者组可以获取到。

name server 保存了 topic 和队列的映射关系,也可以通过 RPC 请求获取。

分配策略

将获取到的队列数据按照队列号正向排列,将获取到的消费者按字符串大小正向排列。 这样在不同的消费者间,能够对齐成共同的初始数据。

RocketMQ 源码探究 -- 动态重平衡实现

不同的分配策略

RocketMQ 提供六种分配策略,也可以按照自己的需求进行定制。

分配策略中当前 consumer 的 client id 是关键参数,分配策略应尽可能与该 id 相关,这样减少不同 consumer Rebalance 过程中的冲突。

经过分配策略的执行,会得到当前 consumer 所分配到的队列。

结果比对

Consumer 将本次分配的队列与上次分配的队列进行比对,如果有新增的队列,需要启动新的 PullTaskImpl 线程,根据配置从指定 offset 处拉取最新的消息。

小结

  • Consumer 从 name server 和 broker 获取重平衡所需的原始数据
  • 不同的分配策略适用场景不同,但都要与当前 client id 相关。

对业务的影响

  • 在 Rebalance 过程中,会暂时停止消费者的消费,会造成短时间的业务停顿。
  • 由于消息的异步确认机制,在 Rebalance 过程后,可能会出现消息的重新投递现象,在批量拉取的场景下会更加明显。需要业务做好幂等操作。
  • 当消费者数量超过队列数时,超过部分不能参与消费,业务中要做好消费者和消息队列的匹配。

总结

本文介绍了 RocketMQ 中的动态重平衡机制,主要包括触发节点和重平衡实现流程两个核心机制。详细解析了重平衡中消费者和 broker 的交互逻辑、不同 consumer 是如何在没有中心节点的情况下保证重平衡结果一致,并介绍了动态重平衡可能对业务造成的影响。

  • 重平衡是为了匹配消费者和队列的能力而出现的。
  • 重平衡的基础是消费者组,一个消费者组可以有多个消费者,一个消费者可以消费多个队列,一个队列只能被一个消费者消费。
  • 重平衡会暂时中止消费进度。
转载自:https://juejin.cn/post/7374824876111167540
评论
请登录