likes
comments
collection
share

RocketMQ源码4-producer 启动流程(获取topic路由信息)

作者站长头像
站长
· 阅读数 18

欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

本文我们来分析rocketMq producer 发送消息的流程.

producer发送消息的示例在org.apache.rocketmq.example.simple.Producer类中,代码如下:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 1. 创建 DefaultMQProducer 对象
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

       
        producer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * Launch the instance.
         */
        // todo 2. 启动 producer
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

              
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );


                // 3. 发送消息
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } 
            ...
        }
        producer.shutdown();
    }
}

以上代码分三步走:

  1. 创建 DefaultMQProducer 对象
  2. 启动 producer
  3. 发送消息

接下来我们的分析也按这三步进行。

1. DefaultMQProducer构造方法

我们看下DefaultMQProducer 这个类有哪些功能:

  • 功能一:给消息生产者配置参数,调整参数就是调用这个类的api,比如上面我们设置nameserv地址producer.setNamesrvAddr("127.0.0.1:9876");,可以把它看作一个配置类,
  • 功能二:发送消息的功能,这里它发送消息都是调用defaultMQProducerImpl 这个类,
  • 功能三:它实现MQAdmin 接口里面关于topic与MessageQueue的操作。

我们来看下它里面有哪些重要的参数:

字段默认值解释
producerGroupnull组信息,需要你自己指定
defaultTopicQueueNums4就是新建一个topic 默认设置4个MessageQueue
sendMsgTimeout3000发送消息的超时时间 单位ms
compressMsgBodyOverHowmuch1024 * 4这个就是当发送的消息内容大于这个数的时候 进行压缩,压缩阈值,默认是4k
maxMessageSize1024 * 1024 * 4允许发送消息最大大小 默认是4m
retryTimesWhenSendFailed2发送失败的时候重试次数 默认是2次
retryTimesWhenSendAsyncFailed2异步发送失败的时候重试次数 默认是2次
retryAnotherBrokerWhenNotStoreOKfalse指示是否在内部发送失败时重试另一个broker

DefaultMQProducer这个类还继承了一个ClientConfig ,这ClientConfig 不用看就知道是个客户端配置类,我们看看它里面重要字段的解释:

字段默认值解释
namesrvAddr默认去jvm启动参数 rocketmq.namesrv.addr ,系统环境变量 NAMESRV_ADDR 中找nameserv的地址,这个最好是自己设置进去,你在DefaultMQProducer 类set 的就是赋值给了它
clientIP自己去找本客户端地址,他自己就会去找的
instanceName默认是去jvm启动参数rocketmq.client.name 中找,没有设置DEFAULT,这个它会自己重新设置的实例名称
clientCallbackExecutorThreadscpu核心数执行callback 线程池的线程核心数
pollNameServerInterval1000 * 30多久去nameserv 获取topic 信息,默认是30s
heartbeatBrokerInterval1000 * 30与broker心跳间隔时间,默认是30s,就是每隔30向broker发送心跳

DefaultMQProducer还实现一个MQProducer 接口,不用看,这个MQProducer 接口就是一堆send方法与start,shutdown方法,然后让DefaultMQProducer去实现。DefaultMQProducer构造方法代码如下:

public DefaultMQProducer(final String producerGroup) {
    // 继续调用
    this(null, producerGroup, null);
}


/**
 * 最终调用的构造方法
 */
public DefaultMQProducer(final String namespace, 
        final String producerGroup, RPCHook rpcHook) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

这个方法就是简单地赋了值,然后创建了DefaultMQProducerImpl实例,我们继续看DefaultMQProducerImpl的构造方法:

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;

    // 5w大小的链表队列,异步发送线程池队列
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
    // 默认的异步发送线程池
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),// 核心线程池数,cpu核数
        Runtime.getRuntime().availableProcessors(), // 最大线程池数,cpu核数
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.asyncSenderThreadPoolQueue,
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
}

这个构造方法依然还是处理赋值操作,并没做什么实质性内容,就不继续深究了。

2. DefaultMQProducer#start:启动producer

接着我们来看看producer的启动流程,进入DefaultMQProducer#start方法:

public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    // 调用 defaultMQProducerImpl 的 start() 方法
    this.defaultMQProducerImpl.start();
    // 消息轨迹相关,我们不关注
    if (null != traceDispatcher) {
        ...
    }
}

这个方法先是调用了defaultMQProducerImpl#start方法,然后处理消息轨迹相关操作,关于rocketMq消息轨迹相关内容,本文就不过多探讨了,我们将目光聚集于DefaultMQProducerImpl#start(boolean)方法:

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        // 刚创建还没有启动
        case CREATE_JUST:
            // 设置为启动失败状态
            this.serviceState = ServiceState.START_FAILED;

            // 检查group配置
            this.checkConfig();

            // 只要group组 不是CLIENT_INNER_PRODUCER, 就重新设置下实例名称
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                // 改变生产者的instanceName 为进程id
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            // todo 创建MQClientInstance 实例(封装了网络处理API,消息生产者、消费者和Namesrv、broker打交道的网络通道)
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            // todo 进行注册
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                // 没有注册成功,设置状态为创建没有启动,然后抛出之前已经注册的异常
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            // topic --> topic info
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            // 是否启动 client实例,默认是true
            if (startFactory) {
                // todo 核心
                mQClientFactory.start();
            }

            // 启动生产者成功
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            // 设置状态running
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    // todo 发送心跳到所有broker
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

    // 定时扫描异步请求的返回结果
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

这个方法并不复杂相关内容都已经作了注释,这里重点提出3个方法:

  1. mQClientFactory.start():执行方法为MQClientInstance#start,这个方法里会启动一些组件,我们稍后会分析。

  2. mQClientFactory.sendHeartbeatToAllBrokerWithLock():发送心跳到所有的broker,最终执行的方法为MQClientAPIImpl#sendHearbeat

    public int sendHearbeat(
        final String addr,
        final HeartbeatData heartbeatData,
        final long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException {
        // request 的 code 为 HEART_BEAT
        RemotingCommand request = RemotingCommand
            .createRequestCommand(RequestCode.HEART_BEAT, null);
        request.setLanguage(clientConfig.getLanguage());
        request.setBody(heartbeatData.encode());
        // 异步调用
        RemotingCommand response = this.remotingClient
            .invokeSync(addr, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return response.getVersion();
            }
            default:
                break;
        }
    
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }
    

    这里是与broker通信,requestcodeHEART_BEAT,后面的分析中我们会看到,producer也会同nameServer通信。

  3. 定时扫描异步请求的返回结果:最终执行的方法为RequestFutureTable.scanExpiredRequest(),关于该方法的内容,我们在分析producer发送异步消息时再分析。

2.1 MQClientInstance#start:启动MQClientInstance

接下来我们来看看MQClientInstance的启动,方法为MQClientInstance#start,代码如下:

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                // 先设置 失败
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                // 判断namesrv 是否为null
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // todo Start request-response channel
                // 启动远程服务,这个方法只是装配了netty客户端相关配置
                // todo 注意:1. 这里是netty客户端,2. 这里并没有创建连接
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                // todo 开启任务调度
                this.startScheduledTask();
                // Start pull service
                // todo 开启 拉取服务
                this.pullMessageService.start();
                // Start rebalance service
                // todo 开启平衡服务
                this.rebalanceService.start();
                // Start push service
                // todo 启用内部的 producer
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                // 设置状态为running
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

这个方法进行的操作在注释中已经说明得很清楚了,接下来我们对以上的部分操作做进一步分析。

2.1.1 mQClientAPIImpl.start():配置netty客户端

在看MQClientAPIImpl 的启动方法之前,我们需要看下它的构造:

public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
    final ClientRemotingProcessor clientRemotingProcessor,
    RPCHook rpcHook, final ClientConfig clientConfig) {
    // 客户端配置
    this.clientConfig = clientConfig;
    topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
    // todo netty client
    this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
    this.clientRemotingProcessor = clientRemotingProcessor;

    // 注册rpc hook
    this.remotingClient.registerRPCHook(rpcHook);
    // 注册 processor CHECK_TRANSACTION_STATE 检查事务状态
    this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);

    // 通知消费者id已更改
    this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);

    // 重置消费者客户端偏移量
    this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);

    // 从客户端拉取消费者状态
    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);

    // 获取消费者运行状态
    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);

    // 消费信息
    this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}

