likes
comments
collection
share

RocketMQ的NameServer的架构设计及其启动流程源码分析

作者站长头像
站长
· 阅读数 39

第二篇文章出炉了

RocketMQ系列文章:

一、NameServer的架构设计

Broker启动的时候会向所有的NameServer注册,生产者在发送消息时会先从NameServer中获取Broker消息服务器的地址列表,根据负载均衡算法选取一台Broker消息服务器发送消息。NameServer与每台Broker之间保持着长连接,并且每隔10秒会检查Broker是否存活,如果检测到Broker超过120秒未发送心跳,则从路由注册表中将该Broker移除。

但是路由的变化不会马上通知消息生产者,这是为了降低NameServe的复杂性,所以在RocketMQ中需要消息的发送端提供容错机制来保证消息发送的高可用性,这在后续关于RocketMQ消息发送的章节会介绍。

RocketMQ的NameServer的架构设计及其启动流程源码分析

二、从源码来看NameServer的启动流程

2.1 源码获取

Github上的仓库地址:github.com/apache/rock… ,如果访问速度较慢的去Gitee上获取源码,这里也把Gitee的地址贴上:gitee.com/apache/rock…

这里我使用的RocketMQ版本是4.9.4。

2.2 Window上使用IDEA运行RocketMQ

因为是在学习阶段,所以把RokcetMQ的代码拉到本地,这里我使用IDEA运行调试代码,在学习的时候也可以加上自己的注释。

1、把RocketMQ的代码拉到本地之后,切换分支到自己想要用的分支即可,这里我是切换到release-4.9.4

2、使用IDEA打开rocketmq,等IDEA构建完成之后,目录如下。

RocketMQ的NameServer的架构设计及其启动流程源码分析

3、图里的ROCKETMQ文件夹是我创建,主要有conf文件夹,并且将distribution目录下的conf目录下的内容复制过来。

修改broker.conf的内容:其中一些路径自行修改

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 存储路径E:\java\source\rocketmq\ROCKETMQ
storePathRootDir=E:\\java\\source\\rocketmq\\ROCKETMQ\\store
# CommitLog存储路径
storePathCommitLog=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\commitlog
# 消费队列存储路径
storePathConsumeQueue=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\consumequeue
# 消息索引存储路径
storePathIndex=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\index
# checkpoint文件存储路径
storeCheckpoint=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\checkpoint
# abort文件存储路径
abortFile=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\abort

还有三个logback日志文件配置:logback_broker.xml、logback_namesrv.xml、logback_tools.xml。这里我主要的修改是将${USER_HOME}改成${ROCKETMQ_HOME}。在Window下这些日志会被存放到C盘,这里我通过在启动Broker和NameServer时指定一个环境变量:ROCKETMQ_HOME,也就是我创建的ROCKETMQ文件夹,主要是学习调试的时候方便查看日志。

	<appender name="DefaultAppender"
              class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${ROCKETMQ_HOME}/logs/rocketmqlogs/${brokerLogDir}/broker_default.log</file>
        <append>true</append>
        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
            <fileNamePattern>${ROCKETMQ_HOME}/logs/rocketmqlogs/otherdays/${brokerLogDir}/broker_default.%i.log.gz</fileNamePattern>
            <minIndex>1</minIndex>
            <maxIndex>10</maxIndex>
        </rollingPolicy>
        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <maxFileSize>100MB</maxFileSize>
        </triggeringPolicy>
        <encoder>
            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
            <charset class="java.nio.charset.Charset">UTF-8</charset>
        </encoder>
    </appender>

IDEA中启动Broker和NameServer时执行的环境变量如下图所示:

RocketMQ的NameServer的架构设计及其启动流程源码分析

Broker启动时候指定配置文件:

RocketMQ的NameServer的架构设计及其启动流程源码分析

4、当NameServer和Broker启动之后,便可去example文件的quickstart包运行Consumer和Producer测试了。

2.2 启动流程

NameServer的启动类是org.apache.rocketmq.namesrv.NamesrvStartup#main0(String args)。如下:

    public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {

        try {
            // 创建NamesrvController
            NamesrvController controller = createNamesrvController(args);
            // 启动
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

第一步

createNamesrvController方案接收参数解析配置文件,配置NamesrvConfig和NettyServerConfig的属性。

对于配置的输入有两种:1、通过 -c 配置文件的路径。2、使用 --属性名 属性值命令输入,例如:--listenPort 9876

如下:

    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }
	// 创建NamesrvConfig
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        // 创建NettyServerConfig
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        // -c 指定配置文件
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        // ....省略

        return controller;
    }

