Nacos Registry 13: Server-to-Server Operations (Several Scheduled Tasks)

Author: 站长

Feel free to follow github.com/hsfxuebao. I hope it is helpful to you; if you find it useful, a Star would be much appreciated.

0. Environment

  • Nacos version: 1.4.1
  • Spring Cloud: 2020.0.2
  • Spring Boot: 2.4.4
  • Spring Cloud Alibaba: 2.2.5.RELEASE

Test code: github.com/hsfxuebao/s…

After the ServiceManager object has been created, its initialization method init() is executed:

// com.alibaba.nacos.naming.core.ServiceManager#init
@PostConstruct
public void init() {
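    // todo Schedule the ServiceReporter task with an initial delay of 60s; on each run the current
    // Server sends its local registry to the other Nacos Servers.
    // The registry is sent as per-service checksums (built from concatenated strings)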
    GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

    // todo Fetch the latest status of all instances in the registry from the other Nacos Servers
    // and update the local registry
    GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());

    if (emptyServiceAutoClean) {

        Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms",
                cleanEmptyServiceDelay, cleanEmptyServicePeriod);

        // delay 60s, period 20s;

        // This task is not recommended to be performed frequently in order to avoid
        // the possibility that the service cache information may just be deleted
        // and then created due to the heartbeat mechanism

        // todo Start a scheduled task that cleans empty services out of the registry every 20s
        // An empty service is a service without any instance
        GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay,
                cleanEmptyServicePeriod);
    }

    try {
        Loggers.SRV_LOG.info("listen for service meta change");
        consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
    } catch (NacosException e) {
        Loggers.SRV_LOG.error("listen for service meta change failed!");
    }
}

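When a Nacos Server starts, it first kicks off three important tasks:

  • A scheduled task, first run after 60s, in which the current Server sends its local registry to the other Nacos Servers; each run re-schedules the next one
  • A task that fetches the latest status of all instances in the registry from the other Nacos Servers and updates the local registry
  • A scheduled task that cleans empty services out of the registry every 20s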
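The ServiceReporter code in section 1 does not rely on a fixed-rate schedule: it is submitted once with a delay and re-arms itself in a finally block, which is why the first delay (60s in init()) can differ from the later interval. Below is a minimal sketch of that pattern using only the JDK. The class and method names are hypothetical and are not part of Nacos.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: a one-shot task that re-schedules itself after every run. */
class SelfReschedulingDemo {

    /** Runs the task `times` times and returns the number of runs, or -1 on timeout. */
    static int runTimes(long initialDelayMs, long periodMs, int times) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);

        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    runs.incrementAndGet(); // the real work would go here
                } finally {
                    done.countDown();
                    // Re-arm in finally so that a failing run does not stop the cycle
                    if (done.getCount() > 0) {
                        executor.schedule(this, periodMs, TimeUnit.MILLISECONDS);
                    }
                }
            }
        };

        // First run uses its own delay; later runs use periodMs
        executor.schedule(task, initialDelayMs, TimeUnit.MILLISECONDS);
        try {
            boolean finished = done.await(5, TimeUnit.SECONDS);
            return finished ? runs.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Calling SelfReschedulingDemo.runTimes(20, 5, 3) waits 20 ms before the first run and 5 ms before each of the following two. A side effect of this design is that the interval can be changed at runtime, since it is read again on every re-arm.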

1. ServiceReporter()

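This task is first run 60s after startup. On each run the current Server sends its local registry to the other Nacos Servers, and the finally block then schedules the next run using the interval returned by switchDomain.getServiceStatusSynchronizationPeriodMillis().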

private class ServiceReporter implements Runnable {

    @Override
    public void run() {
        try {
            // The map key is the namespaceId; the value is a Set holding the names of
            // all services in that namespace.
            // Taken together, this map holds the names of all services in the current registry
            Map<String, Set<String>> allServiceNames = getAllServiceNames();

            if (allServiceNames.size() <= 0) {
                //ignore
                return;
            }
            // Iterate over all namespaces
            for (String namespaceId : allServiceNames.keySet()) {

                ServiceChecksum checksum = new ServiceChecksum(namespaceId);

                // Iterate over all service names in the current namespace
                for (String serviceName : allServiceNames.get(namespaceId)) {
                    // Skip services that the current Server is not responsible for
                    if (!distroMapper.responsible(serviceName)) {
                        continue;
                    }

                    // Fetch the service being iterated from the registry
                    Service service = getService(namespaceId, serviceName);

                    if (service == null || service.isEmpty()) {
                        continue;
                    }
                    // Recalculate the checksum of the current service
                    service.recalculateChecksum();
                    // Add the freshly computed checksum to the ServiceChecksum of this namespace
                    checksum.addItem(serviceName, service.getChecksum());
                } // end of inner for

                Message msg = new Message();
                // Write the checksums of all services in the current namespace into msg,
                // which will then be sent to the other Nacos servers
                msg.setData(JacksonUtils.toJson(checksum));
                // Get all Nacos servers
                Collection<Member> sameSiteServers = memberManager.allMembers();

                if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                    return;
                }
                // Iterate over all Nacos servers and send msg to each of them
                for (Member server : sameSiteServers) {
                    // Skip the current server itself
                    if (server.getAddress().equals(NetUtils.localServer())) {
                        continue;
                    }
                    // todo Send msg to the server being iterated
                    synchronizer.send(server.getAddress(), msg);
                }
            } // end of outer for
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
        } finally {
            // Schedule the next execution
            GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),
                    TimeUnit.MILLISECONDS);
        }
    }
}
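The reporter above sends one checksum per service rather than the full instance list, so a peer only needs to compare checksums to see which services have diverged. How the receiving side does this is not shown in this article. The sketch below is one plausible way to do it under stated assumptions: the class, the method names, the "ip:port_healthy" input format and the choice of MD5 are illustrative and are not taken from the Nacos source.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: per-service checksums used to detect divergence between two servers. */
class ChecksumSketch {

    /** Hex digest over the sorted instance strings of one service. */
    static String checksum(List<String> instances) {
        List<String> sorted = new ArrayList<>(instances);
        Collections.sort(sorted); // makes the result independent of list order
        String joined = String.join(",", sorted);
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(joined.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** Names of services whose remote checksum differs from (or is missing in) the local map. */
    static List<String> outOfSync(Map<String, String> local, Map<String, String> remote) {
        List<String> result = new ArrayList<>();
        // TreeMap gives a deterministic iteration order for the result
        for (Map.Entry<String, String> e : new TreeMap<>(remote).entrySet()) {
            if (!e.getValue().equals(local.get(e.getKey()))) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

Only the services returned by outOfSync would then need a full fetch from the sender, which keeps the periodic report small.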

Next, com.alibaba.nacos.naming.misc.ServiceStatusSynchronizer#send is executed:

@Override
public void send(final String serverIP, Message msg) {
    if (serverIP == null) {
        return;
    }

    Map<String, String> params = new HashMap<String, String>(10);

    params.put("statuses", msg.getData());
    params.put("clientIP", NetUtils.localServer());

    String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()
            + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";

    if (IPUtil.containsPort(serverIP)) {
        url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
                + "/service/status";
    }

    try {
        // todo Send the request
        HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {
            @Override
            public void onReceive(RestResult<String> result) {
                if (!result.ok()) {
                    Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}",
                            serverIP);

                }
            }

            @Override
            public void onError(Throwable throwable) {
                Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, throwable);
            }

            @Override
            public void onCancel() {

            }
        });
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
    }

}
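The URL handling in send() covers two cases: a server address that already carries a port, and a bare address to which the local port is appended. The sketch below isolates that logic in a pure function. It is a simplification, not the Nacos code: the class and method names are hypothetical, the port check is a plain contains(":") that only suits IPv4 addresses and host names, and the values "/nacos" and "/v1/ns" used in the usage note are assumed for the context path and naming context.

```java
/** Hypothetical sketch of the URL construction performed in send(). */
class StatusUrlSketch {

    /**
     * Builds the status-report URL for a peer.
     *
     * @param serverAddr    "host" or "host:port" (IPv4 or host name only in this sketch)
     * @param defaultPort   port to use when serverAddr carries none
     * @param contextPath   web context path of the server
     * @param namingContext path prefix of the naming module
     */
    static String statusUrl(String serverAddr, int defaultPort, String contextPath, String namingContext) {
        // Append the default port only when the address does not already include one
        String hostAndPort = serverAddr.contains(":") ? serverAddr : serverAddr + ":" + defaultPort;
        return "http://" + hostAndPort + contextPath + namingContext + "/service/status";
    }
}
```

With these assumed values, statusUrl("10.0.0.2", 8848, "/nacos", "/v1/ns") yields http://10.0.0.2:8848/nacos/v1/ns/service/status, while an address such as "10.0.0.2:8849" keeps its own port.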

2. UpdatedServiceProcessor()

This task fetches the latest status of all instances in the registry from the other Nacos Servers and updates the local registry:

private class UpdatedServiceProcessor implements Runnable {

    //get changed service from other server asynchronously
    @Override
    public void run() {
        ServiceKey serviceKey = null;

        try {
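            // Run an infinite loop
            while (true) {
                try {
                    // Take one element from the queue (take() blocks while the queue is empty).
                    // toBeUpdatedServicesQueue holds services whose status has changed, as reported by other Servers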
                    serviceKey = toBeUpdatedServicesQueue.take();
                } catch (Exception e) {
                    Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
                }

                if (serviceKey == null) {
                    continue;
                }
                // Hand the ServiceUpdater task to a separate thread
                GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
            }
        } catch (Exception e) {
            Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
        }
    }
}
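UpdatedServiceProcessor is a classic dispatcher: one thread blocks on a queue and hands every item to a worker pool, so a slow update never stalls the loop. Here is a minimal, self-contained sketch of that pattern. The class name, the String keys and the pool size are assumptions for illustration; the real code uses ServiceKey objects and GlobalExecutor.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a dispatcher thread drains a blocking queue into a worker pool. */
class UpdateQueueSketch {

    /** Enqueues the keys, waits until workers have handled them, and returns the handled keys. */
    static Set<String> drain(List<String> keys) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>();
        ExecutorService workers = Executors.newFixedThreadPool(2);
        Set<String> processed = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(keys.size());

        Thread dispatcher = new Thread(() -> {
            try {
                while (true) {
                    String key = queue.take();   // blocks until a key arrives
                    workers.submit(() -> {       // hand off so the loop is free again at once
                        processed.add(key);      // stands in for the actual update
                        done.countDown();
                    });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop when interrupted
            }
        });
        dispatcher.setDaemon(true);
        dispatcher.start();

        queue.addAll(keys);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dispatcher.interrupt();
            workers.shutdown();
        }
        return new HashSet<>(processed);
    }
}
```

Unlike the original loop, this sketch exits on interruption, which makes it easy to run in a test; a long-lived server task may reasonably choose to keep looping instead.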

The ServiceUpdater.run method:

private class ServiceUpdater implements Runnable {

    @Override
    public void run() {
        try {
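            // todo namespaceId, serviceName and serverIP are fields of ServiceUpdater (their declarations
            // are omitted in this excerpt), presumably set from the ServiceKey passed to the constructor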
            updatedHealthStatus(namespaceId, serviceName, serverIP);
        } catch (Exception e) {
            Loggers.SRV_LOG
                    .warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName,
                            serverIP, e);
        }
    }
}

public RaftPeer getMySelfClusterState() {
    return raftPeerSet.local();
}

/**
 * Update health status of instance in service.
 *
 * @param namespaceId namespace
 * @param serviceName service name
 * @param serverIP    source server Ip
 */
public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
    // Fetch the data of the specified service from the other server
    Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
    JsonNode serviceJson = JacksonUtils.toObj(msg.getData());

    ArrayNode ipList = (ArrayNode) serviceJson.get("ips");
    // This map holds the health status, as seen by the other Nacos server, of every instance of the current service.
    // The key is ip:port, the value is the healthy flag
    Map<String, String> ipsMap = new HashMap<>(ipList.size());
    // Iterate over ipList
    for (int i = 0; i < ipList.size(); i++) {
        // Each entry has the format ip:port_healthy
        String ip = ipList.get(i).asText();
        String[] strings = ip.split("_");
        // Record the address and health status of the current instance in the map
        ipsMap.put(strings[0], strings[1]);
    }
    // Fetch the current service from the local registry
    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        return;
    }

    boolean changed = false;
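    // Get all instances of the current service from the local registry
    List<Instance> instances = service.allIPs();
    // Iterate over all of these instances
    for (Instance instance : instances) {
        // Health status of this instance as reported by the other Nacos server
        // (an instance missing from ipsMap yields null, which parses as false)
        boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));
        // If the locally recorded status differs from the remote one, the remote one wins
        if (valid != instance.isHealthy()) {
            changed = true;
            // Set the instance in the local registry to the remote status
            instance.setHealthy(valid);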
            Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName,
                    (instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(),
                    instance.getClusterName());
        }
    }

    // changed is true as soon as the status of at least one instance has changed
    if (changed) {
        // Publish the service-changed event
        pushService.serviceChanged(service);
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            StringBuilder stringBuilder = new StringBuilder();
            List<Instance> allIps = service.allIPs();
            for (Instance instance : allIps) {
                stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
            }
            Loggers.EVT_LOG
                    .debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(),
                            service.getName(), stringBuilder.toString());
        }
    }

}
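The heart of updatedHealthStatus is two small steps: parse the ip:port_healthy strings into a map, then overwrite every local flag that disagrees with the remote one. The sketch below reproduces just those steps on plain maps so they can be run in isolation. The class and method names are hypothetical, and local instances are modelled as a Map from ip:port to a Boolean instead of Instance objects.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the parse-and-reconcile steps in updatedHealthStatus. */
class HealthSyncSketch {

    /** Parses entries of the form "ip:port_healthy" into ip:port -> healthy. */
    static Map<String, Boolean> parse(List<String> ips) {
        Map<String, Boolean> result = new HashMap<>();
        for (String item : ips) {
            int idx = item.lastIndexOf('_');
            if (idx <= 0) {
                continue; // skip malformed entries instead of failing
            }
            result.put(item.substring(0, idx), Boolean.parseBoolean(item.substring(idx + 1)));
        }
        return result;
    }

    /**
     * Overwrites local flags that differ from the remote ones; the remote side wins.
     * As in the original, an instance unknown to the remote side counts as unhealthy.
     *
     * @return true if at least one local flag was changed
     */
    static boolean reconcile(Map<String, Boolean> local, Map<String, Boolean> remote) {
        boolean changed = false;
        for (Map.Entry<String, Boolean> e : local.entrySet()) {
            boolean remoteHealthy = remote.getOrDefault(e.getKey(), false);
            if (remoteHealthy != e.getValue()) {
                e.setValue(remoteHealthy);
                changed = true;
            }
        }
        return changed;
    }
}
```

The boolean result plays the role of the changed flag above: only when it is true is there anything worth pushing to subscribers.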

3. EmptyServiceAutoClean()

A scheduled task is started that cleans empty services out of the registry every 20s:

private class EmptyServiceAutoClean implements Runnable {

    @Override
    public void run() {

        // Parallel flow opening threshold
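        // This is the threshold for switching to a parallel stream: when a namespace contains
        // more than 100 services, its registry entries are processed as a parallel stream;
        // otherwise a sequential stream is used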
        int parallelSize = 100;

        // Iterate over the registry
        // stringServiceMap is the inner map of the registry
        serviceMap.forEach((namespace, stringServiceMap) -> {
            Stream<Map.Entry<String, Service>> stream = null;
            // If the namespace being iterated contains more services than the threshold,
            // create a parallel stream
            if (stringServiceMap.size() > parallelSize) {
                // Parallel stream
                stream = stringServiceMap.entrySet().parallelStream();
            } else {
                // Sequential stream
                stream = stringServiceMap.entrySet().stream();
            }
            stream.filter(entry -> {
                final String serviceName = entry.getKey();
                // A service passes the filter only if the current server is responsible for it
                return distroMapper.responsible(serviceName);
            // The forEach below therefore only sees services that the current server has to handle
            }).forEach(entry -> stringServiceMap.computeIfPresent(entry.getKey(), (serviceName, service) -> {
                if (service.isEmpty()) {

                    // To avoid violent Service removal, the number of times the Service
                    // experiences Empty is determined by finalizeCnt, and if the specified
                    // value is reached, it is removed

                    // If the number of times this service has been found empty exceeds the maximum allowed, remove it
                    if (service.getFinalizeCount() > maxFinalizeCount) {
                        Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned", namespace,
                                serviceName);
                        try {
                            // todo Remove the service
                            easyRemoveService(namespace, serviceName);
                        } catch (Exception e) {
                            Loggers.SRV_LOG.error("namespace : {}, [{}] services are automatically clean has "
                                    + "error : {}", namespace, serviceName, e);
                        }
                    }

                    service.setFinalizeCount(service.getFinalizeCount() + 1);

                    Loggers.SRV_LOG
                            .debug("namespace : {}, [{}] The number of times the current service experiences "
                                            + "an empty instance is : {}", namespace, serviceName,
                                    service.getFinalizeCount());
                } else {
                    // Reset the counter to zero
                    service.setFinalizeCount(0);
                }
                return service;
            }));
        });
    }
}
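The finalize counter acts as a grace period: a service has to be found empty on several consecutive sweeps before it is removed, and a single non-empty sweep resets the count. The sketch below shows that idea on a plain ConcurrentHashMap. It is a simplification under stated assumptions: the Svc class and all names are hypothetical, the limit of 3 is an assumed value, and removal is done by returning null from computeIfPresent, whereas the code above delegates removal to easyRemoveService and keeps the mapping.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: remove an entry only after it has been empty on several sweeps. */
class EmptyServiceCleanerSketch {

    static final int MAX_FINALIZE_COUNT = 3; // assumed limit for this demo

    /** Minimal stand-in for a service: an instance count plus the empty-sweep counter. */
    static class Svc {
        int instanceCount;
        int finalizeCount;

        Svc(int instanceCount) {
            this.instanceCount = instanceCount;
        }

        boolean isEmpty() {
            return instanceCount == 0;
        }
    }

    /** One sweep over all services; intended to be called periodically. */
    static void sweep(ConcurrentHashMap<String, Svc> services) {
        for (String name : services.keySet()) {
            // computeIfPresent runs atomically per key
            services.computeIfPresent(name, (key, svc) -> {
                if (!svc.isEmpty()) {
                    svc.finalizeCount = 0; // any sign of life resets the grace period
                    return svc;
                }
                if (svc.finalizeCount > MAX_FINALIZE_COUNT) {
                    return null;           // returning null removes the mapping
                }
                svc.finalizeCount++;
                return svc;
            });
        }
    }
}
```

With a limit of 3 and the strict greater-than comparison, an empty service survives four sweeps and is removed on the fifth. At a 20s period that is a bit over a minute of tolerance for instances that re-register through heartbeats.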

The core method is easyRemoveService(namespace, serviceName):

public void easyRemoveService(String namespaceId, String serviceName) throws Exception {

    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new IllegalArgumentException("specified service not exist, serviceName : " + serviceName);
    }
    // Remove the service through the consistency service,
    // i.e. the removal is applied on every server in the Nacos cluster
    consistencyService.remove(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName));
}

4. Method call graph

[Figure: method call graph for Nacos Registry 13: Server-to-Server Operations (Several Scheduled Tasks)]