clientConfig这个是关于client配置的一些信息,然后往后创建了一个NettyRemotingClient对象,这个就是netty client,封装了一些同步异步的发送,接着clientRemotingProcessor,这个就是netty收到消息时候处理类,然后下面这一堆注册processor这个就不看了,其实就是当收到这些code的消息时候让哪个processor来处理。

先来看下clientConfig 有哪些配置字段需要我们关注:

字段默认值解释调优
clientWorkerThreads4这个其实就是处理netty 那堆自定义handler的线程
clientCallbackExecutorThreadscpu核心线程数callback线程数,这个用来执行你注册的那些processor
clientOnewaySemaphoreValue默认是65535当发送单向消息的时候,信号量限流
clientAsyncSemaphoreValue65535当发送异步消息的时候,信号量限流
connectTimeoutMillis3000连接超时时间没啥现实意义
clientSocketSndBufSize65535netty 客户端 发送buffer
clientSocketRcvBufSize65535netty 客户端 接收buffer
clientChannelMaxIdleTimeSeconds120s这个就是netty channel 最大空闲时间也能调优

看下NettyRemotingClient 的构造方法:

public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
    final ChannelEventListener channelEventListener) {
    // 设置 异步与单向请求的信号量
    super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
    // netty配置文件
    this.nettyClientConfig = nettyClientConfig;
    // listener
    this.channelEventListener = channelEventListener;

    // 这个是netty 客户端回调线程数,默认是cpu核数
    int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
    // 如果小于等于0 默认为4
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }

    // 设置线程池
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    // selector 线程为1
    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });

    // 是否启用tls
    if (nettyClientConfig.isUseTLS()) {
        try {
            sslContext = TlsHelper.buildSslContext(true);
            log.info("SSL enabled for client");
        } catch (IOException e) {
            log.error("Failed to create SSLContext", e);
        } catch (CertificateException e) {
            log.error("Failed to create SSLContext", e);
            throw new RuntimeException("Failed to create SSLContext", e);
        }
    }
}

首先是调用父类的构造,创建2个信号量Semaphore,这两个分别用来对单向发送,异步发送进行限流,默认是65535。之后就是创建public线程池,这个线程池主要是用来处理你注册那堆processor,默认线程数是cpu核心数,再往后就是创建netty niogroup组就1个线程,这个主要是用来处理连接的,最后就是判断是否使用ssl,使用的话创建ssl context。

再来看NettyRemotingClient#start方法,代码如下:

@Override
public void start() {
    // 默认 4个线程
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true) // 不使用tcp中的DELAY算法,就是有小包也要发送出去
        .option(ChannelOption.SO_KEEPALIVE, false) // keeplive false
            // 链接超时 默认3s
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            //发送buffer 默认是65535
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            // 接收buffer 默认是65535
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // 是否启用tls
                if (nettyClientConfig.isUseTLS()) {
                    if (null != sslContext) {
                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        log.info("Prepend SSL handler");
                    } else {
                        log.warn("Connections are insecure as SSLContext is null!");
                    }
                }
                pipeline.addLast(
                        // 使用这个线程组处理下面这写processor 默认是4个线程
                    defaultEventExecutorGroup,
                    new NettyEncoder(), // 编码
                    new NettyDecoder(), // 解码
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(), // 连接管理
                    new NettyClientHandler());  // todo netty client handler 处理接收到的消息
            }
        });

    // todo 扫描消息获取结果,每秒执行1次
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                // todo 扫描响应表
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}

对于这个方法,说明有两个点:

  1. 方法里使用的是Bootstrap而非ServerBootstrap,表示这是netty客户端
  2. 整个方法中并没有创建连接

2.1.2 startScheduledTask():启动定时任务

启动定时任务的方法为MQClientInstance#startScheduledTask,代码如下:

private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        // todo 获取namesrv地址, 2分钟执行一次
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // todo 从namesrv上面更新topic的路由信息,默认30s
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // todo 清除下线broker
                MQClientInstance.this.cleanOfflineBroker();
                // todo 发送心跳到所有broker上面
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // todo 持久化consumer offset 可以放在本地文件,也可以推送到 broker
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // todo 调整线程池的线程数量,并没有用上
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}

