14. Nacos Source Code Analysis: The Distro Protocol (Part 2)

Author: 站长

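In the previous article, 13. Nacos Source Code Analysis: The Distro Protocol (Part 1), we analyzed some of the core classes behind the Distro protocol, but mostly from the sender's point of view. This article takes the receiver's point of view and walks through how a request is handled once it arrives.

Recall that in the previous article we stopped tracing at clusterRpcClientProxy.sendRequest(member, request). This time we pick up from there. First, we know that the request is of type DistroDataRequest, so the RequestHandler registered for that type holds its processing logic.

DistroDataRequest is processed in the handle method of DistroDataRequestHandler.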

@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
    try {
        switch (request.getDataOperation()) {
            case VERIFY:
                // Handle a verify request
                return handleVerify(request.getDistroData(), meta);
            case SNAPSHOT:
                // Handle a snapshot request
                return handleSnapshot();
            case ADD:
            case CHANGE:
            case DELETE:
                // Handle a data-change request
                return handleSyncData(request.getDistroData());
            case QUERY:
                // Handle a data-query request
                return handleQueryData(request.getDistroData());
            default:
                // Fall back to an empty response
                return new DistroDataResponse();
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
        DistroDataResponse result = new DistroDataResponse();
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("handle distro request with exception");
        return result;
    }
}

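As the code shows, a DistroDataRequest carries one of several data operation types, and each type is handled differently: verify requests, snapshot requests, data-change requests, and data-query requests. Let's go through them one by one.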
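
To make the routing easier to see at a glance, here is a minimal, self-contained sketch of the same switch. The class name DistroDispatchSketch and the string return values are made up for illustration; only the operation names and the handler method names come from the listing above.

```java
public class DistroDispatchSketch {

    /** Mirrors the operation names that appear in the switch above. */
    enum DataOperation { ADD, CHANGE, DELETE, VERIFY, SNAPSHOT, QUERY }

    /** Returns the name of the handler branch that an operation falls into. */
    static String route(DataOperation operation) {
        switch (operation) {
            case VERIFY:
                return "handleVerify";
            case SNAPSHOT:
                return "handleSnapshot";
            case ADD:
            case CHANGE:
            case DELETE:
                // All three write operations share one handler.
                return "handleSyncData";
            case QUERY:
                return "handleQueryData";
            default:
                return "emptyResponse";
        }
    }

    public static void main(String[] args) {
        for (DataOperation operation : DataOperation.values()) {
            System.out.println(operation + " -> " + route(operation));
        }
    }
}
```

ADD, CHANGE, and DELETE share one branch here; the finer distinction between them is made later, inside processData.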

handleVerify

private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
    DistroDataResponse result = new DistroDataResponse();
    // Verify through distroProtocol
    if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
        result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
    }
    return result;
}

The code delegates directly to distroProtocol.onVerify.

public boolean onVerify(DistroData distroData, String sourceAddress) {
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(),
                             distroData.getDistroKey());
    }
    String resourceType = distroData.getDistroKey().getResourceType();
    // Find the processor for this resource type. Recall DistroClientComponentRegistry#doRegister(): the processor registered there is DistroClientDataProcessor
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
        return false;
    }
    // Run the verification through DistroClientDataProcessor
    return dataProcessor.processVerifyData(distroData, sourceAddress);
}
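
The lookup-then-delegate shape of onVerify can be illustrated with a small sketch. Everything here (ProcessorLookupSketch, Holder, Processor) is a hypothetical stand-in and not the Nacos API; it only assumes that the component holder behaves like a map from resource type to processor.

```java
import java.util.HashMap;
import java.util.Map;

public class ProcessorLookupSketch {

    /** Hypothetical stand-in for a per-resource-type data processor. */
    interface Processor {
        boolean verify(String payload);
    }

    /** Hypothetical holder: at most one processor per resource type. */
    static final class Holder {
        private final Map<String, Processor> processors = new HashMap<>();

        void register(String type, Processor processor) {
            processors.put(type, processor);
        }

        Processor find(String type) {
            return processors.get(type);
        }
    }

    /** Same shape as onVerify: look up by type and fail when nothing is registered. */
    static boolean onVerify(Holder holder, String type, String payload) {
        Processor processor = holder.find(type);
        if (processor == null) {
            return false;
        }
        return processor.verify(payload);
    }

    public static void main(String[] args) {
        Holder holder = new Holder();
        holder.register("client", payload -> !payload.isEmpty());
        System.out.println(onVerify(holder, "client", "data"));
        System.out.println(onVerify(holder, "unknown", "data"));
    }
}
```

A type with no registered processor is reported as a verification failure rather than an exception, which matches the `return false` branch in the listing.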

Now that we know dataProcessor is a DistroClientDataProcessor, let's keep reading.

public boolean processVerifyData(DistroData distroData, String sourceAddress) {
    // Deserialize the data into a DistroClientVerifyInfo object
    DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class)
        .deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
    // Verify through clientManager
    if (clientManager.verifyClient(verifyData)) {
        return true;
    }
    Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
    return false;
}

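clientManager is a delegating class. In the Distro protocol it delegates to EphemeralIpPortClientManager, because ephemeral defaults to true (as covered in 1. Nacos Service Registration Client Source Code Analysis).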

@Override
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
    String clientId = verifyData.getClientId();
    IpPortBasedClient client = clients.get(clientId);
    if (null != client) {
        // Remote nodes running an older version always verify with revision zero
        if (0 == verifyData.getRevision() || client.getRevision() == verifyData.getRevision()) {
            NamingExecuteTaskDispatcher.getInstance()
                .dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client));
            return true;
        } else {
            Loggers.DISTRO.info("[DISTRO-VERIFY-FAILED] IpPortBasedClient[{}] revision local={}, remote={}",
                                client.getClientId(), client.getRevision(), verifyData.getRevision());
        }
    }
    return false;
}

If the revisions match, a ClientBeatUpdateTask is executed. Here is its run method:

@Override
public void run() {
    long currentTime = System.currentTimeMillis();
    for (InstancePublishInfo each : client.getAllInstancePublishInfo()) {
        ((HealthCheckInstancePublishInfo) each).setLastHeartBeatTime(currentTime);
    }
    client.setLastUpdatedTime();
}

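This method refreshes the last heartbeat time of every instance the client has published, and then the client's own last-updated time.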
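
The acceptance rule inside verifyClient is compact enough to restate on its own. The following is a minimal sketch with made-up names (RevisionCheckSketch, revisionMatches, verify); a null local revision stands in for "no such client in the local map".

```java
public class RevisionCheckSketch {

    /**
     * True when the remote revision is 0 (sent by older nodes, so no comparison is possible)
     * or when it equals the local revision.
     */
    static boolean revisionMatches(long localRevision, long remoteRevision) {
        return remoteRevision == 0 || localRevision == remoteRevision;
    }

    /** Full check: an unknown client (null local revision) never passes verification. */
    static boolean verify(Long localRevision, long remoteRevision) {
        return localRevision != null && revisionMatches(localRevision, remoteRevision);
    }

    public static void main(String[] args) {
        System.out.println(verify(5L, 0L));   // old remote node
        System.out.println(verify(5L, 5L));   // same revision
        System.out.println(verify(5L, 6L));   // revision mismatch
        System.out.println(verify(null, 5L)); // client not known locally
    }
}
```

Only the first two cases pass; in the real method those are also the cases that trigger the ClientBeatUpdateTask.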

handleSnapshot

private DistroDataResponse handleSnapshot() {
    DistroDataResponse result = new DistroDataResponse();
    // Uses distroProtocol's onSnapshot method
    DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);
    result.setDistroData(distroData);
    return result;
}

The code delegates directly to distroProtocol.onSnapshot.

public DistroData onSnapshot(String type) {
    // Find the DistroDataStorage for this type. Recall DistroClientComponentRegistry#doRegister(): the storage passed to registerDataStorage there is also DistroClientDataProcessor
    DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
    if (null == distroDataStorage) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
        return new DistroData(new DistroKey("snapshot", type), new byte[0]);
    }
    return distroDataStorage.getDatumSnapshot();
}

Perhaps surprisingly, the DistroDataStorage is also DistroClientDataProcessor. Let's keep reading.

public DistroData getDatumSnapshot() {
    List<ClientSyncData> datum = new LinkedList<>();
    for (String each : clientManager.allClientId()) {
        Client client = clientManager.getClient(each);
        if (null == client || !client.isEphemeral()) {
            continue;
        }
        // Build the client's sync data
        datum.add(client.generateSyncData());
    }
    ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
    snapshot.setClientSyncDataList(datum);
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot);
    // Return the ClientSyncData of every ephemeral client held by clientManager
    return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
}

This returns the ClientSyncData of every ephemeral client. So what data does generateSyncData() produce?

@Override
public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();

    List<String> batchNamespaces = new LinkedList<>();
    List<String> batchGroupNames = new LinkedList<>();
    List<String> batchServiceNames = new LinkedList<>();

    List<InstancePublishInfo> instances = new LinkedList<>();
    List<BatchInstancePublishInfo> batchInstancePublishInfos = new LinkedList<>();
    BatchInstanceData  batchInstanceData = new BatchInstanceData();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        InstancePublishInfo instancePublishInfo = entry.getValue();
        if (instancePublishInfo instanceof BatchInstancePublishInfo) {
            BatchInstancePublishInfo batchInstance = (BatchInstancePublishInfo) instancePublishInfo;
            batchInstancePublishInfos.add(batchInstance);
            buildBatchInstanceData(batchInstanceData, batchNamespaces, batchGroupNames, batchServiceNames, entry);
            batchInstanceData.setBatchInstancePublishInfos(batchInstancePublishInfos);
        } else {
            namespaces.add(entry.getKey().getNamespace());
            groupNames.add(entry.getKey().getGroup());
            serviceNames.add(entry.getKey().getName());
            instances.add(entry.getValue());
        }
    }
    ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
    data.getAttributes().addClientAttribute(REVISION, getRevision());
    return data;
}

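ClientSyncData describes everything a single client has published: for each registered instance it records the namespace, groupName, and serviceName, along with the instance information itself, any batch-registered instances, and the client's revision.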
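
One detail worth spelling out is that the namespace, group, service, and instance lists are parallel: position i in each list describes the same registration. A simplified sketch of that layout follows; ParallelListsSketch, ServiceKey, and SyncData are hypothetical types, and instances are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ParallelListsSketch {

    /** Hypothetical service identity: namespace + group + name. */
    static final class ServiceKey {
        final String namespace;
        final String group;
        final String name;

        ServiceKey(String namespace, String group, String name) {
            this.namespace = namespace;
            this.group = group;
            this.name = name;
        }
    }

    /** Hypothetical, simplified payload: four lists that share one index. */
    static final class SyncData {
        final List<String> namespaces = new ArrayList<>();
        final List<String> groupNames = new ArrayList<>();
        final List<String> serviceNames = new ArrayList<>();
        final List<String> instances = new ArrayList<>();
    }

    /** Flattens "service -> instance" entries into parallel lists, one index per entry. */
    static SyncData flatten(Map<ServiceKey, String> publishers) {
        SyncData data = new SyncData();
        for (Map.Entry<ServiceKey, String> entry : publishers.entrySet()) {
            data.namespaces.add(entry.getKey().namespace);
            data.groupNames.add(entry.getKey().group);
            data.serviceNames.add(entry.getKey().name);
            data.instances.add(entry.getValue());
        }
        return data;
    }

    /** Reassembles the registration stored at index i. */
    static String describe(SyncData data, int i) {
        return data.namespaces.get(i) + "/" + data.groupNames.get(i) + "/"
                + data.serviceNames.get(i) + " -> " + data.instances.get(i);
    }

    public static void main(String[] args) {
        Map<ServiceKey, String> publishers = new LinkedHashMap<>();
        publishers.put(new ServiceKey("public", "DEFAULT_GROUP", "user"), "10.0.0.1:8080");
        publishers.put(new ServiceKey("public", "DEFAULT_GROUP", "order"), "10.0.0.2:8080");
        SyncData data = flatten(publishers);
        for (int i = 0; i < data.namespaces.size(); i++) {
            System.out.println(describe(data, i));
        }
    }
}
```

The receiving side depends on this alignment when it rebuilds each Service by index.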

handleSyncData

private DistroDataResponse handleSyncData(DistroData distroData) {
    DistroDataResponse result = new DistroDataResponse();
    // distroProtocol receives the distroData
    if (!distroProtocol.onReceive(distroData)) {
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("[DISTRO-FAILED] distro data handle failed");
    }
    return result;
}

Once again distroProtocol does the work.

public boolean onReceive(DistroData distroData) {
    Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
                        distroData.getDistroKey());
    String resourceType = distroData.getDistroKey().getResourceType();
    // Look up DistroClientDataProcessor
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
        return false;
    }
    return dataProcessor.processData(distroData);
}

This again ends up in the processData method of DistroClientDataProcessor.

@Override
public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            // Deserialize the ClientSyncData
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                .deserialize(distroData.getContent(), ClientSyncData.class);
            // Handle add and change operations
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            // On delete, disconnect the client
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

For add and change operations, it calls handlerClientSyncData.

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO
        .info("[Client-Add] Received distro client sync data {}, revision={}", clientSyncData.getClientId(),
              clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L));
    // Sync the client connection
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // Update the client
    upgradeClient(client, clientSyncData);
}

syncClientConnected

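The syncClientConnected called here is the one in EphemeralIpPortClientManager. The listing below follows the clientConnected path that creates and registers the client, along with the factory's newClient and the client's init method.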

@Override
public boolean clientConnected(String clientId, ClientAttributes attributes) {
    // Create a client and register its connection
    return clientConnected(clientFactory.newClient(clientId, attributes));
}

@Override
public IpPortBasedClient newClient(String clientId, ClientAttributes attributes) {
    long revision = attributes.getClientAttribute(REVISION, 0);
    // Create an IP-and-port-based client
    IpPortBasedClient ipPortBasedClient = new IpPortBasedClient(clientId, true, revision);
    ipPortBasedClient.setAttributes(attributes);
    return ipPortBasedClient;
}

@Override
public boolean clientConnected(final Client client) {
    clients.computeIfAbsent(client.getClientId(), s -> {
        Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
        IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
        // Initialize the client
        ipPortBasedClient.init();
        return ipPortBasedClient;
    });
    return true;
}

public void init() {
    if (ephemeral) {
        beatCheckTask = new ClientBeatCheckTaskV2(this);
        // Schedule a periodic health check that runs every 5s
        HealthCheckReactor.scheduleCheck(beatCheckTask);
    } else {
        healthCheckTaskV2 = new HealthCheckTaskV2(this);
        HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
    }
}

When a client connection is synced, a background task is scheduled to run every 5s and health-check the connected client.
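
Because registration goes through computeIfAbsent, init() (and therefore the scheduling of the health check) runs at most once per client ID, even though a fresh client object is built on every sync. A minimal sketch of that behaviour, using a hypothetical Manager class and a counter in place of the real init():

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectOnceSketch {

    /** Hypothetical manager that tracks clients by ID and counts initializations. */
    static final class Manager {
        private final ConcurrentMap<String, String> clients = new ConcurrentHashMap<>();
        private final AtomicInteger initCount = new AtomicInteger();

        /** Registers the client if absent; the mapping function runs only for new IDs. */
        boolean clientConnected(String clientId) {
            clients.computeIfAbsent(clientId, id -> {
                // Stands in for ipPortBasedClient.init(), which schedules the health check.
                initCount.incrementAndGet();
                return id;
            });
            return true;
        }

        int initCount() {
            return initCount.get();
        }

        int size() {
            return clients.size();
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager();
        manager.clientConnected("10.0.0.1:8080#true");
        manager.clientConnected("10.0.0.1:8080#true"); // repeated sync, no second init
        manager.clientConnected("10.0.0.2:8080#true");
        System.out.println("clients=" + manager.size() + ", inits=" + manager.initCount());
    }
}
```

Repeated ADD or CHANGE syncs for the same client therefore do not pile up duplicate health-check tasks. The client ID strings in main are illustrative only.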

upgradeClient

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    Set<Service> syncedService = new HashSet<>();
    // process batch instance sync logic
    processBatchInstanceDistroData(syncedService, client, clientSyncData);
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();

    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            // Add or update instances that changed
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            // Remove services that are no longer in the synced data
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}

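upgradeClient iterates over the synced clientSyncData, finds what has changed, and applies it. Each change is followed by a published event, which decouples the follow-up logic from the sync itself and keeps the flow event-driven. We won't analyze the individual events here; interested readers can trace them on their own.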
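
The two loops in upgradeClient amount to a diff between the local state and the synced state. Here is a simplified sketch of that diff, assuming services and instances are plain strings and events are recorded in a list instead of being published through NotifyCenter; the class and method names are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpgradeDiffSketch {

    /** Applies the synced state to the local state and returns the events it would publish. */
    static List<String> apply(Map<String, String> local, Map<String, String> synced) {
        List<String> events = new ArrayList<>();
        // Pass 1: add or overwrite every service whose instance differs from the local one.
        for (Map.Entry<String, String> entry : synced.entrySet()) {
            if (!entry.getValue().equals(local.get(entry.getKey()))) {
                local.put(entry.getKey(), entry.getValue());
                events.add("register:" + entry.getKey());
            }
        }
        // Pass 2: drop local services missing from the synced data.
        // Iterate over a copy of the keys so removal is safe.
        for (String service : new ArrayList<>(local.keySet())) {
            if (!synced.containsKey(service)) {
                local.remove(service);
                events.add("deregister:" + service);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Map<String, String> local = new LinkedHashMap<>();
        local.put("user", "10.0.0.1:8080");
        local.put("order", "10.0.0.2:8080");
        Map<String, String> synced = new LinkedHashMap<>();
        synced.put("user", "10.0.0.1:8080");
        synced.put("pay", "10.0.0.3:8080");
        System.out.println(apply(local, synced));
        System.out.println(local);
    }
}
```

Unchanged entries produce no event at all, so a sync that carries identical data does not notify subscribers.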

handleQueryData

private DistroDataResponse handleQueryData(DistroData distroData) {
    DistroDataResponse result = new DistroDataResponse();
    DistroKey distroKey = distroData.getDistroKey();
    // distroProtocol runs onQuery
    DistroData queryData = distroProtocol.onQuery(distroKey);
    result.setDistroData(queryData);
    return result;
}

Now look at onQuery in distroProtocol:

public DistroData onQuery(DistroKey distroKey) {
    String resourceType = distroKey.getResourceType();
    // What we get here is DistroClientDataProcessor
    DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(resourceType);
    if (null == distroDataStorage) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", resourceType);
        return new DistroData(distroKey, new byte[0]);
    }
    // getDistroData of DistroClientDataProcessor
    return distroDataStorage.getDistroData(distroKey);
}

The DistroClientDataProcessor#getDistroData method:

public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    // Serialize the result of client.generateSyncData()
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}

The query path is fairly simple: it takes client.generateSyncData(), serializes it, wraps it in a DistroData, and returns it.
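
The two listings do differ in how they report a miss: a missing storage yields a DistroData with empty content, while a missing client yields null. The sketch below reproduces just that distinction with hypothetical types (QueryMissSketch, Data) and plain maps standing in for the storage and the client manager.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class QueryMissSketch {

    /** Hypothetical, simplified response payload. */
    static final class Data {
        final String key;
        final byte[] content;

        Data(String key, byte[] content) {
            this.key = key;
            this.content = content;
        }
    }

    /** Like getDistroData: returns null when the client is unknown. */
    static Data getDistroData(Map<String, String> clients, String key) {
        String client = clients.get(key);
        if (client == null) {
            return null;
        }
        return new Data(key, client.getBytes(StandardCharsets.UTF_8));
    }

    /** Like onQuery: returns empty content when no storage exists for the type. */
    static Data onQuery(Map<String, Map<String, String>> storages, String type, String key) {
        Map<String, String> storage = storages.get(type);
        if (storage == null) {
            return new Data(key, new byte[0]);
        }
        return getDistroData(storage, key);
    }

    public static void main(String[] args) {
        Map<String, String> clients = new HashMap<>();
        clients.put("c1", "payload");
        Map<String, Map<String, String>> storages = new HashMap<>();
        storages.put("client", clients);

        System.out.println(onQuery(storages, "client", "c1").content.length);
        System.out.println(onQuery(storages, "other", "c1").content.length);
        System.out.println(onQuery(storages, "client", "missing"));
    }
}
```

A caller of the query path therefore has to be prepared for both an empty payload and a null one.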

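Summary

When a Distro request arrives, it is handled by DistroDataRequestHandler, which in turn delegates to the processing logic in DistroProtocol. Along the way we covered several DistroProtocol methods that the previous article left out.

Within DistroProtocol, most of the work is done by DistroClientDataProcessor. In essence, client registration data is packed into DistroData and kept in the in-memory caches of the server nodes. That is why we keep describing Distro as a protocol for syncing ephemeral data: the data exists only at runtime and is gone once the service shuts down. On startup a node pulls the latest data and stores it; after that, a series of RPC requests and internal events, such as client register and disconnect events, drive the updates step by step to keep the data consistent.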