likes
comments
collection
share

Nacos注册中心5-Client端(更新本地服务)

作者站长头像
站长
· 阅读数 8

欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

0. 环境

  • nacos版本:1.4.1
  • Spring Cloud : 2020.0.2
  • Spring Boot :2.4.4
  • Spring Cloud alibaba: 2.2.5.RELEASE

测试代码:github.com/hsfxuebao/s…

1. 更新本地服务

spring.factories: Nacos注册中心5-Client端(更新本地服务) 在这个NacosDiscoveryClientConfiguration注入了NacosWatch

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,
      CommonsClientAutoConfiguration.class })
@AutoConfigureAfter(NacosDiscoveryAutoConfiguration.class)
public class NacosDiscoveryClientConfiguration {

   @Bean
   public DiscoveryClient nacosDiscoveryClient(
         NacosServiceDiscovery nacosServiceDiscovery) {
      return new NacosDiscoveryClient(nacosServiceDiscovery);
   }

   @Bean
   @ConditionalOnMissingBean
   @ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
         matchIfMissing = true)
   public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
         NacosDiscoveryProperties nacosDiscoveryProperties,
         ObjectProvider<ThreadPoolTaskScheduler> taskExecutorObjectProvider) {
      return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties,
            taskExecutorObjectProvider);
   }
}

NacosWatch实现了SmartLifecycle,在环境准备好了之后,会调用start方法:

public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle {

    @Override
    public void start() {
       if (this.running.compareAndSet(false, true)) {
          EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
                event -> new EventListener() {
                   @Override
                   public void onEvent(Event event) {
                      if (event instanceof NamingEvent) {
                         List<Instance> instances = ((NamingEvent) event)
                               .getInstances();
                         Optional<Instance> instanceOptional = selectCurrentInstance(
                               instances);
                         instanceOptional.ifPresent(currentInstance -> {
                            resetIfNeeded(currentInstance);
                         });
                      }
                   }
                });

          NamingService namingService = nacosServiceManager
                .getNamingService(properties.getNacosProperties());
          try {
             // todo 订阅
             namingService.subscribe(properties.getService(), properties.getGroup(),
                   Arrays.asList(properties.getClusterName()), eventListener);
          }
          catch (Exception e) {
             log.error("namingService subscribe failed, properties:{}", properties, e);
          }

          this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
                this::nacosServicesWatch, this.properties.getWatchDelay());
       }
    }
}

com.alibaba.nacos.client.naming.NacosNamingService#subscribe()
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","),
            listener);
}
com.alibaba.nacos.client.naming.core.HostReactor#subscribe
public void subscribe(String serviceName, String clusters, EventListener eventListener) {
    notifier.registerListener(serviceName, clusters, eventListener);
    getServiceInfo(serviceName, clusters);
}

核心方法getServiceInfo

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    // key的格式为:groupId@@微服务名称@@clusters名称
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }

    // todo 从当前Client本地注册表中获取当前服务
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

    // 若本地注册表中没有该服务,则创建一个
    if (null == serviceObj) {
        // 创建一个空的服务(没有任何提供者实例instance的ServiceInfo)
        serviceObj = new ServiceInfo(serviceName, clusters);

        serviceInfoMap.put(serviceObj.getKey(), serviceObj);

        // updatingMap 是一个临时缓存,主要使用这个map的key
        // map的key不能重复的特性
        // 只要这个服务名称在这个map中,说明这个服务正在更新中
        updatingMap.put(serviceName, new Object());
        // todo 更新本地注册表ServiceName的服务
        updateServiceNow(serviceName, clusters);
        // 更新完毕,从updatingMap中删除
        updatingMap.remove(serviceName);

    // 若当前注册表中已经有这个服务,那么查看一下临时map下
    // 是否存在该服务,若存在,说明当前服务正在更新中,所以本次操作先等待一段时间,默认5s
    } else if (updatingMap.containsKey(serviceName)) {

        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }

    // todo 启动一个定时任务,定时更新本地注册表中的当前服务
    scheduleUpdateIfAbsent(serviceName, clusters);

    return serviceInfoMap.get(serviceObj.getKey());
}

