13. Nacos Source Code Analysis: The Distro Protocol (Part 1)

Author: 站长

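At readers' request, this article covers the Nacos Distro protocol. I spent some time studying it, and below I share my understanding. Corrections are welcome.

As before, the analysis is based on Nacos 2.2.0.

First we need to locate the Distro code. The simplest way is to search in the IDE, which turns up two folders named distro.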


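When writing code, organizing folders by functional module helps both us and others find the relevant logic quickly.

Next, look at the distro structure in the core module. It lives under the distributed package, next to the id and raft folders. That tells us the distributed package holds the distributed-systems pieces: the Distro protocol, the Raft protocol, and distributed ID generation.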


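Since we are analyzing the Distro protocol, the focus is naturally the distro package. Where should we start?

There is a DistroConfig class, so let's start there. Whatever the logic is, it will have some configuration items, and following those items leads us to the related logic and the core classes.

The same trick works in Spring: reading the @Configuration classes shows which basic pieces are configured, which is a quick way to learn a project's infrastructure.

DistroConfig extends AbstractDynamicConfig and holds a handful of configuration values.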

public class DistroConfig extends AbstractDynamicConfig {
    
    private static final String DISTRO = "Distro";
    // Singleton instance, created eagerly
    private static final DistroConfig INSTANCE = new DistroConfig();
    // Sync delay, 1s by default
    private long syncDelayMillis = DistroConstants.DEFAULT_DATA_SYNC_DELAY_MILLISECONDS;
    // Sync timeout, 3s by default
    private long syncTimeoutMillis = DistroConstants.DEFAULT_DATA_SYNC_TIMEOUT_MILLISECONDS;
    // Sync retry delay, 3s by default
    private long syncRetryDelayMillis = DistroConstants.DEFAULT_DATA_SYNC_RETRY_DELAY_MILLISECONDS;
    // Verify interval, 5s by default
    private long verifyIntervalMillis = DistroConstants.DEFAULT_DATA_VERIFY_INTERVAL_MILLISECONDS;
    // Verify timeout, 3s by default
    private long verifyTimeoutMillis = DistroConstants.DEFAULT_DATA_VERIFY_TIMEOUT_MILLISECONDS;
    // Retry delay for loading data, 30s by default
    private long loadDataRetryDelayMillis = DistroConstants.DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS;
    // Load data timeout, 30s by default
    private long loadDataTimeoutMillis = DistroConstants.DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS;
    
    private DistroConfig() {
        super(DISTRO);
        resetConfig();
    }
    
    @Override
    protected void getConfigFromEnv() {
        // Reading the configuration from environment variables is omitted
    }
    // Getters, setters, and printConfig are omitted
}
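To make the "defaults plus overrides" idea concrete, here is a minimal sketch of a config value that falls back to a default when no override is present. It is not the Nacos implementation: the class DemoDistroConfig and the property key demo.distro.sync.delayMs are made up for illustration, and only the 1s default mirrors the comment above.

```java
import java.util.function.Function;

// Hypothetical stand-in for a dynamic config class; not the Nacos DistroConfig.
final class DemoDistroConfig {

    // Hypothetical property key, invented for this sketch.
    static final String SYNC_DELAY_KEY = "demo.distro.sync.delayMs";

    static final long DEFAULT_SYNC_DELAY_MILLIS = 1000L;

    private long syncDelayMillis = DEFAULT_SYNC_DELAY_MILLIS;

    // Re-read the value from a property source. A missing or malformed
    // value falls back to the default.
    void reload(Function<String, String> source) {
        String raw = source.apply(SYNC_DELAY_KEY);
        long value = DEFAULT_SYNC_DELAY_MILLIS;
        if (raw != null) {
            try {
                value = Long.parseLong(raw.trim());
            } catch (NumberFormatException ignored) {
                // Keep the default when the override is not a number.
            }
        }
        syncDelayMillis = value;
    }

    long getSyncDelayMillis() {
        return syncDelayMillis;
    }

    public static void main(String[] args) {
        DemoDistroConfig config = new DemoDistroConfig();
        // Use JVM system properties as the source in this demo.
        config.reload(System::getProperty);
        System.out.println("sync delay = " + config.getSyncDelayMillis() + " ms");
    }
}
```

Passing the property source as a function keeps the sketch testable without touching real environment variables.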

Searching for references to this configuration leads to a core class, DistroClientComponentRegistry, in the package com.alibaba.nacos.naming.consistency.ephemeral.distro.v2. It wires up several important Distro-related classes.

@Component
// Bean created by Spring
public class DistroClientComponentRegistry {
    private final ServerMemberManager serverMemberManager;
    private final DistroProtocol distroProtocol;
    private final DistroComponentHolder componentHolder;
    private final DistroTaskEngineHolder taskEngineHolder;
    private final ClientManager clientManager;
    private final ClusterRpcClientProxy clusterRpcClientProxy;
    // Constructor injection
    public DistroClientComponentRegistry(ServerMemberManager serverMemberManager, DistroProtocol distroProtocol,
            DistroComponentHolder componentHolder, DistroTaskEngineHolder taskEngineHolder,
            ClientManagerDelegate clientManager, ClusterRpcClientProxy clusterRpcClientProxy) {
        this.serverMemberManager = serverMemberManager;
        this.distroProtocol = distroProtocol;
        this.componentHolder = componentHolder;
        this.taskEngineHolder = taskEngineHolder;
        this.clientManager = clientManager;
        this.clusterRpcClientProxy = clusterRpcClientProxy;
    }
    
    /**
     * Register the necessary components for the v2 Distro protocol
     */
    @PostConstruct
    public void doRegister() {
        // Runs after the constructor
        DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol);
        DistroTransportAgent transportAgent = new DistroClientTransportAgent(clusterRpcClientProxy,
                serverMemberManager);
        DistroClientTaskFailedHandler taskFailedHandler = new DistroClientTaskFailedHandler(taskEngineHolder);
        // Register the handler classes with componentHolder
        componentHolder.registerDataStorage(DistroClientDataProcessor.TYPE, dataProcessor);
        componentHolder.registerDataProcessor(dataProcessor);
        componentHolder.registerTransportAgent(DistroClientDataProcessor.TYPE, transportAgent);
        componentHolder.registerFailedTaskHandler(DistroClientDataProcessor.TYPE, taskFailedHandler);
    }
}

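With the entry point found, let's go through the pieces one by one.

ServerMemberManager

ServerMemberManager was already covered in "11. Nacos Config Service Server-Side Source Code Analysis (2)". Its main job is to discover the addresses of the other server nodes, which can be configured either in the configuration file or in a separate cluster.conf file. It also watches that file and updates the server addresses dynamically when the contents change.

DistroProtocol

As the name suggests, DistroProtocol is the body of the Distro protocol itself. Let's look at this class closely.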

@Component
// Managed and created by Spring
public class DistroProtocol {
    
    private final ServerMemberManager memberManager;
    private final DistroComponentHolder distroComponentHolder;
    private final DistroTaskEngineHolder distroTaskEngineHolder;
    private volatile boolean isInitialized = false;
    
    public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
            DistroTaskEngineHolder distroTaskEngineHolder) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        // The constructor kicks off the Distro protocol tasks
        startDistroTask();
    }
    
    private void startDistroTask() {
        if (EnvUtil.getStandaloneMode()) {
            // Standalone mode does not need the Distro protocol
            isInitialized = true;
            return;
        }
        // Start the verify task
        startVerifyTask();
        // Start the load task
        startLoadTask();
    }
    
    private void startLoadTask() {
        // Callback: isInitialized becomes true once the data load succeeds, false on failure
        DistroCallback loadCallback = new DistroCallback() {
            @Override
            public void onSuccess() {
                isInitialized = true;
            }
            
            @Override
            public void onFailed(Throwable throwable) {
                isInitialized = false;
            }
        };
        // Submit the data load task once; the task reschedules itself until loading completes
        GlobalExecutor.submitLoadDataTask(
                new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
    }
    
    private void startVerifyTask() {
        // Run the verify task repeatedly at a fixed interval, 5s by default
        GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,
                        distroTaskEngineHolder.getExecuteWorkersManager()),
                DistroConfig.getInstance().getVerifyIntervalMillis());
    }
    
    public boolean isInitialized() {
        return isInitialized;
    }
    
    /**
     * Start syncing with the configured delay, 1s by default
     */
    public void sync(DistroKey distroKey, DataOperation action) {
        sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
    }
    
    /**
     * Sync data to all remote servers
     */
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        for (Member each : memberManager.allMembersWithoutSelf()) {
            syncToTarget(distroKey, action, each.getAddress(), delay);
        }
    }
    
    /**
     * Sync to the target server
     */
    public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                targetServer);
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
        }
    }
    
    /**
     * Query data from the specified server
     */
    public DistroData queryFromRemote(DistroKey distroKey) {
        if (null == distroKey.getTargetServer()) {
            Loggers.DISTRO.warn("[DISTRO] Can't query data from empty server");
            return null;
        }
        String resourceType = distroKey.getResourceType();
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
        if (null == transportAgent) {
            Loggers.DISTRO.warn("[DISTRO] Can't find transport agent for key {}", resourceType);
            return null;
        }
        return transportAgent.getData(distroKey, distroKey.getTargetServer());
    }
    
    /**
     * Receive synced distro data and hand it to the matching processor
     */
    public boolean onReceive(DistroData distroData) {
        Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
                distroData.getDistroKey());
        String resourceType = distroData.getDistroKey().getResourceType();
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
            return false;
        }
        return dataProcessor.processData(distroData);
    }
    
    /**
     * Receive verify data and hand it to the matching processor
     */
    public boolean onVerify(DistroData distroData, String sourceAddress) {
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(),
                    distroData.getDistroKey());
        }
        String resourceType = distroData.getDistroKey().getResourceType();
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
            return false;
        }
        return dataProcessor.processVerifyData(distroData, sourceAddress);
    }
    
    /**
     * Query data by the input distro key
     */
    public DistroData onQuery(DistroKey distroKey) {
        String resourceType = distroKey.getResourceType();
        DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(resourceType);
        if (null == distroDataStorage) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", resourceType);
            return new DistroData(distroKey, new byte[0]);
        }
        return distroDataStorage.getDistroData(distroKey);
    }
    
    /**
     * Query all snapshot data
     */
    public DistroData onSnapshot(String type) {
        DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
        if (null == distroDataStorage) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
            return new DistroData(new DistroKey("snapshot", type), new byte[0]);
        }
        return distroDataStorage.getDatumSnapshot();
    }
}

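From this class we can see that its methods all receive, query, or sync data. Let's first look closely at the two operations started by startDistroTask.

startVerifyTask

startVerifyTask() uses a scheduled thread pool to run the run() method of DistroVerifyTimedTask.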

public void run() {
    try {
        // Get the other server nodes, excluding this one
        List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("server list is: {}", targetServer);
        }
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            // Verify per type; the type identifies the protocol type, and 2.2.0 only uses the gRPC type
            verifyForDataStorage(each, targetServer);
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
    }
}

private void verifyForDataStorage(String type, List<Member> targetServer) {
    // Find the data storage for this type
    DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
    if (!dataStorage.isFinishInitial()) {
        Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
                            dataStorage.getClass().getSimpleName());
        return;
    }
    // Get the data to verify
    List<DistroData> verifyData = dataStorage.getVerifyData();
    if (null == verifyData || verifyData.isEmpty()) {
        return;
    }
    for (Member member : targetServer) {
        DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
        if (null == agent) {
            continue;
        }
        // Hand the task to the execute engine
        executeTaskExecuteEngine.addTask(member.getAddress() + type,
                                         new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
    }
}

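The verify handler first fetches the other nodes (everything except the local one) and then runs verification against them.

Verification starts by collecting the local data and then sends a verify task to every other node.

Let's look at part of getVerifyData() to see what the verification data actually is.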

public List<DistroData> getVerifyData() {
    List<DistroData> result = null;
    for (String each : clientManager.allClientId()) {
        // Handle each registered client managed on this node
        Client client = clientManager.getClient(each);
        if (null == client || !client.isEphemeral()) {
            // Skip null or non-ephemeral clients
            continue;
        }
        // Only clients this node is responsible for
        if (clientManager.isResponsibleClient(client)) {
            // The data to verify is each client's clientId and revision
            DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(),
                                                                           client.getRevision());
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            DistroData data = new DistroData(distroKey,
                                             ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
            data.setType(DataOperation.VERIFY);
            if (result == null) {
                result = new LinkedList<>();
            }
            result.add(data);
        }
    }
    return result;
}

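The code shows that what gets verified is the clientId and revision of each client connected to this server. Once the data is collected, it is handed off via executeTaskExecuteEngine.addTask(member.getAddress() + type, new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type)).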
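The selection rule in getVerifyData() can be sketched in isolation: keep only ephemeral clients that this node is responsible for, and reduce each to its id and revision. DemoClient and the "clientId@revision" string format are hypothetical simplifications for this sketch; the real code serializes a DistroClientVerifyInfo into a DistroData.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class VerifyDataSketch {

    // Simplified, hypothetical stand-in for a registered client.
    static final class DemoClient {
        final String clientId;
        final long revision;
        final boolean ephemeral;
        final boolean responsible; // true if this node owns the client

        DemoClient(String clientId, long revision, boolean ephemeral, boolean responsible) {
            this.clientId = clientId;
            this.revision = revision;
            this.ephemeral = ephemeral;
            this.responsible = responsible;
        }
    }

    // Returns one "clientId@revision" entry per ephemeral client owned by this node.
    static List<String> collectVerifyData(List<DemoClient> clients) {
        List<String> result = new ArrayList<>();
        for (DemoClient client : clients) {
            if (client == null || !client.ephemeral || !client.responsible) {
                continue; // skip missing, persistent, or foreign clients
            }
            result.add(client.clientId + "@" + client.revision);
        }
        return result;
    }

    public static void main(String[] args) {
        List<DemoClient> clients = Arrays.asList(
                new DemoClient("conn-1", 7L, true, true),
                new DemoClient("conn-2", 3L, true, false),
                new DemoClient("10.0.0.1:8080#false", 1L, false, true));
        System.out.println(collectVerifyData(clients));
    }
}
```

Only the first demo client passes both filters, so a verify round stays small: a node never advertises clients it does not own.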

Next, let's look at the run method of DistroVerifyExecuteTask.

 public void run() {
     for (DistroData each : verifyData) {
         try {
             if (transportAgent.supportCallbackTransport()) {
                 // The transport supports callbacks
                 doSyncVerifyDataWithCallback(each);
             } else {
                 // The transport does not support callbacks
                 doSyncVerifyData(each);
             }
         } catch (Exception e) {
             Loggers.DISTRO
                 .error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e);
         }
     }
 }

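To keep things simple, only the non-callback path is analyzed here. It ends up in the transport agent's syncVerifyData method: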

public boolean syncVerifyData(DistroData verifyData, String targetServer) {
    if (isNoExistTarget(targetServer)) {
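        // The local member list does not contain the target server, so return directly
        return true;
    }
    // Replace the target server with this server's own address so the receiver can call back
    verifyData.getDistroKey().setTargetServer(memberManager.getSelf().getAddress());
    // Build the DistroDataRequest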
    DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
    Member member = memberManager.find(targetServer);
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO
            .warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy, key: {}", targetServer,
                  verifyData.getDistroKey());
        return false;
    }
    try {
        // Send the RPC request
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Verify distro data failed! key: {} ", verifyData.getDistroKey(), e);
    }
    return false;
}

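DistroVerifyExecuteTask sends the verification DistroData over gRPC to its target server node. Since one task is created per remote member, every other node ends up being verified.

startLoadTask

startLoadTask() runs the run() method of DistroLoadDataTask.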

public void run() {
    try {
        // Run the load
        load();
        if (!checkCompleted()) {
            // Not finished yet: retry after a delay
            GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
        } else {
            // Finished: invoke the success callback
            loadCallback.onSuccess();
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
        }
    } catch (Exception e) {
        loadCallback.onFailed(e);
        Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
    }
}

private void load() throws Exception {
    while (memberManager.allMembersWithoutSelf().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
        TimeUnit.SECONDS.sleep(1);
    }
    while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
        TimeUnit.SECONDS.sleep(1);
    }
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
            // Handle every registered type; here there is only the v2 gRPC type
            loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
        }
    }
}

private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == transportAgent || null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
                            resourceType, transportAgent, dataProcessor);
        return false;
    }
    for (Member each : memberManager.allMembersWithoutSelf()) {
        long startTime = System.currentTimeMillis();
        try {
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
            // Fetch the snapshot, then process it
            DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
            Loggers.DISTRO.info("[DISTRO-INIT] it took {} ms to load snapshot {} from {} and snapshot size is {}.",
                                System.currentTimeMillis() - startTime, resourceType, each.getAddress(),
                                getDistroDataLength(distroData));
            boolean result = dataProcessor.processSnapshot(distroData);
            Loggers.DISTRO
                .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
                      result);
            if (result) {
                distroComponentHolder.findDataStorage(resourceType).finishInitial();
                return true;
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
        }
    }
    return false;
}

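The load task pulls the latest snapshot data from the other nodes. As the code shows, it tries them one by one and stops at the first node whose snapshot is processed successfully.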
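The "try each member, stop at the first success" loop can be sketched on its own. This is a simplified model, not the Nacos code: the Predicate named loadFrom is a hypothetical stand-in for "fetch the snapshot from this member and process it", and members are plain address strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class SnapshotLoadSketch {

    // Try each remote member in order and return the address of the first one
    // whose snapshot loads successfully, or null if none does. A null result is
    // the case where the caller would reschedule the task after the retry delay.
    static String loadFromFirstAvailable(List<String> members, Predicate<String> loadFrom) {
        for (String member : members) {
            try {
                if (loadFrom.test(member)) {
                    return member;
                }
            } catch (RuntimeException e) {
                // A failing member is skipped and the next one is tried.
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        // Pretend that only the second member can serve a snapshot.
        String source = loadFromFirstAvailable(members, m -> m.startsWith("10.0.0.2"));
        System.out.println("snapshot loaded from " + source);
    }
}
```

One consequence of this design is that a starting node needs only a single healthy peer to finish initialization; the periodic verify task is what later evens out any differences between peers.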

DistroComponentHolder

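DistroComponentHolder is simple. It only stores the components that are needed: the data processors, the failed-task handlers, the data storages, and the transport agents used for remote calls.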

@Component
// Created by Spring
public class DistroComponentHolder {
    // Transport agents for each type of DistroData
    private final Map<String, DistroTransportAgent> transportAgentMap = new HashMap<>();
    // Data storages for each type of DistroData
    private final Map<String, DistroDataStorage> dataStorageMap = new HashMap<>();
    // Failed-task handlers for each type
    private final Map<String, DistroFailedTaskHandler> failedTaskHandlerMap = new HashMap<>();
    // Data processors for each type of DistroData
    private final Map<String, DistroDataProcessor> dataProcessorMap = new HashMap<>();
    // Register and lookup methods are omitted
}

DistroTaskEngineHolder

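DistroTaskEngineHolder holds two kinds of TaskExecuteEngine: DistroExecuteTaskExecuteEngine, which executes tasks immediately, and DistroDelayTaskExecuteEngine, which executes them after a delay. Both engines are described in "5. Nacos Service Registration Server-Side Source Code Analysis (4)".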

@Component
public class DistroTaskEngineHolder implements DisposableBean {
    
    private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
    
    private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();
    
    public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
        DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
        // Set the default task processor
        delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
    }
    
    public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
        return delayTaskExecuteEngine;
    }
    
    public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
        return executeWorkersManager;
    }
    
    public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
        this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
    }
    
    @Override
    public void destroy() throws Exception {
        this.delayTaskExecuteEngine.shutdown();
        this.executeWorkersManager.shutdown();
    }
}

ClientManagerDelegate

ClientManagerDelegate is a delegate class. It forwards each call to one of ConnectionBasedClientManager, EphemeralIpPortClientManager, or PersistentIpPortClientManager.

@DependsOn({"clientServiceIndexesManager", "namingMetadataManager"})
@Component("clientManager")
public class ClientManagerDelegate implements ClientManager {
    
    private final ConnectionBasedClientManager connectionBasedClientManager;
    private final EphemeralIpPortClientManager ephemeralIpPortClientManager;
    private final PersistentIpPortClientManager persistentIpPortClientManager;
    // Constructor injection
    public ClientManagerDelegate(ConnectionBasedClientManager connectionBasedClientManager,
            EphemeralIpPortClientManager ephemeralIpPortClientManager,
            PersistentIpPortClientManager persistentIpPortClientManager) {
        this.connectionBasedClientManager = connectionBasedClientManager;
        this.ephemeralIpPortClientManager = ephemeralIpPortClientManager;
        this.persistentIpPortClientManager = persistentIpPortClientManager;
    }
    
    @Override
    public boolean clientConnected(String clientId, ClientAttributes attributes) {
        // Client connected
        return getClientManagerById(clientId).clientConnected(clientId, attributes);
    }
    
    @Override
    public boolean clientConnected(Client client) {
        // Client connected
        return getClientManagerById(client.getClientId()).clientConnected(client);
    }
    
    @Override
    public boolean syncClientConnected(String clientId, ClientAttributes attributes) {
        // Synced client connected
        return getClientManagerById(clientId).syncClientConnected(clientId, attributes);
    }
    
    @Override
    public boolean clientDisconnected(String clientId) {
        // Client disconnected
        return getClientManagerById(clientId).clientDisconnected(clientId);
    }
    
    @Override
    public Client getClient(String clientId) {
        // Get the client
        return getClientManagerById(clientId).getClient(clientId);
    }
    
    @Override
    public boolean contains(String clientId) {
        return connectionBasedClientManager.contains(clientId) || ephemeralIpPortClientManager.contains(clientId)
                || persistentIpPortClientManager.contains(clientId);
    }
    
    @Override
    public Collection<String> allClientId() {
        Collection<String> result = new HashSet<>();
        result.addAll(connectionBasedClientManager.allClientId());
        result.addAll(ephemeralIpPortClientManager.allClientId());
        result.addAll(persistentIpPortClientManager.allClientId());
        return result;
    }
    
    @Override
    public boolean isResponsibleClient(Client client) {
        // Whether this node is responsible for the client
        return getClientManagerById(client.getClientId()).isResponsibleClient(client);
    }
    
    @Override
    public boolean verifyClient(DistroClientVerifyInfo verifyData) {
        // Verify the client info
        return getClientManagerById(verifyData.getClientId()).verifyClient(verifyData);
    }
    // Pick the ClientManager that owns this clientId
    private ClientManager getClientManagerById(String clientId) {
        if (isConnectionBasedClient(clientId)) {
            return connectionBasedClientManager;
        }
        return clientId.endsWith(ClientConstants.PERSISTENT_SUFFIX) ? persistentIpPortClientManager : ephemeralIpPortClientManager;
    }
    
    private boolean isConnectionBasedClient(String clientId) {
        return !clientId.contains(IpPortBasedClient.ID_DELIMITER);
    }
}
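The routing in getClientManagerById depends only on the shape of the clientId, which a small standalone sketch makes easy to see. The constant values "#" and "false" below are assumptions about what IpPortBasedClient.ID_DELIMITER and ClientConstants.PERSISTENT_SUFFIX hold, and the sample ids are invented; check the Nacos source before relying on them.

```java
final class ClientRoutingSketch {

    // Assumed values for this sketch; verify against the Nacos constants.
    static final String ID_DELIMITER = "#";
    static final String PERSISTENT_SUFFIX = "false";

    enum ManagerKind { CONNECTION_BASED, EPHEMERAL_IP_PORT, PERSISTENT_IP_PORT }

    // Same decision order as getClientManagerById: ids without the delimiter are
    // connection based, otherwise the suffix picks persistent or ephemeral.
    static ManagerKind route(String clientId) {
        if (!clientId.contains(ID_DELIMITER)) {
            return ManagerKind.CONNECTION_BASED;
        }
        return clientId.endsWith(PERSISTENT_SUFFIX)
                ? ManagerKind.PERSISTENT_IP_PORT
                : ManagerKind.EPHEMERAL_IP_PORT;
    }

    public static void main(String[] args) {
        // Invented sample ids, for illustration only.
        System.out.println(route("1664527081276_127.0.0.1_4400"));
        System.out.println(route("10.0.0.1:8080#true"));
        System.out.println(route("10.0.0.1:8080#false"));
    }
}
```

Because the decision is a pure function of the id string, the delegate never has to ask the three managers which of them actually holds the client.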

ClusterRpcClientProxy

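ClusterRpcClientProxy is the RPC proxy between server nodes. Note that it only handles calls between server nodes, not the client-to-server or server-to-client requests discussed earlier. In essence, though, it still takes an IP and port and opens a gRPC channel for the requests. It extends MemberChangeListener, which in turn extends Subscriber<MembersChangeEvent> and so subscribes to MembersChangeEvent. In other words, it reacts to changes in cluster membership. Let's look at the logic of this class in detail.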

@Service
// Created by the Spring container
public class ClusterRpcClientProxy extends MemberChangeListener {
    // Default request timeout: 3s
    private static final long DEFAULT_REQUEST_TIME_OUT = 3000L;
    @Autowired
    ServerMemberManager serverMemberManager;
    @PostConstruct
    // Runs after the constructor
    public void init() {
        try {
            // Register with the notify center as a subscriber of the MembersChangeEvent declared in MemberChangeListener
            NotifyCenter.registerSubscriber(this);
            // Get all server members except this one
            List<Member> members = serverMemberManager.allMembersWithoutSelf();
            // Refresh the member clients
            refresh(members);
            Loggers.CLUSTER
                    .warn("[ClusterRpcClientProxy] success to refresh cluster rpc client on start up,members ={} ",
                            members);
        } catch (NacosException e) {
            Loggers.CLUSTER.warn("[ClusterRpcClientProxy] fail to refresh cluster rpc client,{} ", e.getMessage());
        }
        
    }
    
    private void refresh(List<Member> members) throws NacosException {
        // Make sure every new member has a client
        for (Member member : members) {
            if (MemberUtil.isSupportedLongCon(member)) {
                // Create and start a gRPC client for this member
                createRpcClientAndStart(member, ConnectionType.GRPC);
            }
        }
        
        // Shut down and remove the clients of old members
        Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries();
        Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();
        List<String> newMemberKeys = members.stream().filter(MemberUtil::isSupportedLongCon)
                .map(this::memberClientKey).collect(Collectors.toList());
        while (iterator.hasNext()) {
            Map.Entry<String, RpcClient> next1 = iterator.next();
            if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {
                Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey());
                RpcClientFactory.getClient(next1.getKey()).shutdown();
                iterator.remove();
            }
        }
        
    }
    
    private String memberClientKey(Member member) {
        return "Cluster-" + member.getAddress();
    }
    
    private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException {
        Map<String, String> labels = new HashMap<>(2);
        labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_CLUSTER);
        String memberClientKey = memberClientKey(member);
        // Create the gRPC client
        RpcClient client = buildRpcClient(type, labels, memberClientKey);
        if (!client.getConnectionType().equals(type)) {
            Loggers.CLUSTER.info(",connection type changed,destroy client of member - > : {}", member);
            RpcClientFactory.destroyClient(memberClientKey);
            client = buildRpcClient(type, labels, memberClientKey);
        }
        
        if (client.isWaitInitiated()) {
            Loggers.CLUSTER.info("start a new rpc client to member - > : {}", member);
            
            //one fixed server
            client.serverListFactory(new ServerListFactory() {
                @Override
                public String genNextServer() {
                    return member.getAddress();
                }
                
                @Override
                public String getCurrentServer() {
                    return member.getAddress();
                }
                
                @Override
                public List<String> getServerList() {
                    return CollectionUtils.list(member.getAddress());
                }
            });
            // Start the client
            client.start();
        }
    }
    
    /**
     * Build the gRPC client
     */
    private RpcClient buildRpcClient(ConnectionType type, Map<String, String> labels, String memberClientKey) {
        RpcClient clusterClient = RpcClientFactory
                .createClusterClient(memberClientKey, type, EnvUtil.getAvailableProcessors(2),
                        EnvUtil.getAvailableProcessors(8), labels);
        return clusterClient;
    }
    
    // The methods that send RPC requests are omitted...
    
    @Override
    public void onEvent(MembersChangeEvent event) {
        try {
            // Membership changed: refresh the clients for the server members
            List<Member> members = serverMemberManager.allMembersWithoutSelf();
            refresh(members);
        } catch (NacosException e) {
            Loggers.CLUSTER.warn("[serverlist] fail to refresh cluster rpc client, event:{}, msg: {} ", event, e.getMessage());
        }
    }
    
    /**
     * Check whether the client for this member is running
     */
    public boolean isRunning(Member member) {
        RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
        if (null == client) {
            return false;
        }
        return client.isRunning();
    }
}

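In ClusterRpcClientProxy, any change in cluster membership triggers a refresh: it makes sure every current member has a gRPC client, and it shuts down and removes the clients of members that have left.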
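The refresh step is essentially a set difference over client keys. The sketch below models it with plain strings; it creates no real connections, and the class name and the "sdk-client" key are invented for illustration. Only the "Cluster-" prefix comes from the code above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class ClusterClientRefreshSketch {

    static final String PREFIX = "Cluster-";

    // Brings clientKeys in line with the current members: adds a key for every
    // member, removes cluster keys of members that left, and returns the removed
    // keys. Keys without the cluster prefix are left untouched.
    static Set<String> refresh(Set<String> clientKeys, List<String> memberAddresses) {
        Set<String> wanted = new HashSet<>();
        for (String address : memberAddresses) {
            wanted.add(PREFIX + address);
        }
        clientKeys.addAll(wanted); // "create" clients for new members

        Set<String> removed = new TreeSet<>();
        Iterator<String> iterator = clientKeys.iterator();
        while (iterator.hasNext()) {
            String key = iterator.next();
            if (key.startsWith(PREFIX) && !wanted.contains(key)) {
                removed.add(key); // the real code shuts the client down here
                iterator.remove();
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Set<String> keys = new HashSet<>(Arrays.asList("Cluster-a:8848", "Cluster-b:8848", "sdk-client"));
        Set<String> removed = refresh(keys, Arrays.asList("b:8848", "c:8848"));
        System.out.println("removed = " + removed + ", remaining = " + new TreeSet<>(keys));
    }
}
```

Filtering on the prefix is what keeps the refresh from touching clients that do not belong to the cluster proxy.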

DistroClientDataProcessor

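With the classes from the constructor covered, let's move on to the important classes created in the @PostConstruct method. The first one is DistroClientDataProcessor. Start with its class diagram.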

[Figure: class diagram of DistroClientDataProcessor]

Judging from the class diagram, it is an event subscriber that handles a particular family of events.

public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
    
    public static final String TYPE = "Nacos:Naming:v2:ClientData";
    private final ClientManager clientManager;
    private final DistroProtocol distroProtocol;
    private volatile boolean isFinishInitial;
    
    public DistroClientDataProcessor(ClientManager clientManager, DistroProtocol distroProtocol) {
        this.clientManager = clientManager;
        this.distroProtocol = distroProtocol;
        // Register itself as a subscriber
        NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance());
    }
    
    @Override
    public List<Class<? extends Event>> subscribeTypes() {
        List<Class<? extends Event>> result = new LinkedList<>();
        // Subscribes to three events: client changed, client disconnected, and client verify failed
        result.add(ClientEvent.ClientChangedEvent.class);
        result.add(ClientEvent.ClientDisconnectEvent.class);
        result.add(ClientEvent.ClientVerifyFailedEvent.class);
        return result;
    }
    
    @Override
    public void onEvent(Event event) {
        if (EnvUtil.getStandaloneMode()) {
            return;
        }
        if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
            // Sync to the server node that failed verification
            syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
        } else {
            // Sync to all server nodes
            syncToAllServer((ClientEvent) event);
        }
    }
    
    private void syncToVerifyFailedServer(ClientEvent.ClientVerifyFailedEvent event) {
        Client client = clientManager.getClient(event.getClientId());
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        // Verify-failed data is synced directly with no delay: it is still wrapped in a task, then sent via RPC to the target node
        distroProtocol.syncToTarget(distroKey, DataOperation.ADD, event.getTargetServer(), 0L);
    }
    
    private void syncToAllServer(ClientEvent event) {
        Client client = event.getClient();
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        // Sync through the protocol, which runs the related task internally and then sends the RPC request
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.DELETE);
        } else if (event instanceof ClientEvent.ClientChangedEvent) {
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.CHANGE);
        }
    }  
    // The rest of the code is omitted...
}
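The event handling above boils down to a small decision table: which operation to send, to whom, and with what delay. The sketch below restates that table with hypothetical enums and a Decision holder; it mirrors the branches in onEvent, syncToVerifyFailedServer, and syncToAllServer but is not Nacos code.

```java
final class SyncDecisionSketch {

    enum EventKind { CHANGED, DISCONNECT, VERIFY_FAILED }

    enum Operation { ADD, CHANGE, DELETE }

    // Hypothetical value holder for the outcome of one event.
    static final class Decision {
        final Operation operation;
        final boolean toAllServers; // false means only the server named in the event
        final long delayMillis;

        Decision(Operation operation, boolean toAllServers, long delayMillis) {
            this.operation = operation;
            this.toAllServers = toAllServers;
            this.delayMillis = delayMillis;
        }
    }

    // defaultDelayMillis plays the role of the configured sync delay.
    static Decision decide(EventKind kind, long defaultDelayMillis) {
        switch (kind) {
            case VERIFY_FAILED:
                // Repair a single server right away by re-adding the client data.
                return new Decision(Operation.ADD, false, 0L);
            case DISCONNECT:
                return new Decision(Operation.DELETE, true, defaultDelayMillis);
            default:
                return new Decision(Operation.CHANGE, true, defaultDelayMillis);
        }
    }

    public static void main(String[] args) {
        for (EventKind kind : EventKind.values()) {
            Decision d = decide(kind, 1000L);
            System.out.println(kind + " -> " + d.operation + ", all=" + d.toAllServers
                    + ", delay=" + d.delayMillis + "ms");
        }
    }
}
```

The asymmetry is the interesting part: ordinary changes are broadcast after a short delay, while a verify failure triggers an immediate, targeted repair.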

DistroTransportAgent

DistroTransportAgent is the transport agent of the protocol and is mainly responsible for syncing data. It is defined as an interface.

public interface DistroTransportAgent {
    /**
     * Whether transport with a callback is supported
     */
    boolean supportCallbackTransport();
    
    /**
     * Sync data
     */
    boolean syncData(DistroData data, String targetServer);
    
    /**
     * Sync data with a callback
     */
    void syncData(DistroData data, String targetServer, DistroCallback callback);
    
    /**
     * Sync verify data
     */
    boolean syncVerifyData(DistroData verifyData, String targetServer);
    
    /**
     * Sync verify data with a callback
     */
    void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback);
    
    /**
     * Get data from the target server
     */
    DistroData getData(DistroKey key, String targetServer);
    
    /**
     * Get the full snapshot from the target server
     */
    DistroData getDatumSnapshot(String targetServer);
}

Its implementation is DistroClientTransportAgent. Let's see how the implementation handles syncing data.

@Override
public boolean syncData(DistroData data, String targetServer) {
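    // The target server does not exist
    if (isNoExistTarget(targetServer)) {
        return true;
    }
    // Build the request carrying the distro data
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    // Look up the member, which holds the IP, port, and other attributes
    Member member = memberManager.find(targetServer);
    // Check whether the target node is healthy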
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO
            .warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer,
                  data.getDistroKey());
        return false;
    }
    try {
        // The node is healthy, so send the gRPC request
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        // Check whether the response indicates success
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! key: {}", data.getDistroKey(), e);
    }
    return false;
}

Syncing data, then, simply means sending the protocol data to the other nodes.

DistroClientTaskFailedHandler

DistroClientTaskFailedHandler is simple: it retries failed tasks after a delay.

public class DistroClientTaskFailedHandler implements DistroFailedTaskHandler {
    private final DistroTaskEngineHolder distroTaskEngineHolder;  
    public DistroClientTaskFailedHandler(DistroTaskEngineHolder distroTaskEngineHolder) {
        this.distroTaskEngineHolder = distroTaskEngineHolder;
    }
    
    @Override
    public void retry(DistroKey distroKey, DataOperation action) {
        // Use the configured retry delay
        DistroDelayTask retryTask = new DistroDelayTask(distroKey, action,
                DistroConfig.getInstance().getSyncRetryDelayMillis());
        // Put it into the delay queue to wait for execution
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKey, retryTask);
    }
}
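The retry behaviour can be illustrated with a toy delay queue driven by an explicit clock, which keeps the example deterministic. Everything here is a simplification for illustration: the class and method names are invented, keys are plain strings, and a later task for the same key simply replaces the earlier due time, which is an assumption of this sketch rather than a description of how the Nacos delay engine merges tasks.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DelayRetrySketch {

    // key -> time (in ms) at which the task becomes due
    private final Map<String, Long> dueAtByKey = new LinkedHashMap<>();

    // Queue a task; a task with the same key overwrites the previous due time.
    void addTask(String key, long nowMillis, long delayMillis) {
        dueAtByKey.put(key, nowMillis + delayMillis);
    }

    // Remove and return the keys of all tasks that are due at nowMillis.
    List<String> pollDue(long nowMillis) {
        List<String> due = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> iterator = dueAtByKey.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, Long> entry = iterator.next();
            if (entry.getValue() <= nowMillis) {
                due.add(entry.getKey());
                iterator.remove();
            }
        }
        return due;
    }

    public static void main(String[] args) {
        DelayRetrySketch queue = new DelayRetrySketch();
        queue.addTask("client-1@10.0.0.2:8848", 0L, 1000L);     // first sync, 1s delay
        System.out.println(queue.pollDue(1000L));               // the task is due
        // Pretend the sync failed: re-queue it with the 3s retry delay.
        queue.addTask("client-1@10.0.0.2:8848", 1000L, 3000L);
        System.out.println(queue.pollDue(3999L));               // not due yet
        System.out.println(queue.pollDue(4000L));               // due again
    }
}
```

Keying tasks by data key plus target server is what lets a retry for one peer proceed independently of the others.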

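Summary

At this point we can summarize:

  1. On startup, a node first loads snapshot data so that it recovers the previous state as quickly as possible.
  2. In the background, it keeps sending verification data to the other nodes to keep the data consistent.
  3. When a sync fails, it retries after a delay.
  4. The list of cluster server addresses is updated dynamically, and health checks determine whether servers and service instances are online or offline.
  5. Distro resembles best-effort notification: it keeps sending sync and verification data to keep the data consistent.

Throughout this analysis we looked at the code from the sending side, where a server node acts as a client of its peers. How does the receiving side of a server node handle the data it gets? Only after covering both the sending and the receiving side is the loop closed. Also, so far we have only analyzed startVerifyTask and startLoadTask, which DistroProtocol starts from its constructor. The other methods matter too, but they have not been covered yet. They are left for the next article, so stay tuned.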