RocketMQ源码3-NameServer 消息处理
欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
1. 处理业务请求的ChannelHandler
:serverHandler
NamesrvController
启动后,就可以处理Broker/Producer/Consumer
的请求消息了,处理该类型消息的ChannelHandler
为serverHandler
,也就是NettyRemotingServer.NettyServerHandler
(NettyServerHandler
是NettyRemotingServer
的内部类),代码如下:
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg)
throws Exception {
processMessageReceived(ctx, msg);
}
}
继续跟进NettyRemotingAbstract#processMessageReceived
方法:
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
// 请求消息
case REQUEST_COMMAND:
// todo
processRequestCommand(ctx, cmd);
break;
// 响应消息
case RESPONSE_COMMAND:
// todo
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
这里我们直接跟进REQUEST_COMMAND
命令的处理方法NettyRemotingAbstract#processRequestCommand
:
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// 根据 code 从 processorTable 获取 Pair
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
// 找不到默认值
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
// 不是 单向
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
// 异步netty请求处理器
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
// 不是 异步请求处理器 从 pair 中拿到 Processor 进行处理
NettyRequestProcessor processor = pair.getObject1();
// todo 处理请求
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
}
...
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
...
}
} else {
...
}
}
这个方法主要流程为,先获取Pair
对象,然后将处理操作封装为Runnable
对象,接着把Runnable
对象提交到线程池中。
这个Pair
对象是啥呢?还记得我们在NamesrvController#initialize
方法中创建的remotingExecutor
吗,它最终注册到为NettyRemotingServer
的defaultRequestProcessor
属性:
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor
= new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
这里获取的Pair
对象正是defaultRequestProcessor
,pair.getObject2()
得到的线程池正是remotingExecutor
,pair.getObject1()
得到的processor
是DefaultRequestProcessor
.
这里我们就明白了,remotingExecutor
线程池就是用来处理远程请求的。
远程命令的处理逻辑在Runnable#run
方法中:
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
...
// 处理请求
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor =
(AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
// 处理请求
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
...
}
}
代码中区分了同步与异步请求两种方式,实际上最终都会进入到DefaultRequestProcessor#processRequest
方法中:
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
...
switch (request.getCode()) {
..
// 操作kv 配置信息
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
// todo 查询broker版本信息
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
// todo 注册broker
case RequestCode.REGISTER_BROKER:
// 获取版本
Version brokerVersion = MQVersion.value2Version(request.getVersion());
// 做版本兼容
// 大于等于V3_0_11
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
// todo
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
// todo 注销broker
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
// todo 根据topic获取路由信息
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
// 省略其他消息的处理
...
default:
break;
}
return null;
}
这个方法就是用来处理网络请求的,处理的请求消息会比较多,这里我们仅关注以下类型的消息:
- 获取
broker
版本信息:请求code
为QUERY_DATA_VERSION
,用来查询broker
的版本信息 broker
注册:请求code
为REGISTER_BROKER
,broker
启动时,会将自己的信息注册到nameServer
broker
注销:请求code
为UNREGISTER_BROKER
,broker
停止前,会发消息告诉nameServer
自己将要关闭- 获取
topic
路由信息:根据topic
获取路由信息,其实就是topic
对应的broker
、messageQueue
信息
接下来我们就分析来介绍下这几种消息的实现。
2. broker
的注册与注销消息
在broker
启动时,会向NamerServer
发送注册消息;在broker
关闭前,会向NameServer
发送关闭消息。
我们先来看注册消息,处理broker
注册消息的方法为DefaultRequestProcessor#registerBrokerWithFilterServer
,我们直接看重要代码:
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
...
// 处理注册
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
...
}
这里我们去除了不必要的代码,仅保留了注册的方法,这里调用的是RouteInfoManager#registerBroker
方法,在分析这个方法前,我们先来了解下RouteInfoManager
的基本信息,它的几个重要成员变量如下:
public class RouteInfoManager {
...
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// todo topic -> queuedata topic消息队列的路由信息,消息发送时根据路由表进行负载均衡。
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// todo broker名称 -> broker信息 Broker基础信息,包含brokerName、所属集群名称、主备Broker地址
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// todo 集群名字 -> broker名称 Broker集群信息,存储集群中所有Broker的名称
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// todo broker地址-> broker活跃信息 Broker状态信息,NameServer每次收到心跳包时会替换该信息。
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// todo broker地址 -> 一堆过滤器 Broker上的FilterServer列表,用于类模式消息过滤。
// 类模式过滤机制在4.4及以后版本被废弃
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
前面提到过NameServer
是一个非常简单的Topic
路由注册中心,这个HashMap
就是NameServer
实现注册中心的关键!
-
topicQueueTable
:存放保存topic
与Queue
的关系,value
类型为List
,表明一个topic
可以有多个queue
,QueueData
的成员变量如下:public class QueueData implements Comparable<QueueData> { // 所在的 borker 的名称 private String brokerName; // 读写数 private int readQueueNums; private int writeQueueNums; private int perm; private int topicSynFlag; ... }
-
brokerAddrTable
:记录broker
的具体信息,key
为broker
名称,value
为broker
具体信息,BrokerData
的成员变量如下:public class BrokerData implements Comparable<BrokerData> { // 所在集群的名称 private String cluster; // broker名称 private String brokerName; // borkerId对应的服务器地址,一个brokerName可以有多个broker服务器 private HashMap<Long, String> brokerAddrs; }
-
clusterAddrTable
:集群信息,保存集群名称对应的brokerName
-
brokerLiveTable
:存活的broker
信息,key
为broker
地址,value
为具体的broker
服务器,BrokerLiveInfo
的成员变量如下:class BrokerLiveInfo { // 上一次心跳更新时间 private long lastUpdateTimestamp; private DataVersion dataVersion; // 表示网络连接的channel,由netty提供 private Channel channel; // 高可用的服务地址 private String haServerAddr; ... }
了解完成这些后,再回过头来看RouteInfoManager#registerBroker
方法,我们就会发现所谓的注册就是往以上几个HashMap
中put
数据的操作:
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
// 第一步 路由注册加写锁
this.lock.writeLock().lockInterruptibly();
// 判断broker所属集群是否存在
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
// 第二步,维护brokerData信息
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
// 第三步 创建或更新topic路由元数据
// broker是主节点
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
// topic配置信息发生变化 或 初次注册
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
// todo 创建或更新topic路由元数据,并填充topicQueueTable
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 第四步 更新brokerLiveTable
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
// 第5,注册broker的过滤器Server地址列表
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
// 释放锁
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
这样一来,这个方法所做的工作就一目了然了,就是把broker
上报的信息包装下,然后放到这几个hashMap
中。
了解完成注册操作后,注销操作就不难理解了,它是跟注册相反的操作,所做的事就是从这几个hashMap
中移除broker
对应的信息,处理方法为RouteInfoManager#unregisterBroker
,代码中确实是进行hashMap
移除的相关操作,这里就不分析了。
3. 根据topic
获取路由信息
在producer
或consumer
启动时,都需要根据topic
从NameServer
获取对应的路由信息,处理消息的方法为DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
// 创建response
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
// 获取请求头
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
// todo 获取某个topic的路由信息
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
// topicRouteData不为null
if (topicRouteData != null) {
// 是否支持顺序消费 默认false
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
// 组装响应并返回
// 序列化json
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
// 如果不存在 返回topic不存在code
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
这个方法里调用了RouteInfoManager#pickupTopicRouteData
方法,又是RouteInfoManager
,这里我们就可以想象到,获取topic
路由信息的操作,大致又是操作RouteInfoManager
中的几个HashMap
了:
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
topicRouteData.setBrokerDatas(brokerDataList);
HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
topicRouteData.setFilterServerTable(filterServerMap);
try {
try {
// 读锁
this.lock.readLock().lockInterruptibly();
// 获取topic 获取queuedata 列表
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
// 如果不为null
if (queueDataList != null) {
topicRouteData.setQueueDatas(queueDataList);
// 设置foundQueueData
foundQueueData = true;
// 获取queuedata 对应broker name
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
brokerNameSet.add(qd.getBrokerName());
}
// 再根据broker name 取broker地址列表中 获取broker信息
for (String brokerName : brokerNameSet) {
// 根据broker 名字 获取broker信息
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
// 设置foundBrokerData
foundBrokerData = true;
// 处理filter 就是根据broker地址获取对应filter集合 也就是某个broker可能对应一堆filter集合
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
List<String> filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
}
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
return topicRouteData;
}
return null;
}
从代码中来看,操作的是topicQueueTable
,根据topic
从topicQueueTable
中获取对应的queue
数据后,剩下的无非就是对数据进行包装、过滤等,构造返回值,这里就不细讲了。
4. 查询broker
版本信息
在broker
注册到nameServer
前,会先发一个code
为QUERY_DATA_VERSION
的消息,判断版本号是否有变化再决定是否进行注册,处理该消息的代码为:
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
...
switch (request.getCode()) {
...
// 查询dataVersion
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
...
...
}
进入DefaultRequestProcessor#queryBrokerTopicConfig
方法,代码如下:
public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
final QueryDataVersionRequestHeader requestHeader =
(QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);
DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class);
// todo 关键代码:判断版本是否发生变化
Boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);
if (!changed) {
// 如果没改变,就更新最后一次的上报时间为当前时间
this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr());
}
DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
// 返回 nameServer当前的版本号
if (nameSeverDataVersion != null) {
response.setBody(nameSeverDataVersion.encode());
}
responseHeader.setChanged(changed);
return response;
}
这个方法主要包含3个操作:
4.1 判断broker
版本是否发生变化
这部分的判断,就是判断上报的版本号与NameServer
保存的版本号是否一致:
public boolean isBrokerTopicConfigChanged(final String brokerAddr,
final DataVersion dataVersion) {
// 继续查询
DataVersion prev = queryBrokerTopicConfig(brokerAddr);
return null == prev || !prev.equals(dataVersion);
}
这个方法就是判断逻辑了,只是一个简单的equals
操作,继续看DataVersion
的查询:
public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
if (prev != null) {
return prev.getDataVersion();
}
return null;
}
又是对RouteInfoManager
那几个hashMap
的操作,最终是从brokerLiveTable
获取到了NameServer
保存的版本号。
4.2 版本没有发生变化时的操作
如果版本没有发生变化,就更新当前时间为最新上报时间,这个流程没法啥好说的,直接上代码:
public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {
BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
if (prev != null) {
prev.setLastUpdateTimestamp(System.currentTimeMillis());
}
}
又是对RouteInfoManager
那几个hashMap
的操作,这里需要注意的是,当DataVersion
没有发生变化,才会更新BrokerLiveInfo#lastUpdateTimestamp
成员变量的值为当前时间。
那么当DataVersion
发生变化时,就不会更新BrokerLiveInfo#lastUpdateTimestamp
的值了吗?并不是,如果DataVersion
发生了变化,就表明broker
需要再次注册,BrokerLiveInfo#lastUpdateTimestamp
会在注册请求里被改变了。
4.3 查询当前版本号
查询当前版本号,使用的方法是RouteInfoManager#queryBrokerTopicConfig
,在上面的1. 判断broker版本是否发生变化中就用过了,这里就不再赘述了。
5. 定时任务:检测broker
是否存活
在前面分析NamesrvController#initialize
时,我们提到该方法启动了一个定时任务:
// 开启定时任务,每隔10s扫描一次broker,移除不活跃的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
它调用的方法是RouteInfoManager#scanNotActiveBroker
,代码如下:
public void scanNotActiveBroker() {
// brokerLiveTable:存放活跃的broker,就是找出其中不活跃的,然后移除,操作的是 brokerLiveTable
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
// 上一次的心跳时间
long last = next.getValue().getLastUpdateTimestamp();
// 根据心跳时间判断是否存活,超时时间为2min
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
// 移除
it.remove();
// 处理channel的关闭,这个方法里会处理其他 hashMap 的移除
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
这个方法先是遍历brokerLiveTable
,然后判断每个BrokerLiveInfo
的最近一次的上报时间,判断是否超时,如果最近的上报时间距离当前超过了2分钟,说明该broker
可能挂了,就将它从brokerLiveTable
移除,然后调用RouteInfoManager#onChannelDestroy
方法,移除其他hashMap
的broker
.
6. 总结
本文分析了NameServer
对请求消息的处理,nameServer
底层使用netty进行通讯,处理broker
、producer
、consumer
请求消息的ChannelHandler
为NettyServerHandler
,最终的处理方法为DefaultRequestProcessor#processRequest
,这个方法会处理众多的请求,我们重点分析了注册/注销broker消息
、获取topic路由消息
、获取broker版本信息
的处理流程。
注册/注销broker消息
、获取topic路由消息
、获取broker版本信息
最终都是在RouteInfoManager
类中处理,这个类中有几个非常重要的、类型为HashMap
的成员变量如下:
topicQueueTable
:存放保存topic
与Queue
的关系,value
类型为List
,表明一个topic
可以有多个queue
brokerAddrTable
:记录broker
的具体信息,key
为broker
名称,value
为broker
具体信息clusterAddrTable
:集群信息,保存集群名称对应的brokerName
brokerLiveTable
:存活的broker
信息,key
为broker
地址,value
为具体的broker
服务器
这个几成员变量就是NameServer
被称为注册中心的原因所在,所谓的注册/注销broker
,就是往这几个hashMap
中put
或remove
相关的broker
信息;获取topic路由消息
就是从topicQueueTable
中获取broker/messageQueue
等信息。
而nameServer
所谓的"注册"、“发现”、“心跳”等,都是对RouteInfoManager
这几个hashMap
成员变量进行操作的。
好了,本文就到这里了,下篇开始我们将进入product
的分析
参考文章
转载自:https://juejin.cn/post/7208484366954774587