主要有2个步骤,第一是更新本地服务,第二是开启定时任务。

1.1 更新本地服务

核心方法:updateServiceNow(serviceName, clusters);:

private void updateServiceNow(String serviceName, String clusters) {
    try {
        // todo
        updateService(serviceName, clusters);
    } catch (NacosException e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    }
}

public void updateService(String serviceName, String clusters) throws NacosException {
    // 本地注册表中获取当前服务
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // todo 提交get请求,获取服务ServiceInfo
        // 需要注意,返回的是json串
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

        if (StringUtils.isNotEmpty(result)) {
            // todo 将来自Server的ServiceInfo更新到本地
            processServiceJson(result);
        }
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}
  • 获取Service对应的所有主机信息:serverProxy.queryList

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
            throws NacosException {
    
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
    
        return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    }
    

    获取某一服务所有的主机实例:GET请求 url:/nacos/v1/ns/instance/list

  • 将来自Server的ServiceInfo更新到本地processServiceJson(result):

    // todo 来自Server的数据是最新 数据
    public ServiceInfo processServiceJson(String json) {
        // 转成ServiceInfo类
        ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
        // 从本地注册表中获取对应服务
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    
        if (pushEmptyProtection && !serviceInfo.validate()) {
            //empty or error push, just ignore
            return oldService;
        }
    
        boolean changed = false;
    
        // 当前注册表中存在该服务,想办法将来自server端的数据更新到本地注册表中
        if (oldService != null) {
    
            // 为了安全起见,这种情况几乎是不存在的
            if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                        + serviceInfo.getLastRefTime());
            }
            // 来自server的serviceInfo替换到注册表中的当前服务
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    
            // 遍历本地注册表中当前服务所有instance实例
            Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
            for (Instance host : oldService.getHosts()) {
                // 将当前遍历instance主机的ip:port作为key,instance为value
                // 写入到一个新的map中
                oldHostMap.put(host.toInetAddr(), host);
            }
    
            // 遍历来自server的当前服务 所有instance实例
            Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
            for (Instance host : serviceInfo.getHosts()) {
                // 将当前遍历instance主机的ip:port作为key,instance为value
                // 写入到一个新的map中
                newHostMap.put(host.toInetAddr(), host);
            }
    
            // 该set集合中存放的是,两个map(oldHostMap与newHostMap)中都有的ip:port,
            // 但它们的instance不相同,此时会将来自于server的instance写入到这个set
            Set<Instance> modHosts = new HashSet<Instance>();
            // 只有newHostMap中存在的instance,即在server端新增的instance
            Set<Instance> newHosts = new HashSet<Instance>();
            // 只有oldHostMap中存在的instance,即在server端被删除的instance
            Set<Instance> remvHosts = new HashSet<Instance>();
    
            List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                    newHostMap.entrySet());
            // 遍历来自于server的主机
            for (Map.Entry<String, Instance> entry : newServiceHosts) {
    
                Instance host = entry.getValue();
                // ip:port
                String key = entry.getKey();
                // 在注册表中存在该ip:port,但这两个instance又不同,则将这个instance写入到modHosts
                if (oldHostMap.containsKey(key) && !StringUtils
                        .equals(host.toString(), oldHostMap.get(key).toString())) {
                    modHosts.add(host);
                    continue;
                }
                // 若注册表中不存在该ip:port,说明这个主机是新增的,则将其写入到newHosts
                if (!oldHostMap.containsKey(key)) {
                    newHosts.add(host);
                }
            }
            // 遍历来自于本地注册表的主机
            for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (newHostMap.containsKey(key)) {
                    continue;
                }
                // 注册表中存在,但来自于server的serviceInfo中不存在,
                // 说明这个instance被干掉了,将其写入到remvHosts
                if (!newHostMap.containsKey(key)) {
                    remvHosts.add(host);
                }
    
            }
    
            if (newHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(newHosts));
            }
    
            if (remvHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(remvHosts));
            }
    
            if (modHosts.size() > 0) {
                changed = true;
                // todo 变更心跳信息BeatInfo
                updateBeatInfo(modHosts);
                NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(modHosts));
            }
    
            serviceInfo.setJsonFromServer(json);
            // 只要发生了变更,就将这个发生变更的serviceInfo记录到一个缓存队列
            if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                        serviceInfo.getClusters(), serviceInfo.getHosts()));
                DiskCache.write(serviceInfo, cacheDir);
            }
    
        // 本地注册表中没有这个服务,直接将来自Server的serviceInfo写入到本地注册表
        } else {
            changed = true;
            NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts()));
            // 将来自于server的serviceInfo写入到注册表
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            // 将这个发生变更的serviceInfo记录到一个缓存队列
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
            serviceInfo.setJsonFromServer(json);
            DiskCache.write(serviceInfo, cacheDir);
        }
    
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    
        if (changed) {
            NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts()));
        }
    
        return serviceInfo;
    }
    

    主要是更新本地的服务为最新的消息,实例Instance发生变更发送InstancesChangeEvent事件。

