Exploring the RocketMQ 5.0 client source: how NameServer addresses are updated
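This is weihubeats. If you find the article useful, you can follow my WeChat official account 小奏技术, where my articles are published first. No marketing accounts, no clickbait.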
Source version
- rocketmq:release-5.0.0
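Background
In the previous article we went through the source code in detail to see how the client obtains NameServer addresses. Today we continue and look at how the client updates them.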
How the client updates its local namesrvAddr
The client starts the timer that updates NameServer addresses in MQClientInstance.start(), through this call:
this.startScheduledTask();
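Let's step into this method. If the namesrvAddr obtained from clientConfig is empty, it starts a scheduled task. That task calls
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
which uses the wsAddr we configured (the URL from which namesrvAddr is obtained) to fetch the namesrvAddr addresses.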
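The fetch step can be made concrete with a minimal standalone sketch in plain Java (java.net.http, Java 11+). It is not RocketMQ's actual fetching code. It assumes the wsAddr endpoint returns the name server addresses as a single ';'-separated string such as 10.0.0.1:9876;10.0.0.2:9876. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a RocketMQ class
class NameServerAddrFetcher {
    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(3))
            .build();

    // GET the wsAddr URL and return the raw body (assumed ';'-separated), or null on a non-200 response
    static String fetchRaw(String wsAddr) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(wsAddr))
                .timeout(Duration.ofSeconds(3))
                .GET()
                .build();
        HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() == 200 ? response.body() : null;
    }

    // Split the body into "ip:port" entries, trimming whitespace and dropping blank entries
    static List<String> parse(String body) {
        List<String> addrs = new ArrayList<>();
        if (body == null) {
            return addrs;
        }
        for (String part : body.trim().split(";")) {
            String addr = part.trim();
            if (!addr.isEmpty()) {
                addrs.add(addr);
            }
        }
        return addrs;
    }
}
```

The parse step is kept separate from the HTTP call. This lets the address-splitting logic be checked without a live server.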
The scheduledExecutorService thread pool is initialized as:
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread"));
The task first runs after a 1s delay. After that it pulls the namesrvAddr addresses from the HTTP server every 2 minutes.
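The conditional scheduling just described can be sketched with the same kind of single-threaded scheduler. The 1s initial delay and the 2-minute period follow the article's description. NameServerAddrScheduler and startIfNeeded are hypothetical names, and the fetch action is passed in as a plain Runnable.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the "fetch only when namesrvAddr is not configured" rule
class NameServerAddrScheduler {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread"));

    // Returns the scheduled handle, or null when an explicit namesrvAddr makes the fetch unnecessary
    ScheduledFuture<?> startIfNeeded(String configuredNamesrvAddr, Runnable fetchNameServerAddr) {
        if (configuredNamesrvAddr != null) {
            return null;
        }
        return scheduler.scheduleAtFixedRate(() -> {
            try {
                fetchNameServerAddr.run();
            } catch (Exception e) {
                // Swallow so that one failed fetch does not cancel all later runs
                System.err.println("fetchNameServerAddr failed: " + e);
            }
        }, 1000, 2 * 60 * 1000, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The try/catch inside the task matters. If a task run with scheduleAtFixedRate throws, ScheduledExecutorService suppresses all of its later executions.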
The local namesrvAddr is actually updated in the updateNameServerAddressList method. What finally gets updated is the namesrvAddrList field of NettyRemotingClient:
@Override
public void updateNameServerAddressList(List<String> addrs) {
    List<String> old = this.namesrvAddrList.get();
    boolean update = false;
    if (!addrs.isEmpty()) {
        if (null == old) {
            update = true;
        } else if (addrs.size() != old.size()) {
            update = true;
        } else {
            for (int i = 0; i < addrs.size() && !update; i++) {
                if (!old.contains(addrs.get(i))) {
                    update = true;
                }
            }
        }
        if (update) {
            Collections.shuffle(addrs);
            LOGGER.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
            this.namesrvAddrList.set(addrs);
            // should close the channel if choosed addr is not exist.
            if (this.namesrvAddrChoosed.get() != null && !addrs.contains(this.namesrvAddrChoosed.get())) {
                String namesrvAddr = this.namesrvAddrChoosed.get();
                for (String addr : this.channelTables.keySet()) {
                    if (addr.contains(namesrvAddr)) {
                        ChannelWrapper channelWrapper = this.channelTables.get(addr);
                        if (channelWrapper != null) {
                            closeChannel(channelWrapper.getChannel());
                        }
                    }
                }
            }
        }
    }
}
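Besides updating namesrvAddrList, the method shuffles addrs with Collections.shuffle(addrs) so that a name server is chosen at random later on. It also compares the new list with the old one. If the name server currently in use (namesrvAddrChoosed) is no longer in the new list, the client closes its TCP connection to that old NameServer: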
closeChannel(channelWrapper.getChannel());
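The change check and the shuffle can be isolated into pure functions. This is a hedged sketch that mirrors the logic of updateNameServerAddressList without any Netty types. The class name is hypothetical, and unlike the original it shuffles a copy instead of the caller's list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper mirroring the decision logic of updateNameServerAddressList
class NameServerListUpdater {
    // Update when there was no old list, the sizes differ, or some new address is missing from the old list
    static boolean shouldUpdate(List<String> old, List<String> addrs) {
        if (addrs.isEmpty()) {
            return false; // an empty fetch never wipes the current list
        }
        if (old == null || addrs.size() != old.size()) {
            return true;
        }
        for (String addr : addrs) {
            if (!old.contains(addr)) {
                return true;
            }
        }
        return false;
    }

    // Shuffled copy, so different clients tend to start from different name servers
    static List<String> shuffled(List<String> addrs) {
        List<String> copy = new ArrayList<>(addrs);
        Collections.shuffle(copy);
        return copy;
    }

    // True when the currently chosen name server was dropped and its channel should be closed
    static boolean chosenWasRemoved(String chosen, List<String> newAddrs) {
        return chosen != null && !newAddrs.contains(chosen);
    }
}
```

A reordered but otherwise identical list is not treated as a change. Only a size difference or a new address triggers the update.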
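Note that all of this only updates the client's namesrvAddr list. It does not yet reconnect to the name servers, so we also need to look at the reconnection timer.
Following the source, the reconnection timer is started in the NettyRemotingClient class, and it calls: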
NettyRemotingClient.this.scanAvailableNameSrv()
The timer itself is just a plain new Timer:
private final Timer timer = new Timer("ClientHouseKeepingService", true);
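The timer runs immediately on startup with no initial delay and then repeats every 3s by default. The period is the client's connect timeout, which can be set with the system property com.rocketmq.remoting.client.connect.timeout.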
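This housekeeping timer can be reproduced in a standalone sketch. As described above, it assumes the period is read from the com.rocketmq.remoting.client.connect.timeout system property with a 3000 ms default. NameSrvScanTimer and scanPeriodMillis are hypothetical names, and the scan itself is passed in as a Runnable.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical sketch of the name server scan timer
class NameSrvScanTimer {
    // Property name as given in the article; 3000 ms is the default it describes
    static final String CONNECT_TIMEOUT_PROPERTY = "com.rocketmq.remoting.client.connect.timeout";

    static long scanPeriodMillis() {
        return Long.parseLong(System.getProperty(CONNECT_TIMEOUT_PROPERTY, "3000"));
    }

    // Daemon timer, as in the article: it does not keep the JVM alive by itself
    private final Timer timer = new Timer("ClientHouseKeepingService", true);

    void start(Runnable scanAvailableNameSrv) {
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    scanAvailableNameSrv.run();
                } catch (Exception e) {
                    // An uncaught exception would kill the Timer thread and stop all future scans
                    System.err.println("scanAvailableNameSrv failed: " + e);
                }
            }
        }, 0, scanPeriodMillis()); // initial delay 0: the first scan runs immediately
    }

    void stop() {
        timer.cancel();
    }
}
```

Catching exceptions inside the TimerTask is important with java.util.Timer. An uncaught exception terminates the timer's only thread, and no further scans would run.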
Let's step into the scanAvailableNameSrv method:
private void scanAvailableNameSrv() {
    List<String> nameServerList = this.namesrvAddrList.get();
    if (nameServerList == null) {
        LOGGER.debug("scanAvailableNameSrv Addresses of name server is empty!");
        return;
    }
    for (final String namesrvAddr : nameServerList) {
        scanExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Channel channel = NettyRemotingClient.this.getAndCreateChannel(namesrvAddr);
                    if (channel != null) {
                        NettyRemotingClient.this.availableNamesrvAddrMap.putIfAbsent(namesrvAddr, true);
                    } else {
                        NettyRemotingClient.this.availableNamesrvAddrMap.remove(namesrvAddr);
                    }
                } catch (Exception e) {
                    LOGGER.error("scanAvailableNameSrv get channel of {} failed, ", namesrvAddr, e);
                }
            }
        });
    }
}
The method hands the work to another thread pool, scanExecutor, which scans the name servers asynchronously. Here is how scanExecutor is initialized:
this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(32), new ThreadFactory() {
        private final AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientScan_thread_" + this.threadIndex.incrementAndGet());
        }
    }
);
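The scan-and-record pattern can be sketched with the same pool sizing. This is a simplified illustration, not the RocketMQ class. A hypothetical Predicate<String> canConnect stands in for getAndCreateChannel(addr) != null. The sketch also waits for its tasks to finish, whereas the original submits them with execute and returns immediately, so that the availability map can be inspected afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical sketch of scanAvailableNameSrv's bookkeeping
class NameSrvScanner {
    final Map<String, Boolean> availableNamesrvAddrMap = new ConcurrentHashMap<>();
    private final AtomicInteger threadIndex = new AtomicInteger(0);
    // Same sizing as the article: 4 core threads, 10 max, 60 s keep-alive, bounded queue of 32
    private final ThreadPoolExecutor scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(32),
            r -> new Thread(r, "NettyClientScan_thread_" + threadIndex.incrementAndGet()));

    void scan(List<String> nameServerList, Predicate<String> canConnect) throws InterruptedException {
        if (nameServerList == null) {
            return;
        }
        List<Future<?>> pending = new ArrayList<>();
        for (String addr : nameServerList) {
            pending.add(scanExecutor.submit(() -> {
                if (canConnect.test(addr)) {
                    availableNamesrvAddrMap.putIfAbsent(addr, true); // reachable: mark available
                } else {
                    availableNamesrvAddrMap.remove(addr); // unreachable: drop it
                }
            }));
        }
        // Deviation from the original: wait for every probe so callers see the final map
        for (Future<?> f : pending) {
            try {
                f.get();
            } catch (ExecutionException e) {
                System.err.println("scan failed: " + e.getCause());
            }
        }
    }

    void shutdown() {
        scanExecutor.shutdownNow();
    }
}
```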
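Now let's step into NettyRemotingClient.this.getAndCreateChannel.
Note the check on its first line, null == addr. This branch serves MQClientInstance.this.updateTopicRouteInfoFromNameServer();. The topic route request it triggers is sent with a null address, meaning "any available name server", so getAndCreateChannel falls back to getAndCreateNameserverChannel().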
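The null-address dispatch at the top of getAndCreateChannel can be sketched as follows. FakeChannel is a hypothetical stand-in for Netty's Channel. The two function parameters stand in for getAndCreateNameserverChannel() and createChannel(addr).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of the dispatch in getAndCreateChannel
class ChannelDispatch {
    // Stand-in for a channel: remembers its address and whether it is usable
    record FakeChannel(String addr, boolean ok) {}

    final Map<String, FakeChannel> channelTables = new ConcurrentHashMap<>();

    FakeChannel getAndCreateChannel(String addr,
                                    Supplier<FakeChannel> getAndCreateNameserverChannel,
                                    Function<String, FakeChannel> createChannel) {
        if (addr == null) {
            // null means "any name server": use the name server selection path
            return getAndCreateNameserverChannel.get();
        }
        FakeChannel cached = channelTables.get(addr);
        if (cached != null && cached.ok()) {
            return cached; // reuse a healthy cached channel
        }
        return createChannel.apply(addr);
    }
}
```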
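Remember the startScheduledTask() method? It also starts a timer that periodically refreshes topic information by calling MQClientInstance.this.updateTopicRouteInfoFromNameServer();. The default interval is 30s (ClientConfig's pollNameServerInterval).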
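TopicRouteData holds the route information fetched from the NameServer, mainly the topic's queue data (queueDatas) and broker data (brokerDatas).
The topic and queue information actually used when sending messages comes from TopicPublishInfo, not TopicRouteData. TopicRouteData is converted to TopicPublishInfo by the method
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route)
in MQClientInstance. The resulting TopicPublishInfo mainly carries the list of message queues that producers can write to.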
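The following simplified sketch roughly shows what such a conversion produces for an ordinary (non-ordered) topic. It assumes that each writable queue entry whose broker has a master address contributes writeQueueNums message queues. The record types are hypothetical stand-ins for RocketMQ's QueueData, BrokerData and MessageQueue. The real method also handles ordered topics, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the route-to-publish-info conversion
class RouteConversion {
    record QueueData(String brokerName, int writeQueueNums, boolean writable) {}
    record BrokerData(String brokerName, String masterAddr) {}
    record MessageQueue(String topic, String brokerName, int queueId) {}

    // One MessageQueue per write queue of every writable broker that currently has a master
    static List<MessageQueue> toPublishQueues(String topic, List<QueueData> queueDatas, List<BrokerData> brokerDatas) {
        List<MessageQueue> result = new ArrayList<>();
        for (QueueData qd : queueDatas) {
            if (!qd.writable()) {
                continue; // read-only queues are not used for sending
            }
            BrokerData broker = null;
            for (BrokerData bd : brokerDatas) {
                if (bd.brokerName().equals(qd.brokerName())) {
                    broker = bd;
                    break;
                }
            }
            if (broker == null || broker.masterAddr() == null) {
                continue; // no master: producers cannot write to this broker
            }
            for (int i = 0; i < qd.writeQueueNums(); i++) {
                result.add(new MessageQueue(topic, qd.brokerName(), i));
            }
        }
        return result;
    }
}
```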
Next, let's look at the getAndCreateNameserverChannel() method:
private Channel getAndCreateNameserverChannel() throws InterruptedException {
    String addr = this.namesrvAddrChoosed.get();
    if (addr != null) {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
    }

    final List<String> addrList = this.namesrvAddrList.get();
    if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }

            if (addrList != null && !addrList.isEmpty()) {
                for (int i = 0; i < addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    index = Math.abs(index);
                    index = index % addrList.size();
                    String newAddr = addrList.get(index);

                    this.namesrvAddrChoosed.set(newAddr);
                    LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null) {
                        return channelNew;
                    }
                }
                throw new RemotingConnectException(addrList.toString());
            }
        } catch (Exception e) {
            LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e);
        } finally {
            this.namesrvChannelLock.unlock();
        }
    } else {
        LOGGER.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    return null;
}
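A few points worth noting:
- The NameServer address is read from namesrvAddrChoosed.
- Connected channels are stored in channelTables and reused.
- If namesrvAddrChoosed holds no address, or its channel is not alive, the next address is taken from namesrvAddrList in round-robin order. The list itself was already shuffled when it was updated. The core code is: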
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);
this.namesrvAddrChoosed.set(newAddr);
This is the path taken every time topic route information is refreshed, for example when messages are sent.
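The round-robin choice can be sketched in isolation. NameServerSelector is hypothetical and not RocketMQ code, and a Predicate<String> stands in for createChannel(newAddr) != null. One deliberate deviation: the sketch uses Math.floorMod instead of Math.abs followed by %. Math.abs(Integer.MIN_VALUE) is still negative, so that arithmetic would give a negative index once the counter overflows.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical round-robin selector modelled on getAndCreateNameserverChannel
class NameServerSelector {
    private final AtomicInteger namesrvIndex;

    NameServerSelector(int initialIndex) {
        this.namesrvIndex = new AtomicInteger(initialIndex);
    }

    // Next address in round-robin order; floorMod keeps the index in range even after overflow
    String next(List<String> addrList) {
        int index = Math.floorMod(namesrvIndex.incrementAndGet(), addrList.size());
        return addrList.get(index);
    }

    // Try each address at most once and return the first that connects, or null if none do
    String choose(List<String> addrList, Predicate<String> connect) {
        for (int i = 0; i < addrList.size(); i++) {
            String candidate = next(addrList);
            if (connect.test(candidate)) {
                return candidate;
            }
        }
        return null;
    }
}
```

Because the counter is shared across calls, consecutive failures spread their retries over the whole list rather than always restarting from the first entry.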
Now let's look at this.createChannel(addr), which the scanAvailableNameSrv flow ends up calling:
private Channel createChannel(final String addr) throws InterruptedException {
    // Get the existing ChannelWrapper
    ChannelWrapper cw = this.channelTables.get(addr);
    // If it exists and is alive, return it directly
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }

    if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            // Whether to create a new connection
            boolean createNewConnection;
            // Double check, similar to the double-checked locking singleton pattern
            cw = this.channelTables.get(addr);
            if (cw != null) {
                if (cw.isOK()) {
                    return cw.getChannel();
                } else if (!cw.getChannelFuture().isDone()) {
                    // A connect is still in progress: wait on it instead of starting another
                    createNewConnection = false;
                } else {
                    this.channelTables.remove(addr);
                    createNewConnection = true;
                }
            } else {
                createNewConnection = true;
            }

            // Create a new connection
            if (createNewConnection) {
                // Connect to the remote address (the NameServer in this flow)
                ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
                LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                cw = new ChannelWrapper(channelFuture);
                this.channelTables.put(addr, cw);
            }
        } catch (Exception e) {
            LOGGER.error("createChannel: create channel exception", e);
        } finally {
            this.lockChannelTables.unlock();
        }
    } else {
        LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    if (cw != null) {
        ChannelFuture channelFuture = cw.getChannelFuture();
        if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
            if (cw.isOK()) {
                LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                return cw.getChannel();
            } else {
                LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString());
            }
        } else {
            LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
                channelFuture.toString());
        }
    }

    return null;
}
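The double-checked get-or-create logic of createChannel can be reduced to a sketch built on standard library types. A CompletableFuture<String> stands in for Netty's ChannelFuture, and a hypothetical connector function stands in for bootstrap.connect. LOCK_TIMEOUT_MILLIS is set to an assumed 3000 ms. The structure follows the code above:

- a lock-free fast path;
- a re-check under tryLock;
- reuse of an in-flight connect;
- replacement of a failed one;
- a timed wait outside the lock.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical sketch of createChannel's double-checked get-or-create
class ChannelTable {
    private static final long LOCK_TIMEOUT_MILLIS = 3000; // assumed value

    // Stand-in for ChannelWrapper: wraps a pending connect whose result is a "channel"
    static final class Wrapper {
        final CompletableFuture<String> connectFuture;

        Wrapper(CompletableFuture<String> connectFuture) {
            this.connectFuture = connectFuture;
        }

        boolean isOK() {
            return connectFuture.isDone() && !connectFuture.isCompletedExceptionally();
        }
    }

    final Map<String, Wrapper> channelTables = new ConcurrentHashMap<>();
    private final Lock lockChannelTables = new ReentrantLock();
    private final Function<String, CompletableFuture<String>> connector;

    ChannelTable(Function<String, CompletableFuture<String>> connector) {
        this.connector = connector;
    }

    String createChannel(String addr, long connectTimeoutMillis) throws InterruptedException {
        Wrapper cw = channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.connectFuture.join(); // fast path: healthy cached connection, no lock
        }
        if (lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                cw = channelTables.get(addr); // re-check under the lock
                boolean createNew;
                if (cw == null) {
                    createNew = true;
                } else if (cw.isOK()) {
                    return cw.connectFuture.join();
                } else if (!cw.connectFuture.isDone()) {
                    createNew = false; // another connect is in flight: wait on it below
                } else {
                    channelTables.remove(addr); // finished but failed: discard and reconnect
                    createNew = true;
                }
                if (createNew) {
                    cw = new Wrapper(connector.apply(addr));
                    channelTables.put(addr, cw);
                }
            } finally {
                lockChannelTables.unlock();
            }
        }
        if (cw == null) {
            return null;
        }
        try {
            return cw.connectFuture.get(connectTimeoutMillis, TimeUnit.MILLISECONDS); // wait outside the lock
        } catch (TimeoutException | ExecutionException e) {
            return null; // timed out or failed, like createChannel returning null
        }
    }
}
```

Waiting outside the lock keeps one slow connection attempt from blocking callers that want channels to other addresses.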
Recap
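In short, the client checks NameServer availability every 3 seconds through scanAvailableNameSrv, so it can notice within about 3s that a NameServer has gone offline.
Connection information is stored in channelTables.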
Summary
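- The client scans every 3s to check whether each NameServer is alive
- The client pulls the latest NameServer addresses from the HTTP server only every 2 minutes
- The client refreshes its cached topic information (topic and broker data) from the NameServer every 30s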
Reposted from: https://juejin.cn/post/7208793045479456828