likes
comments
collection
share

RocketMQ源码3-NameServer 消息处理

作者站长头像
站长
· 阅读数 15

欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

1. 处理业务请求的ChannelHandler:serverHandler

NamesrvController启动后,就可以处理Broker/Producer/Consumer的请求消息了,处理该类型消息的ChannelHandlerserverHandler,也就是NettyRemotingServer.NettyServerHandlerNettyServerHandlerNettyRemotingServer的内部类),代码如下:

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) 
            throws Exception {
        processMessageReceived(ctx, msg);
    }
}

继续跟进NettyRemotingAbstract#processMessageReceived方法:

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            // 请求消息
            case REQUEST_COMMAND:
                // todo
                processRequestCommand(ctx, cmd);
                break;
            // 响应消息
            case RESPONSE_COMMAND:
                // todo
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

这里我们直接跟进REQUEST_COMMAND命令的处理方法NettyRemotingAbstract#processRequestCommand

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    // 根据 code 从 processorTable 获取 Pair
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    // 找不到默认值
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    final RemotingResponseCallback callback = new RemotingResponseCallback() {
                        @Override
                        public void callback(RemotingCommand response) {
                            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                            // 不是 单向
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    try {
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                } else {
                                }
                            }
                        }
                    };
                    // 异步netty请求处理器
                    if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                        AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                        processor.asyncProcessRequest(ctx, cmd, callback);
                    } else {
                        // 不是 异步请求处理器 从 pair 中拿到 Processor 进行处理
                        NettyRequestProcessor processor = pair.getObject1();
                        // todo 处理请求
                        RemotingCommand response = processor.processRequest(ctx, cmd);
                        callback.callback(response);
                    }
                } 
                ...
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
           ...
        }
    } else {
        ...
    }
}

这个方法主要流程为,先获取Pair对象,然后将处理操作封装为Runnable对象,接着把Runnable对象提交到线程池中。

这个Pair对象是啥呢?还记得我们在NamesrvController#initialize方法中创建的remotingExecutor吗,它最终注册到为NettyRemotingServerdefaultRequestProcessor属性:

@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
    this.defaultRequestProcessor 
            = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}

这里获取的Pair对象正是defaultRequestProcessorpair.getObject2()得到的线程池正是remotingExecutorpair.getObject1()得到的processorDefaultRequestProcessor.

这里我们就明白了,remotingExecutor线程池就是用来处理远程请求的。

远程命令的处理逻辑在Runnable#run方法中:

public void run() {
    try {
        doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
        ...
        // 处理请求
        if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
            AsyncNettyRequestProcessor processor = 
                    (AsyncNettyRequestProcessor)pair.getObject1();
            processor.asyncProcessRequest(ctx, cmd, callback);
        } else {
            NettyRequestProcessor processor = pair.getObject1();
            // 处理请求
            RemotingCommand response = processor.processRequest(ctx, cmd);
            callback.callback(response);
        }
    } catch (Throwable e) {
        ...
    }
}

代码中区分了同步与异步请求两种方式,实际上最终都会进入到DefaultRequestProcessor#processRequest方法中:

public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {

    ...

    switch (request.getCode()) {
        ..
        // 操作kv 配置信息
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        // todo 查询broker版本信息
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
            // todo 注册broker
        case RequestCode.REGISTER_BROKER:
            // 获取版本
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            // 做版本兼容
            // 大于等于V3_0_11
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                // todo
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
            // todo 注销broker
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);

        // todo 根据topic获取路由信息
        case RequestCode.GET_ROUTEINFO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        // 省略其他消息的处理    
        ...
        default:
            break;
    }
    return null;
}

这个方法就是用来处理网络请求的,处理的请求消息会比较多,这里我们仅关注以下类型的消息:

  • 获取broker版本信息:请求codeQUERY_DATA_VERSION,用来查询broker的版本信息
  • broker注册:请求codeREGISTER_BROKERbroker启动时,会将自己的信息注册到nameServer
  • broker注销:请求codeUNREGISTER_BROKERbroker停止前,会发消息告诉nameServer自己将要关闭
  • 获取topic路由信息:根据 topic 获取路由信息,其实就是topic对应的brokermessageQueue信息

接下来我们就分析来介绍下这几种消息的实现。

2. broker的注册与注销消息

broker启动时,会向NamerServer发送注册消息;在broker关闭前,会向NameServer发送关闭消息。

我们先来看注册消息,处理broker注册消息的方法为DefaultRequestProcessor#registerBrokerWithFilterServer,我们直接看重要代码:

public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, 
        RemotingCommand request) throws RemotingCommandException {
    ...

    // 处理注册
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        registerBrokerBody.getTopicConfigSerializeWrapper(),
        registerBrokerBody.getFilterServerList(),
        ctx.channel());
    ...
}

这里我们去除了不必要的代码,仅保留了注册的方法,这里调用的是RouteInfoManager#registerBroker方法,在分析这个方法前,我们先来了解下RouteInfoManager的基本信息,它的几个重要成员变量如下:

public class RouteInfoManager {
    ...
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // todo topic -> queuedata  topic消息队列的路由信息,消息发送时根据路由表进行负载均衡。
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // todo broker名称 -> broker信息  Broker基础信息,包含brokerName、所属集群名称、主备Broker地址
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // todo 集群名字 -> broker名称 Broker集群信息,存储集群中所有Broker的名称
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    // todo broker地址-> broker活跃信息  Broker状态信息,NameServer每次收到心跳包时会替换该信息。
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // todo broker地址 -> 一堆过滤器 Broker上的FilterServer列表,用于类模式消息过滤。
    //  类模式过滤机制在4.4及以后版本被废弃
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

前面提到过NameServer是一个非常简单的Topic路由注册中心,这个HashMap就是NameServer实现注册中心的关键!

  1. topicQueueTable:存放保存topicQueue的关系,value类型为List,表明一个topic可以有多个queueQueueData的成员变量如下:

    public class QueueData implements Comparable<QueueData> {
     // 所在的 borker 的名称   
     private String brokerName;
     // 读写数
     private int readQueueNums;
     private int writeQueueNums;
     private int perm;
     private int topicSynFlag;
     ...
    }
    
  2. brokerAddrTable:记录broker的具体信息,keybroker名称,valuebroker具体信息,BrokerData的成员变量如下:

    public class BrokerData implements Comparable<BrokerData> {
     // 所在集群的名称   
     private String cluster;
     // broker名称
     private String brokerName;
     // borkerId对应的服务器地址,一个brokerName可以有多个broker服务器
     private HashMap<Long, String> brokerAddrs;
    }
    
  3. clusterAddrTable:集群信息,保存集群名称对应的brokerName

  4. brokerLiveTable:存活的broker信息,keybroker地址,value为具体的broker服务器,BrokerLiveInfo的成员变量如下:

    class BrokerLiveInfo {
     // 上一次心跳更新时间
     private long lastUpdateTimestamp;
     private DataVersion dataVersion;
     // 表示网络连接的channel,由netty提供
     private Channel channel;
     // 高可用的服务地址
     private String haServerAddr;
     ...
    }
    

了解完成这些后,再回过头来看RouteInfoManager#registerBroker方法,我们就会发现所谓的注册就是往以上几个HashMapput数据的操作:

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // 第一步 路由注册加写锁
            this.lock.writeLock().lockInterruptibly();

            // 判断broker所属集群是否存在
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            // 第二步,维护brokerData信息
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }

            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            // 第三步 创建或更新topic路由元数据
            // broker是主节点
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                // topic配置信息发生变化  或 初次注册
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            // todo 创建或更新topic路由元数据,并填充topicQueueTable
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // 第四步 更新brokerLiveTable
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            // 第5,注册broker的过滤器Server地址列表
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            // 释放锁
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}

这样一来,这个方法所做的工作就一目了然了,就是把broker上报的信息包装下,然后放到这几个hashMap中。

了解完成注册操作后,注销操作就不难理解了,它是跟注册相反的操作,所做的事就是从这几个hashMap中移除broker对应的信息,处理方法为RouteInfoManager#unregisterBroker,代码中确实是进行hashMap移除的相关操作,这里就不分析了。

3. 根据topic获取路由信息

producerconsumer启动时,都需要根据topicNameServer获取对应的路由信息,处理消息的方法为DefaultRequestProcessor#getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    // 创建response
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // 获取请求头
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    // todo 获取某个topic的路由信息
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    // topicRouteData不为null
    if (topicRouteData != null) {
        // 是否支持顺序消费 默认false
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        // 组装响应并返回
        // 序列化json
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    // 如果不存在 返回topic不存在code
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

这个方法里调用了RouteInfoManager#pickupTopicRouteData方法,又是RouteInfoManager,这里我们就可以想象到,获取topic路由信息的操作,大致又是操作RouteInfoManager中的几个HashMap了:

public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        try {
            // 读锁
            this.lock.readLock().lockInterruptibly();
            // 获取topic 获取queuedata 列表
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            // 如果不为null
            if (queueDataList != null) {
                topicRouteData.setQueueDatas(queueDataList);
                // 设置foundQueueData
                foundQueueData = true;

                // 获取queuedata 对应broker name
                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }

                // 再根据broker name 取broker地址列表中 获取broker信息
                for (String brokerName : brokerNameSet) {
                    // 根据broker 名字 获取broker信息
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
                            .getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        // 设置foundBrokerData
                        foundBrokerData = true;
                        // 处理filter 就是根据broker地址获取对应filter集合 也就是某个broker可能对应一堆filter集合
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}

从代码中来看,操作的是topicQueueTable,根据topictopicQueueTable中获取对应的queue数据后,剩下的无非就是对数据进行包装、过滤等,构造返回值,这里就不细讲了。

4. 查询broker版本信息

broker注册到nameServer前,会先发一个codeQUERY_DATA_VERSION的消息,判断版本号是否有变化再决定是否进行注册,处理该消息的代码为:

public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {

    ...

    switch (request.getCode()) {
        ...
        // 查询dataVersion
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        ...
    
    ...
}

进入DefaultRequestProcessor#queryBrokerTopicConfig方法,代码如下:

public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
    final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
    final QueryDataVersionRequestHeader requestHeader =
        (QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);
    DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class);

    // todo 关键代码:判断版本是否发生变化
    Boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);
    if (!changed) {
        // 如果没改变,就更新最后一次的上报时间为当前时间
        this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr());
    }

    DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);

    // 返回 nameServer当前的版本号
    if (nameSeverDataVersion != null) {
        response.setBody(nameSeverDataVersion.encode());
    }
    responseHeader.setChanged(changed);
    return response;
}