1.2 开启定时任务

// todo 启动一个定时任务,定时更新本地注册表中的当前服务
scheduleUpdateIfAbsent(serviceName, clusters);

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    // futureMap是一个缓存map,其key为 groupId@@微服务名称@@clusters
    // value是一个定时异步操作对象
    // 这种结构称之为:双重检测锁,DCL,Double Check Lock
    // 该结构是为了避免在并发情况下,多线程重复写入数据
    // 该结构的特征:
    // 1)有两个不为null的判断
    // 2)有共享集合
    // 3)有synchronized代码块
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }

    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        // 创建一个定时异步操作对象,并启动这个定时任务
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        // 将这个定时异步操作对象写入到缓存map
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}

我们看一下UpdateTaskrun方法

public class UpdateTask implements Runnable {

        long lastRefTime = Long.MAX_VALUE;

        private final String clusters;

        private final String serviceName;

        /**
         * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
         */
        private int failCount = 0;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        private void incFailCount() {
            int limit = 6;
            if (failCount == limit) {
                return;
            }
            failCount++;
        }

        private void resetFailCount() {
            failCount = 0;
        }

        @Override
        public void run() {
            long delayTime = DEFAULT_DELAY;

            try {
                // 从本地注册表中获取当前服务
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                // 若本地注册表中不存在该服务,则从server获取到后,更新到本地注册表
                if (serviceObj == null) {
                    // 从server获取当前服务,并更新到本地注册表
                    updateService(serviceName, clusters);
                    return;
                }

                // 处理本地注册表中存在当前服务的情况
                // 1)serviceObj.getLastRefTime() 获取到的是当前服务最后被访问的时间,这个时间
                // 是来自于本地注册表的,其记录的是所有提供这个服务的instance中最后一个instance
                // 被访问的时间
                // 2)缓存lastRefTime 记录的是当前instance最后被访问的时间
                // 若1)时间 小于 2)时间,说明当前注册表应该更新的
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    refreshOnly(serviceName, clusters);
                }
                // 将来自于注册表的这个最后访问时间更新到当前client的缓存
                lastRefTime = serviceObj.getLastRefTime();

                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                        .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    // abort the update task
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
                // 开启下一次的定时任务
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }
    }
}

核心还是updateService(serviceName, clusters),前面我们已经分析了。

2. 方法调用图

Nacos注册中心5-Client端(更新本地服务)

参考文章

nacos-1.4.1源码分析(注释) springcloud-source-study学习github地址 深度解析nacos注册中心 mac系统如何安装nacos