这里共有5个定时任务:

  1. 定时获取 nameServer 的地址,MQClientInstance#start一开始会调用MQClientAPIImpl#fetchNameServerAddr获取nameServer,这里也调用了这个方法
  2. 定时更新topic的路由信息,这里会去nameServer获取路由信息,之后再分析
  3. 定时发送心跳信息到nameServer,在DefaultMQProducerImpl#start(boolean)中,我们也提到了向nameServer发送心跳信息,两处调用的是同一个方法
  4. 持久化消费者的消费偏移量,这个仅对消费者consumer有效,后面分析消费者时再作分析
  5. 调整线程池的线程数量,不过追踪到最后,发现这个并没有生效,就不多说了

定时更新topic的路由信息

这个就是它的定时任务,默认是30s执行一次的。我们看下updateTopicRouteInfoFromNameServer():

public void updateTopicRouteInfoFromNameServer() {
    // 从消费者端与生产者端 获取topic集合
    Set<String> topicList = new HashSet<String>();

    // Consumer
    {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                Set<SubscriptionData> subList = impl.subscriptions();
                if (subList != null) {
                    for (SubscriptionData subData : subList) {
                        topicList.add(subData.getTopic());
                    }
                }
            }
        }
    }

    // Producer
    {
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                Set<String> lst = impl.getPublishTopicList();
                topicList.addAll(lst);
            }
        }
    }

    // 遍历topic集合,进行更新路由信息
    for (String topic : topicList) {
        // todo
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}

它是将consumerproducer 所有topic 放到一个topic集合中,然后遍历这个集合,一个一个topic请求更新。

接下来我们来看下这个updateTopicRouteInfoFromNameServer 方法,这方法有点长,我们一部分一部分的介绍,大体上分为2个部分吧,第一个部分是获取对应topic的信息,然后第二部分就是更新本地的topic table 缓存

获取部分代码:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                // 默认 并且 defaultMQProducer不为空,当topic不存在的时候会进入这个if中
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    // todo 获取topicRoute信息
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                ...
}

if是true这段是你没有topic 然后需要创建topic 的时候干的事,它会向nameserv获取topicTBW102的值,然后获取它对应的那个topicRouteData

fasle的时候进入else里面,这个就是拿着topic去nameserv那获取对应的topicRouteData。 这里需要解释下这个TopicRouteData,可以理解为里面存了两部分内容,一是broker 地址信息,二是messagequeue 对应哪个broker上面。稍微看下:

/**
 * 包含两部分内容,一是broker 地址信息,二是messagequeue 对应哪个broker上面
 */