这个方法主要包含3个操作:

4.1 判断broker版本是否发生变化

这部分的判断,就是判断上报的版本号与NameServer保存的版本号是否一致:

public boolean isBrokerTopicConfigChanged(final String brokerAddr, 
        final DataVersion dataVersion) {
    // 继续查询
    DataVersion prev = queryBrokerTopicConfig(brokerAddr);
    return null == prev || !prev.equals(dataVersion);
}

这个方法就是判断逻辑了,只是一个简单的equals操作,继续看DataVersion的查询:

public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
    if (prev != null) {
        return prev.getDataVersion();
    }
    return null;
}

又是对RouteInfoManager那几个hashMap的操作,最终是从brokerLiveTable获取到了NameServer保存的版本号。

4.2 版本没有发生变化时的操作

如果版本没有发生变化,就更新当前时间为最新上报时间,这个流程没法啥好说的,直接上代码:

public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
    if (prev != null) {
        prev.setLastUpdateTimestamp(System.currentTimeMillis());
    }
}

又是对RouteInfoManager那几个hashMap的操作,这里需要注意的是,当DataVersion没有发生变化,才会更新BrokerLiveInfo#lastUpdateTimestamp成员变量的值为当前时间。

那么当DataVersion发生变化时,就不会更新BrokerLiveInfo#lastUpdateTimestamp的值了吗?并不是,如果DataVersion发生了变化,就表明broker需要再次注册,BrokerLiveInfo#lastUpdateTimestamp会在注册请求里被改变了。

4.3 查询当前版本号

查询当前版本号,使用的方法是RouteInfoManager#queryBrokerTopicConfig,在上面的1. 判断broker版本是否发生变化中就用过了,这里就不再赘述了。

5. 定时任务:检测broker是否存活

在前面分析NamesrvController#initialize时,我们提到该方法启动了一个定时任务:

// 开启定时任务,每隔10s扫描一次broker,移除不活跃的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);

它调用的方法是RouteInfoManager#scanNotActiveBroker,代码如下:

public void scanNotActiveBroker() {
    // brokerLiveTable:存放活跃的broker,就是找出其中不活跃的,然后移除,操作的是 brokerLiveTable
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        // 上一次的心跳时间
        long last = next.getValue().getLastUpdateTimestamp();
        // 根据心跳时间判断是否存活,超时时间为2min
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            // 移除
            it.remove();
            // 处理channel的关闭,这个方法里会处理其他 hashMap 的移除
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

这个方法先是遍历brokerLiveTable,然后判断每个BrokerLiveInfo的最近一次的上报时间,判断是否超时,如果最近的上报时间距离当前超过了2分钟,说明该broker可能挂了,就将它从brokerLiveTable移除,然后调用RouteInfoManager#onChannelDestroy方法,移除其他hashMapbroker.

6. 总结

本文分析了NameServer对请求消息的处理,nameServer底层使用netty进行通讯,处理brokerproducerconsumer请求消息的ChannelHandlerNettyServerHandler,最终的处理方法为DefaultRequestProcessor#processRequest,这个方法会处理众多的请求,我们重点分析了注册/注销broker消息获取topic路由消息获取broker版本信息的处理流程。

注册/注销broker消息获取topic路由消息获取broker版本信息最终都是在RouteInfoManager类中处理,这个类中有几个非常重要的、类型为HashMap的成员变量如下:

  1. topicQueueTable:存放保存topicQueue的关系,value类型为List,表明一个topic可以有多个queue
  2. brokerAddrTable:记录broker的具体信息,keybroker名称,valuebroker具体信息
  3. clusterAddrTable:集群信息,保存集群名称对应的brokerName
  4. brokerLiveTable:存活的broker信息,keybroker地址,value为具体的broker服务器

这个几成员变量就是NameServer被称为注册中心的原因所在,所谓的注册/注销broker,就是往这几个hashMapputremove相关的broker信息;获取topic路由消息就是从topicQueueTable中获取broker/messageQueue等信息。

nameServer所谓的"注册"、“发现”、“心跳”等,都是对RouteInfoManager这几个hashMap成员变量进行操作的。

好了,本文就到这里了,下篇开始我们将进入product的分析

参考文章

RocketMQ4.8注释github地址 RockeMQ源码分析 RocketMQ源码专栏