在创建NamesrvConfig时属性读取:

  • rocketmqHome:RocketMQ的主目录,可以通过启动参数-Drocketmq.home.dir=路径或者设置环境变量ROCKETMQ_HOME来配置RocketMQ的主目录。
  • kvConfigPath:NameServer存储KV配置属性的持久化路径
  • configStorePath:NameServer默认配置文件路径。NameServer启动时如果需要通过配置文件配置NameServer启动属性使用-c指定。
  • orderMessageEnable:是否支持顺序消息,默认不支持。

如下:

public class NamesrvConfig {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    private boolean orderMessageEnable = false;

    // ...setter getter
}

在创建NettyServerConfig时属性读取:

如下:

public class NettyServerConfig implements Cloneable {
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;
    private int serverCallbackExecutorThreads = 0;
    private int serverSelectorThreads = 3;
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;
    private int serverChannelMaxIdleTimeSeconds = 120;

    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
    private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
    private int serverSocketBacklog = NettySystemConfig.socketBacklog;
    private boolean serverPooledByteBufAllocatorEnable = true;

    // ....setter getter
}
  • listenPort:NameServer监听端口,该值默认会被初始化为9876,在createNamesrvController方法是会重新指定端口。
  • serverWorkerThreads:Netty业务线程池线程个数。
  • serverCallbackExecutorThreads:Netty public任务线程池线程个数。Netty网络会根据业务类型创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池,则由public线程池执行。
  • serverSelectorThreads:I/O线程池线程个数,主要是NameServer、Broker端解析请求、返回相应的线程个数。这类线程主要用于处理网络请求,先解析请求包,然后转发到各个业务线程池完成具体的业务操作,最后将结果返回给调用方。
  • serverOnewaySemaphoreValue:send oneway消息请求的并发度(Broker端参数)。
  • serverAsyncSemaphoreValue:异步消息发送的最大并发度(Broker端参数)。
  • serverChannelMaxIdleTimeSeconds:网络连接最大空闲时间,默认为120s。如果连接空闲时间超过该参数设置的值,连接将被关闭。
  • serverSocketSndBufSize:网络socket发送缓存区大小,默认为64KB。
  • servrSocketRcvBufSize:网络socket接收缓存区大小,默认为64KB。
  • serverPooledByteBufAllocatorEnable:ByteBuffer是否开启缓存,建议开启。
  • useEpollNativeSelector:是否启用Epoll I/O模型,Linux环境下建议开启。

这些属性,可以通过启动参数:--属性名 属性值来指定。

第二步

根据启动属性创建NamesrvController实列并初始化,NamesrvController实例时NameServer的核心控制器。org.apache.rocketmq.namesrv.NamesrvStartup#main0调用org.apache.rocketmq.namesrv.NamesrvStartup#start,如下:

public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    // 初始化 NamesrvController
    boolean initResult = controller.initialize();
    // 初始化失败,退出
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }
    // 注册JVM关闭钩子
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
        controller.shutdown();
        return null;
    }));
    // 启动 NamesrvController
    controller.start();
    return controller;
}

关于注册JVM关闭钩子,这是很常见的用法,确保在关闭JVM的时候,先将线程池关闭,释放资源。

这里我们看一下org.apache.rocketmq.namesrv.NamesrvController#initialize方法:

public boolean initialize() {
	// 加载KV配置
        this.kvConfigManager.load();
	// 创建NettyServer
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
		
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();
        // 每10秒扫描brokerLiveTable
        this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
	// 每10分钟打印KV配置
        this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);

        // ... 省略tls相关代码

        return true;
    }

这里会启动两个定时任务:

  • NameServer每隔10秒扫描一次brokerLiveTable,移除处于未激活状态的Broker。org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker
    public int scanNotActiveBroker() {
        int removeCount = 0;
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // 如果收到broker上一次心跳包的时间小于120秒,则移除该broker的信息
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                // 关闭连接
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());

                removeCount++;
            }
        }

        return removeCount;
    }
  • 另外一个定时任务是,每隔10分钟打印一次KV配置。

三、小结

关于NameServer的启动流程先简单的介绍到这里,从源码上了解到了关于NameServer的一些启动参数及其启动流程。在最后的scanNotActiveBroker方法,有一个类RouteInfoManager其功能是NameServer的路由信息管理类,下一篇文章将慢慢地了解其原理,以及在本文所说的路由信息到底又有什么内容。