Nacos Registry 13: Server-to-Server Operations (Several Scheduled Tasks)
Feel free to follow github.com/hsfxuebao. I hope it is helpful, and if you find it useful, please give it a Star.
0. Environment
- Nacos version: 1.4.1
- Spring Cloud: 2020.0.2
- Spring Boot: 2.4.4
- Spring Cloud Alibaba: 2.2.5.RELEASE
After the ServiceManager object is created, its initialization method init() is executed:
// com.alibaba.nacos.naming.core.ServiceManager#init
@PostConstruct
public void init() {
    // NOTE: schedules the first ServiceReporter run 60s from now. The task sends this
    // server's registry to the other Nacos servers and then reschedules itself.
    // The registry is sent as per-service checksums (built from concatenated strings).
    GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
    // NOTE: fetches the latest status of all instances in the registry from the other
    // Nacos servers and applies it to the local registry
    GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());
    if (emptyServiceAutoClean) {
        Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms",
                cleanEmptyServiceDelay, cleanEmptyServicePeriod);
        // delay 60s, period 20s;
        // This task is not recommended to be performed frequently in order to avoid
        // the possibility that the service cache information may just be deleted
        // and then created due to the heartbeat mechanism
        // NOTE: starts a scheduled task that removes empty services from the registry every 20s.
        // An empty service is a service that has no instances.
        GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay,
                cleanEmptyServicePeriod);
    }
    try {
        Loggers.SRV_LOG.info("listen for service meta change");
        consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
    } catch (NacosException e) {
        Loggers.SRV_LOG.error("listen for service meta change failed!");
    }
}
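On startup, the Nacos Server first kicks off three important tasks:
- A scheduled task in which the current server sends its local registry (as checksums) to the other Nacos Servers. The first run is 60s after startup; the task then reschedules itself.
- A task that fetches the latest status of all instances in the registry from the other Nacos Servers and updates the local registry.
- A scheduled task that removes empty services from the registry every 20s.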
1. ServiceReporter()
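This starts a scheduled task in which the current server sends its local registry to the other Nacos servers. The first run is scheduled 60s after startup; each later run is rescheduled in the finally block using switchDomain.getServiceStatusSynchronizationPeriodMillis().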
private class ServiceReporter implements Runnable {
    @Override
    public void run() {
        try {
            // The map key is the namespaceId; the value is a Set holding the names
            // of all services in that namespace.
            // Together, the map holds the names of all services in the current registry.
            Map<String, Set<String>> allServiceNames = getAllServiceNames();
            if (allServiceNames.size() <= 0) {
                //ignore
                return;
            }
            // Iterate over all namespaces
            for (String namespaceId : allServiceNames.keySet()) {
                ServiceChecksum checksum = new ServiceChecksum(namespaceId);
                // Iterate over all service names in the current namespace
                for (String serviceName : allServiceNames.get(namespaceId)) {
                    // Skip services that the current server is not responsible for
                    if (!distroMapper.responsible(serviceName)) {
                        continue;
                    }
                    // Look up the current service in the registry
                    Service service = getService(namespaceId, serviceName);
                    if (service == null || service.isEmpty()) {
                        continue;
                    }
                    // Recalculate the checksum of the current service
                    service.recalculateChecksum();
                    // Record the freshly computed checksum
                    checksum.addItem(serviceName, service.getChecksum());
                } // end of inner for
                Message msg = new Message();
                // Put the checksums of all services in the current namespace into msg,
                // which is then sent to the other Nacos servers
                msg.setData(JacksonUtils.toJson(checksum));
                // Get all Nacos servers in the cluster
                Collection<Member> sameSiteServers = memberManager.allMembers();
                if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                    return;
                }
                // Iterate over all Nacos servers and send msg to each of them
                for (Member server : sameSiteServers) {
                    // Skip the current server itself
                    if (server.getAddress().equals(NetUtils.localServer())) {
                        continue;
                    }
                    // NOTE: send msg to the server being iterated
                    synchronizer.send(server.getAddress(), msg);
                }
            } // end of outer for
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
        } finally {
            // Schedule the next run
            GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),
                    TimeUnit.MILLISECONDS);
        }
    }
}
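The reporter is not a fixed-rate task: it is scheduled once and re-arms itself in its finally block, so a failed run still triggers the next one. The following is a minimal sketch of that pattern in isolation, using the JDK's ScheduledExecutorService. The class SelfReschedulingTask, the method runDemo, and the run counter are hypothetical names for illustration only and are not part of Nacos.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ServiceReporter: a one-shot task that re-arms itself.
final class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService executor;
    private final long periodMillis;
    private final CountDownLatch remainingRuns;

    SelfReschedulingTask(ScheduledExecutorService executor, long periodMillis, int runs) {
        this.executor = executor;
        this.periodMillis = periodMillis;
        this.remainingRuns = new CountDownLatch(runs);
    }

    @Override
    public void run() {
        try {
            // Stands in for "compute checksums and send them to the other servers".
            remainingRuns.countDown();
        } finally {
            // Re-arm in finally so an exception in the work above cannot stop the cycle.
            // The sketch stops after a fixed number of runs; the real task never stops.
            if (remainingRuns.getCount() > 0) {
                executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    // Schedules the first run after initialDelayMillis and waits until all runs are done.
    static boolean runDemo(long initialDelayMillis, long periodMillis, int runs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        SelfReschedulingTask task = new SelfReschedulingTask(executor, periodMillis, runs);
        executor.schedule(task, initialDelayMillis, TimeUnit.MILLISECONDS);
        try {
            return task.remainingRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed 3 runs: " + runDemo(30, 10, 3));
    }
}
```

One consequence of this design is that the initial delay and the repeat period are independent values, which matches the code above: 60000 ms for the first run, and the SwitchDomain period for every later run.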
Next, com.alibaba.nacos.naming.misc.ServiceStatusSynchronizer#send is executed:
@Override
public void send(final String serverIP, Message msg) {
    if (serverIP == null) {
        return;
    }
    Map<String, String> params = new HashMap<String, String>(10);
    params.put("statuses", msg.getData());
    params.put("clientIP", NetUtils.localServer());
    // Default form: the target address has no port, so the local server port is appended
    String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()
            + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
    // If the target address already carries a port, use it as-is
    if (IPUtil.containsPort(serverIP)) {
        url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
                + "/service/status";
    }
    try {
        // NOTE: send the request asynchronously
        HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {
            @Override
            public void onReceive(RestResult<String> result) {
                if (!result.ok()) {
                    Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}",
                            serverIP);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, throwable);
            }

            @Override
            public void onCancel() {
            }
        });
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
    }
}
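The URL branch in send() can be easy to misread, so here is a small sketch of just that step. The helper StatusUrlBuilder.buildStatusUrl is hypothetical, and it detects a port with a plain contains(":") check, which is a simplification of IPUtil.containsPort (it would misfire on IPv6 literals). The context path "/nacos" and naming context "/v1/ns" in the usage are assumed example values, not values read from the code above.

```java
// Hypothetical helper mirroring the URL branch in ServiceStatusSynchronizer#send.
final class StatusUrlBuilder {

    // Appends defaultPort only when serverAddr does not already carry a port.
    // Assumption: a ':' in the address means "has a port" (IPv4 or hostname only).
    static String buildStatusUrl(String serverAddr, int defaultPort,
                                 String contextPath, String namingContext) {
        String hostAndPort = serverAddr.contains(":")
                ? serverAddr
                : serverAddr + ":" + defaultPort;
        return "http://" + hostAndPort + contextPath + namingContext + "/service/status";
    }

    public static void main(String[] args) {
        // Address without a port: the default port is appended.
        System.out.println(buildStatusUrl("10.0.0.2", 8848, "/nacos", "/v1/ns"));
        // Address with a port: used unchanged.
        System.out.println(buildStatusUrl("10.0.0.2:8849", 8848, "/nacos", "/v1/ns"));
    }
}
```

Note that in the port-less case the sender falls back to its own port, so this only reaches the peer if both servers listen on the same port.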
2. UpdatedServiceProcessor()
This task fetches the latest status of all instances in the registry from the other Nacos Servers and updates the local registry.
private class UpdatedServiceProcessor implements Runnable {
    //get changed service from other server asynchronously
    @Override
    public void run() {
        ServiceKey serviceKey = null;
        try {
            // Run an infinite loop
            while (true) {
                try {
                    // Take one element from the queue (blocks while the queue is empty).
                    // toBeUpdatedServicesQueue holds the services, reported by other
                    // servers, whose status has changed
                    serviceKey = toBeUpdatedServicesQueue.take();
                } catch (Exception e) {
                    Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
                }
                if (serviceKey == null) {
                    continue;
                }
                // Hand the ServiceUpdater task over to another thread
                GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
            }
        } catch (Exception e) {
            Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
        }
    }
}
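UpdatedServiceProcessor is a classic dispatcher: one thread blocks on a queue and hands each item to a worker pool, so a slow update never stalls the queue consumer. The sketch below shows that shape with JDK classes only. QueueDispatcher, drain, and the string keys are hypothetical and stand in for ServiceKey and ServiceUpdater; unlike the real loop, this one is stopped by an interrupt once all items are handled.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the "take from queue, dispatch to pool" pattern.
final class QueueDispatcher {

    // Feeds keys through a blocking queue and returns the keys handled by the workers.
    static List<String> drain(List<String> keys) {
        LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(keys);
        ExecutorService workers = Executors.newFixedThreadPool(2);
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(keys.size());

        Thread dispatcher = new Thread(() -> {
            try {
                while (true) {
                    // Blocks until an item is available, like toBeUpdatedServicesQueue.take().
                    String key = queue.take();
                    // The actual work runs on a worker thread, not on the dispatcher.
                    workers.submit(() -> {
                        handled.add(key);
                        done.countDown();
                    });
                }
            } catch (InterruptedException e) {
                // Interrupt is the stop signal for this sketch.
                Thread.currentThread().interrupt();
            }
        });
        dispatcher.setDaemon(true);
        dispatcher.start();

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        dispatcher.interrupt();
        workers.shutdown();
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(drain(java.util.Arrays.asList("ns1##svcA", "ns1##svcB")).size());
    }
}
```

Because the workers run concurrently, the order of the returned list is not guaranteed; only its contents are.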
The ServiceUpdater.run method:
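private class ServiceUpdater implements Runnable {
    // The fields namespaceId, serviceName and serverIP used below come from the
    // ServiceKey passed to the constructor (fields and constructor omitted in this excerpt)
    @Override
    public void run() {
        try {
            // NOTE: pull the health status from the source server and apply it locally
            updatedHealthStatus(namespaceId, serviceName, serverIP);
        } catch (Exception e) {
            Loggers.SRV_LOG
                    .warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName,
                            serverIP, e);
        }
    }
}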
public RaftPeer getMySelfClusterState() {
    return raftPeerSet.local();
}

/**
 * Update health status of instance in service.
 *
 * @param namespaceId namespace
 * @param serviceName service name
 * @param serverIP    source server Ip
 */
public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
    // Fetch the data of the given service from the other server
    Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
    JsonNode serviceJson = JacksonUtils.toObj(msg.getData());
    ArrayNode ipList = (ArrayNode) serviceJson.get("ips");
    // This map holds the health status, as seen by the other Nacos server, of all
    // instances of the current service.
    // The key is ip:port, the value is the healthy flag
    Map<String, String> ipsMap = new HashMap<>(ipList.size());
    // Iterate over ipList
    for (int i = 0; i < ipList.size(); i++) {
        // Each entry has the format ip:port_healthy
        String ip = ipList.get(i).asText();
        String[] strings = ip.split("_");
        // Record the address and health status of the current instance
        ipsMap.put(strings[0], strings[1]);
    }
    // Look up the current service in the local registry
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        return;
    }
    boolean changed = false;
    // Get all instances of the current service from the local registry
    List<Instance> instances = service.allIPs();
    // Iterate over all local instances of the current service
    for (Instance instance : instances) {
        // Health status of this instance as reported by the other Nacos server.
        // An instance missing from ipsMap yields null, which parses to false
        boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));
        // If the locally recorded status differs from the remote one, the remote one wins
        if (valid != instance.isHealthy()) {
            changed = true;
            // Overwrite the local instance status with the remote status
            instance.setHealthy(valid);
            Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName,
                    (instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(),
                    instance.getClusterName());
        }
    }
    // changed is true as soon as the status of at least one instance was modified
    if (changed) {
        // Publish the service-changed event
        pushService.serviceChanged(service);
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            StringBuilder stringBuilder = new StringBuilder();
            List<Instance> allIps = service.allIPs();
            for (Instance instance : allIps) {
                stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
            }
            Loggers.EVT_LOG
                    .debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(),
                            service.getName(), stringBuilder.toString());
        }
    }
}
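The core of updatedHealthStatus is two steps: parse the ip:port_healthy strings into a map, then overwrite any local status that disagrees. The sketch below reproduces only that logic with plain collections. HealthSync, parse, and reconcile are hypothetical names, and a Map<String, Boolean> stands in for the list of Instance objects.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the parse-and-reconcile logic in updatedHealthStatus.
final class HealthSync {

    // Parses entries such as "10.0.0.1:8080_true" into {"10.0.0.1:8080" -> "true"}.
    static Map<String, String> parse(List<String> ips) {
        Map<String, String> result = new HashMap<>(ips.size());
        for (String ip : ips) {
            String[] parts = ip.split("_");
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    // Overwrites local statuses with remote ones; returns true if anything changed.
    // local maps ip:port to the locally recorded healthy flag.
    static boolean reconcile(Map<String, Boolean> local, Map<String, String> remote) {
        boolean changed = false;
        for (Map.Entry<String, Boolean> entry : local.entrySet()) {
            // A key missing from remote gives null, and parseBoolean(null) is false.
            boolean valid = Boolean.parseBoolean(remote.get(entry.getKey()));
            if (valid != entry.getValue()) {
                entry.setValue(valid);
                changed = true;
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, Boolean> local = new HashMap<>();
        local.put("10.0.0.1:8080", true);
        local.put("10.0.0.2:8080", true);
        Map<String, String> remote = parse(Arrays.asList("10.0.0.1:8080_false"));
        System.out.println(reconcile(local, remote) + " " + local);
    }
}
```

The sketch makes one property of the original visible: an instance that the remote server does not report at all is treated the same as one reported unhealthy, because the missing map entry parses to false.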
3. EmptyServiceAutoClean()
This starts a scheduled task that removes empty services from the registry every 20s:
private class EmptyServiceAutoClean implements Runnable {
    @Override
    public void run() {
        // Parallel flow opening threshold
        // When a namespace contains more than 100 services, its part of the registry
        // is processed as a parallel stream; otherwise as a sequential stream
        int parallelSize = 100;
        // Iterate over the registry
        // stringServiceMap is the inner map of the registry
        serviceMap.forEach((namespace, stringServiceMap) -> {
            Stream<Map.Entry<String, Service>> stream = null;
            // If the number of services in the current namespace exceeds the threshold,
            // create a parallel stream
            if (stringServiceMap.size() > parallelSize) {
                // Parallel stream
                stream = stringServiceMap.entrySet().parallelStream();
            } else {
                // Sequential stream
                stream = stringServiceMap.entrySet().stream();
            }
            stream.filter(entry -> {
                final String serviceName = entry.getKey();
                // Only services that the current server is responsible for pass the filter
                return distroMapper.responsible(serviceName);
                // Every element reaching forEach below is a service handled by the current server
            }).forEach(entry -> stringServiceMap.computeIfPresent(entry.getKey(), (serviceName, service) -> {
                if (service.isEmpty()) {
                    // To avoid violent Service removal, the number of times the Service
                    // experiences Empty is determined by finalizeCnt, and if the specified
                    // value is reached, it is removed
                    // If the service has been found empty more often than the allowed maximum, remove it
                    if (service.getFinalizeCount() > maxFinalizeCount) {
                        Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned", namespace,
                                serviceName);
                        try {
                            // NOTE: remove the service
                            easyRemoveService(namespace, serviceName);
                        } catch (Exception e) {
                            Loggers.SRV_LOG.error("namespace : {}, [{}] services are automatically clean has "
                                    + "error : {}", namespace, serviceName, e);
                        }
                    }
                    service.setFinalizeCount(service.getFinalizeCount() + 1);
                    Loggers.SRV_LOG
                            .debug("namespace : {}, [{}] The number of times the current service experiences "
                                    + "an empty instance is : {}", namespace, serviceName,
                                    service.getFinalizeCount());
                } else {
                    // Reset the counter to zero
                    service.setFinalizeCount(0);
                }
                return service;
            }));
        });
    }
}
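The finalize counter decides how many consecutive empty sweeps a service survives. Because the check is count > maxFinalizeCount and the counter is incremented after the check, removal is triggered on sweep number maxFinalizeCount + 2, not maxFinalizeCount. The sketch below models only that counting rule. EmptyServiceSweeper and its methods are hypothetical, the map of counters stands in for Service.getFinalizeCount(), and the sketch deletes the entry directly, whereas the real code delegates removal to easyRemoveService.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the finalize-count rule used by EmptyServiceAutoClean.
final class EmptyServiceSweeper {
    private final Map<String, Integer> finalizeCounts = new HashMap<>();
    private final int maxFinalizeCount;

    EmptyServiceSweeper(int maxFinalizeCount) {
        this.maxFinalizeCount = maxFinalizeCount;
    }

    void register(String serviceName) {
        finalizeCounts.put(serviceName, 0);
    }

    // Performs one sweep for one service; returns true if the service was removed.
    boolean sweep(String serviceName, boolean isEmpty) {
        Integer count = finalizeCounts.get(serviceName);
        if (count == null) {
            return false; // unknown or already removed
        }
        if (!isEmpty) {
            finalizeCounts.put(serviceName, 0); // any instance resets the counter
            return false;
        }
        if (count > maxFinalizeCount) {
            finalizeCounts.remove(serviceName); // simplified: the real code calls easyRemoveService
            return true;
        }
        finalizeCounts.put(serviceName, count + 1);
        return false;
    }

    // Counts how many consecutive empty sweeps it takes until removal.
    static int sweepsUntilRemoved(int maxFinalizeCount) {
        EmptyServiceSweeper sweeper = new EmptyServiceSweeper(maxFinalizeCount);
        sweeper.register("demo");
        int sweeps = 0;
        boolean removed = false;
        while (!removed) {
            sweeps++;
            removed = sweeper.sweep("demo", true);
        }
        return sweeps;
    }

    public static void main(String[] args) {
        System.out.println(sweepsUntilRemoved(3));
    }
}
```

With a 20s period, the number of sweeps translates directly into the minimum time a service must stay empty before it is cleaned up.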
The core method is easyRemoveService(namespace, serviceName):
public void easyRemoveService(String namespaceId, String serviceName) throws Exception {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new IllegalArgumentException("specified service not exist, serviceName : " + serviceName);
    }
    // The service is removed through the consistency service,
    // i.e. the delete is applied on every server in the Nacos cluster
    consistencyService.remove(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName));
}
4. Method call graph