public class TopicRouteData extends RemotingSerializable {
    // 顺序消息配置内容,来自kvConfig
    private String orderTopicConf;
    // topic队列元数据
    private List<QueueData> queueDatas;
    // topic分布的broker元数据
    private List<BrokerData> brokerDatas;
    // broker上过滤服务器的地址列表
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

接着我们追踪下从nameserv获取topic信息的代码

/**
 * 从namesrv获取topic路由信息
 * @param topic topic名称
 * @param timeoutMillis 超时时间,默认3s
 * @param allowTopicNotExist 允许这个topic不存在
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    // 获取路由信息的请求头
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);

    // 创建RemotingCommand 创建请求实体
    // 发送请求的 code 为 GET_ROUTEINFO_BY_TOPIC
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);

    // todo 进行同步调用,获取结果
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        // topic不存在
        case ResponseCode.TOPIC_NOT_EXIST: {
            // 允许topic不存在
            if (allowTopicNotExist) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }

            break;
        }
        // 成功
        case ResponseCode.SUCCESS: {
            // 获取内容
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }

    // 抛异常
    throw new MQClientException(response.getCode(), response.getRemark());
}

我们可以看到,先是封装了一个获取路由的请求头,然后将topic 设置到请求头里面,然后又将请求头封装成RemotingCommand,其实不管是requestHeader还是RemotingCommand,其实都是存储信息的一些实体,只是代表的含义不一样而已,你可以把RemotingCommand 看作是http协议的格式,想想http有请求行,请求头,请求体,然后他这个requestHeader算是个请求头,然后RemotingCommand里面还有body属性可以看作是请求体,需要注意的是,它设置了一个RequestCodeGET_ROUTEINTO_BY_TOPIC,其实nameserv就是根据这个请求code判断出来你要干什么,只有你把GET_ROUTEINTO_BY_TOPIC 告诉nameserv的时候,nameserv才能知道你要从我这里获取这个topic的路由信息。

这里发送向NameServer发送消息的codeGET_ROUTEINFO_BY_TOPIC,这点在前面分析nameServer的消息处理时也分析过了,并且还分析了当消息送达nameServer后,nameServer是如何返回topic数据的,遗忘的小伙伴可以看下之前分析nameServer的文章。RocketMQ源码3-NameServer 消息处理 第3节

接着就是调用remotingClient发送消息了,它这里用的是同步发送的方式(这里暂时先不说了,其实就是使用netty client 发送消息, 后面有提到),也就是阻塞等着响应过来,超时时间默认是3s,再往下看如果这个响应code是success的话,就把body弄出来然后反序列化话成 TopicRouteData对象。

其实这里有个问题,就是我们有多个nameserv ,比如说我们有3台nameserv ,那么生产者是找谁获取的呢?其实它是这个样子的,你上次用的那个nameserv要是还ok的话,也就是连接没断的话,它会继续使用你上次用的个,如果你是第一次发起这种请求,没有与nameserv创建过连接或者是上次创建的那个连接不好使了,这个时候就会有个计数器,轮询的使用 ,也就是计数器值+1%namserv地址个数的形式,如果不理解的同学可以找个轮询算法看下,其实都是使用计数器+1 % 列表的个数,这样能够选出来一个列表的位置来,再根据这个位置去列表中获取一下具体的值,好了这个轮询算法我们先解释这么多 ,如果能够正常创建连接,直接使用这个连接了就,如果不能使用,也就是创建连接失败,访问不通等等,这时候继续循环使用这个轮询算法获取下一个地址,然后创建连接,如果能够创建成功,返回对应的channel就可以了,然后client可以往这个channel上发送请求了,这个channel的话可以看作两端的通道,管道都可以。如果不成功继续循环,他这里循环次数是你配置nameserv地址的个数。

@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    // 开始时间
    long beginStartTime = System.currentTimeMillis();
    // todo 轮询获取namesrv地址Channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            // 执行开始之前的rpchook
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            // 判断超时 之前有获取链接的操作,可能会出现超时的情况
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            // todo 进行同步执行,获取响应
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            // 执行之后的rpchook
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
            // 远程发送请求异常
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            // 关闭channel
            this.closeChannel(addr, channel);
            throw e;
            // 超时异常
        } catch (RemotingTimeoutException e) {
            // 如果超时 就关闭cahnnel话,就关闭channel 默认是不关闭的
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

好了,这里我们就把获取路由信息的部分看完了,接着就是解析这个TopicRouteData 然后放到生产者本地缓存起来了。

更新本地路由缓存

这块内容也是比较长,我们一段一段看下:

if (topicRouteData != null) {
    // 之前的
    TopicRouteData old = this.topicRouteTable.get(topic);
    // 对比,看是否发生变化
    boolean changed = topicRouteDataIsChange(old, topicRouteData);
    // 没有发生变化
    if (!changed) {
        // 调用isNeedUpdateTopicRouteInfo 再次判断是否需要更新
        changed = this.isNeedUpdateTopicRouteInfo(topic);
    } else {
        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
    }

他这里先是从 topic table中获取对应topic 的老数据,然后 拿老的 与新请求的进行对比,判断一下有没有变动,如果没有变动的话,就调用isNeedUpdateTopicRouteInfo 方法再判断一下需要更新,这个方法其实就是遍历所有的producer 或者是consumer,然后看看他们的topic table里面是不是都有这个topic 没有的话就需要更新下。

if (changed) {
    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

    // 更新broker地址
    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
        this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
    }

    // Update Pub info
    // 更新推送消息/就是更新生产者topic信息
    {
        // todo 将topic 转成topic publish
        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
        publishInfo.setHaveTopicRouterInfo(true);
        // 调用所有的producer 更新对应的topic info
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                // 更新 topic publishinfo
                impl.updateTopicPublishInfo(topic, publishInfo);
            }
        }
    }

    // Update sub info
    // 更新订阅信息 更新消费者topic信息
    {
        Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                impl.updateTopicSubscribeInfo(topic, subscribeInfo);
            }
        }
    }
    log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
    // 添加到route表中
    this.topicRouteTable.put(topic, cloneTopicRouteData);
    return true;
}

这里首先是更新了一下brokerAddrTable这个map ,这map里面然后就是缓存着broker name-> broker地址的集合

接着就是将 topicRouteData转成topicPublishInfo ,然后 haveTopicRouterInfo设置成true,就是说明它这个topicPublishInfo 里面存着对应的topicRouteData信息。

// 将topic 路由信息转成topic publish 信息 提供给消息发送者发送消息使用
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
    TopicPublishInfo info = new TopicPublishInfo();
    info.setTopicRouteData(route);
    if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
        String[] brokers = route.getOrderTopicConf().split(";");
        for (String broker : brokers) {
            String[] item = broker.split(":");
            int nums = Integer.parseInt(item[1]);
            for (int i = 0; i < nums; i++) {
                MessageQueue mq = new MessageQueue(topic, item[0], i);
                info.getMessageQueueList().add(mq);
            }
        }

        info.setOrderTopic(true);
    } else {
        List<QueueData> qds = route.getQueueDatas();
        Collections.sort(qds);
        for (QueueData qd : qds) {
            if (PermName.isWriteable(qd.getPerm())) {
                BrokerData brokerData = null;
                for (BrokerData bd : route.getBrokerDatas()) {
                    if (bd.getBrokerName().equals(qd.getBrokerName())) {
                        brokerData = bd;
                        break;
                    }
                }

                if (null == brokerData) {
                    continue;
                }

                // 判断有没有master
                if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                    continue;
                }
                // 创建对应的 messageQueue
                for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    info.getMessageQueueList().add(mq);
                }
            }
        }

        info.setOrderTopic(false);
    }

    return info;
}

这里就是将返回的topicRouteData 转成对应的topicPublishInfo,这个topicPublishInfo其实里面就是MessageQueue,比如说我topicRouteData 里面返回2个broker ,然后每个broker的writeQueueNums 个数是4个,这个时候它生成的MessageQueue就是8个,然后每个broker对应着4个MessageQueue

接着就是遍历更新各个producertopicPublishInfoTable 对应topic信息。 好了,到这我们的更新topic信息的解析就结束了。

2.2 创建topic

有这么一个场景: 我发送某个消息的时候指定的那个topic不存在(就是之前没有创建过)消息生产者是怎样处理的,默认的话如果topic不存在的话,消息生产者会先去nameserv拉下topic信息(从nameserv获取topic信息流程),要是还不存在的话,

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    /**
     * 第一次发送消息时,本地没有缓存topic的路由信息,查询
     * NameServer尝试获取路由信息,如果路由信息未找到,再次尝试用默
     * 认主题DefaultMQProducerImpl#createTopicKey去查询
     */
    /**
     * 生产环境,不建议开启自动创建主题
     * 原因如:https://mp.weixin.qq.com/s/GbSlS3hi8IE0kznTynV4ZQ
     */
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 首先,使用topic 从NameServer尝试获取路由信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // isDefault 为true 其次,使用默认的topic从NameServer尝试获取路由信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

这里就走else的代码了,其实还是调用的updateTopicRouteInfoFromNameServer重载方法,这里这个isDefaulttrue了。这个时候就获取一下默认topic的路由信息,这个默认topic是TBW102,发送消息就选择TBW102这个topic的broker发送,broker收到消息后会自动创建这个topic,这里需要注意的是broker得支持自动创建,这里是有个参数配置的autoCreateTopicEnable 设置成true就可以了。

topic我们一般是不会在producer中自动创建的,一般使用RocketMQ的可视化控制台,然后创建topic,指定对应的queue num,指定broker等等,类似下面这个东西 RocketMQ源码4-producer 启动流程(获取topic路由信息)

参考文章

RocketMQ4.8注释github地址 RockeMQ源码分析 RocketMQ源码专栏