likes
comments
collection
share

✨Nacos2✨心跳与健康检查✨

作者站长头像
站长
· 阅读数 34

✨Nacos2✨心跳与健康检查✨


前言

Nacos作为服务注册中心,工作流程示意如下。

✨Nacos2✨心跳与健康检查✨

本文将对上图中的客户端心跳发送服务端的健康检查进行说明与总结。

往期文章如下

正文

一. 客户端心跳发送

Naocs客户端启动时,会创建RpcClient并调用其start() 方法,在该方法中,有一段逻辑就是发送心跳,如下所示(仅看有注释的部分)。

public final void start() throws NacosException {

    boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
    if (!success) {
        return;
    }

    clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
        Thread t = new Thread(r);
        t.setName("com.alibaba.nacos.client.remote.worker");
        t.setDaemon(true);
        return t;
    });

    clientEventExecutor.submit(() -> {
        while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
            ConnectionEvent take;
            try {
                take = eventLinkedBlockingQueue.take();
                if (take.isConnected()) {
                    notifyConnected();
                } else if (take.isDisConnected()) {
                    notifyDisConnected();
                }
            } catch (Throwable e) {

            }
        }
    });

    clientEventExecutor.submit(() -> {
        while (true) {
            try {
                if (isShutdown()) {
                    break;
                }
                ReconnectContext reconnectContext = reconnectionSignal
                        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
                if (reconnectContext == null) {
                    // 这里判断当前时间距离上一次心跳发送时间是否大于等于keepAliveTime
                    // 如果大于等于keepAliveTime则需要向客户端发送心跳
                    // 默认的keepAliveTime是5秒
                    if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                        // 心跳发送
                        boolean isHealthy = healthCheck();
                        if (!isHealthy) {
                            if (currentConnection == null) {
                                continue;
                            }
                            LoggerUtils.printIfInfoEnabled(LOGGER,
                                    "[{}] Server healthy check fail, currentConnection = {}", name,
                                    currentConnection.getConnectionId());

                            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                break;
                            }

                            boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
                                    .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
                            if (statusFLowSuccess) {
                                reconnectContext = new ReconnectContext(null, false);
                            } else {
                                continue;
                            }

                        } else {
                            lastActiveTimeStamp = System.currentTimeMillis();
                            continue;
                        }
                    } else {
                        continue;
                    }

                }

                if (reconnectContext.serverInfo != null) {
                    boolean serverExist = false;
                    for (String server : getServerListFactory().getServerList()) {
                        ServerInfo serverInfo = resolveServerInfo(server);
                        if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                            serverExist = true;
                            reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                            break;
                        }
                    }
                    if (!serverExist) {
                        LoggerUtils.printIfInfoEnabled(LOGGER,
                                "[{}] Recommend server is not in server list, ignore recommend server {}", name,
                                reconnectContext.serverInfo.getAddress());

                        reconnectContext.serverInfo = null;

                    }
                }
                reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
            } catch (Throwable throwable) {

            }
        }
    });

    Connection connectToServer = null;
    rpcClientStatus.set(RpcClientStatus.STARTING);

    int startUpRetryTimes = RETRY_TIMES;
    while (startUpRetryTimes > 0 && connectToServer == null) {
        try {
            startUpRetryTimes--;
            ServerInfo serverInfo = nextRpcServer();

            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name,
                    serverInfo);

            connectToServer = connectToServer(serverInfo);
        } catch (Throwable e) {
            LoggerUtils.printIfWarnEnabled(LOGGER,
                    "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
                    name, e.getMessage(), startUpRetryTimes);
        }

    }

    if (connectToServer != null) {
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
                name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
        this.currentConnection = connectToServer;
        rpcClientStatus.set(RpcClientStatus.RUNNING);
        eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
    } else {
        switchServerAsync();
    }

    registerServerRequestHandler(new ConnectResetRequestHandler());

    registerServerRequestHandler(request -> {
        if (request instanceof ClientDetectionRequest) {
            return new ClientDetectionResponse();
        }

        return null;
    });

}

代码很长,重点关注客户端心跳发送部分。通过上述方法,可以知道Nacos客户端会每5秒向服务端发送一次心跳,对应的方法在RpcClienthealthCheck() 方法中,继续跟进看下。

private boolean healthCheck() {
    // 通过gprc请求的方式向服务端发送心跳
    HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
    if (this.currentConnection == null) {
        return false;
    }
    try {
        Response response = this.currentConnection.request(healthCheckRequest, 3000L);
        return response != null && response.isSuccess();
    } catch (NacosException e) {

    }
    return false;
}

那么小结一下Nacos客户端的心跳发送。

  1. gprc的方式发送心跳
  2. 默认每5秒发送一次

二. 服务端健康检查

Nacos的临时实例使用grpc与服务端进行通信,在服务端有一个管理grpc长连接的类叫做ConnectionManager,在服务端启动时ConnectionManager会创建一个周期为3秒的定时任务来处理健康检查,主要就是识别通信间断的长连接得到不健康的客户端并进行探测,探测不过则移除客户端的相关信息,下面看一下这部分逻辑。

代码有点长,主要还是看注释,真想知道这代码要是用公司的Sonar来扫描,评分会有多少

@PostConstruct
public void start() {

    // 开启不健康连接的驱逐定时任务
    RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {

                int totalCount = connections.size();
                Loggers.REMOTE_DIGEST.info("Connection check task start");
                MetricsMonitor.getLongConnectionMonitor().set(totalCount);
                Set<Map.Entry<String, Connection>> entries = connections.entrySet();
                int currentSdkClientCount = currentSdkClientCount();
                boolean isLoaderClient = loadClient >= 0;
                int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit;
                int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);

                Loggers.REMOTE_DIGEST
                        .info("Total count ={}, sdkCount={},clusterCount={}, currentLimit={}, toExpelCount={}",
                                totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount),
                                currentMaxClient + (isLoaderClient ? "(loaderCount)" : ""), expelCount);

                List<String> expelClient = new LinkedList<>();

                Map<String, AtomicInteger> expelForIp = new HashMap<>(16);

                for (Map.Entry<String, Connection> entry : entries) {

                    Connection client = entry.getValue();
                    String appName = client.getMetaInfo().getAppName();
                    String clientIp = client.getMetaInfo().getClientIp();
                    if (client.getMetaInfo().isSdkSource() && !expelForIp.containsKey(clientIp)) {
                        int countLimitOfIp = connectionLimitRule.getCountLimitOfIp(clientIp);
                        if (countLimitOfIp < 0) {
                            int countLimitOfApp = connectionLimitRule.getCountLimitOfApp(appName);
                            countLimitOfIp = countLimitOfApp < 0 ? countLimitOfIp : countLimitOfApp;
                        }
                        if (countLimitOfIp < 0) {
                            countLimitOfIp = connectionLimitRule.getCountLimitPerClientIpDefault();
                        }

                        if (countLimitOfIp >= 0 && connectionForClientIp.containsKey(clientIp)) {
                            AtomicInteger currentCountIp = connectionForClientIp.get(clientIp);
                            if (currentCountIp != null && currentCountIp.get() > countLimitOfIp) {
                                expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp));
                            }
                        }
                    }
                }

                Loggers.REMOTE_DIGEST
                        .info("Check over limit for ip limit rule, over limit ip count={}", expelForIp.size());

                if (expelForIp.size() > 0) {
                    Loggers.REMOTE_DIGEST.info("Over limit ip expel info, {}", expelForIp);
                }

                Set<String> outDatedConnections = new HashSet<>();
                long now = System.currentTimeMillis();
                // 这里得到不健康的实例集合
                // Connection里面包含着客户端信息
                // 所以Connection可以等价于客户端
                for (Map.Entry<String, Connection> entry : entries) {
                    Connection client = entry.getValue();
                    String clientIp = client.getMetaInfo().getClientIp();
                    AtomicInteger integer = expelForIp.get(clientIp);
                    if (integer != null && integer.intValue() > 0) {
                        integer.decrementAndGet();
                        expelClient.add(client.getMetaInfo().getConnectionId());
                        expelCount--;
                    } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                        // 如果这个连接与服务端最近一次通信时间间隔了20秒及以上
                        // 则判定这个连接过期了
                        // 也就是只要客户端20秒或者更久的没和服务端通信
                        // 这个客户端就被识别为不健康的客户端
                        // 从而服务端就要去健康探测这个客户端了
                        outDatedConnections.add(client.getMetaInfo().getConnectionId());
                    }

                }

                if (expelCount > 0) {
                    for (Map.Entry<String, Connection> entry : entries) {
                        Connection client = entry.getValue();
                        if (!expelForIp.containsKey(client.getMetaInfo().clientIp) && client.getMetaInfo()
                                .isSdkSource() && expelCount > 0) {
                            expelClient.add(client.getMetaInfo().getConnectionId());
                            expelCount--;
                            outDatedConnections.remove(client.getMetaInfo().getConnectionId());
                        }
                    }
                }

                String serverIp = null;
                String serverPort = null;
                if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
                    String[] split = redirectAddress.split(Constants.COLON);
                    serverIp = split[0];
                    serverPort = split[1];
                }

                for (String expelledClientId : expelClient) {
                    try {
                        Connection connection = getConnection(expelledClientId);
                        if (connection != null) {
                            ConnectResetRequest connectResetRequest = new ConnectResetRequest();
                            connectResetRequest.setServerIp(serverIp);
                            connectResetRequest.setServerPort(serverPort);
                            connection.asyncRequest(connectResetRequest, null);
                            Loggers.REMOTE_DIGEST
                                    .info("Send connection reset request , connection id = {},recommendServerIp={}, recommendServerPort={}",
                                            expelledClientId, connectResetRequest.getServerIp(),
                                            connectResetRequest.getServerPort());
                        }

                    } catch (ConnectionAlreadyClosedException e) {
                        unregister(expelledClientId);
                    } catch (Exception e) {
                        Loggers.REMOTE_DIGEST.error("Error occurs when expel connection, expelledClientId:{}", expelledClientId, e);
                    }
                }

                // 对不健康的客户端进行健康探测
                Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
                if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                    Set<String> successConnections = new HashSet<>();
                    final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                    for (String outDateConnectionId : outDatedConnections) {
                        try {
                            Connection connection = getConnection(outDateConnectionId);
                            if (connection != null) {
                                ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                                connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                    @Override
                                    public Executor getExecutor() {
                                        return null;
                                    }

                                    @Override
                                    public long getTimeout() {
                                        // 客户端需要在1秒以内响应
                                        return 1000L;
                                    }

                                    @Override
                                    public void onResponse(Response response) {
                                        latch.countDown();
                                        if (response != null && response.isSuccess()) {
                                            // 重新设置连接的最近一次通信时间
                                            connection.freshActiveTime();
                                            // 连接加入通过健康探测的集合
                                            successConnections.add(outDateConnectionId);
                                        }
                                    }

                                    @Override
                                    public void onException(Throwable e) {
                                        latch.countDown();
                                    }
                                });

                                Loggers.REMOTE_DIGEST
                                        .info("[{}]send connection active request ", outDateConnectionId);
                            } else {
                                latch.countDown();
                            }

                        } catch (ConnectionAlreadyClosedException e) {
                            latch.countDown();
                        } catch (Exception e) {
                            Loggers.REMOTE_DIGEST
                                    .error("[{}]Error occurs when check client active detection ,error={}",
                                            outDateConnectionId, e);
                            latch.countDown();
                        }
                    }

                    latch.await(3000L, TimeUnit.MILLISECONDS);
                    Loggers.REMOTE_DIGEST
                            .info("Out dated connection check successCount={}", successConnections.size());

                    for (String outDateConnectionId : outDatedConnections) {
                        if (!successConnections.contains(outDateConnectionId)) {
                            Loggers.REMOTE_DIGEST
                                    .info("[{}]Unregister Out dated connection....", outDateConnectionId);
                            // 探测不过的客户端执行注销逻辑
                            unregister(outDateConnectionId);
                        }
                    }
                }

                if (isLoaderClient) {
                    loadClient = -1;
                    redirectAddress = null;
                }

                Loggers.REMOTE_DIGEST.info("Connection check task end");

            } catch (Throwable e) {
                Loggers.REMOTE.error("Error occurs during connection check... ", e);
            }
        }
    }, 1000L, 3000L, TimeUnit.MILLISECONDS);

}

上述方法就是Nacos服务端 每3秒 对所有通信间断达到 20秒及以上临时客户端 发起ClientDetectionRequest健康探测请求,如果客户端在 1秒 内响应则探测通过,否则探测不通过,探测不通过的客户端,会在服务端这边被执行unregister() 注销逻辑。

继续看一下unregister() 注销逻辑会做哪些事情。

public synchronized void unregister(String connectionId) {
    // 移除长连接
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        // 关闭连接
        remove.close();
        Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
        // 最终会发布ClientDisconnectEvent事件
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

unregister() 注销逻辑会先从ConnectionManager中移除这个长连接,然后对应长连接关闭,最后最重要的就是会发布ClientDisconnectEvent事件,而该事件有两个监听者,一个是ClientServiceIndexesManager,另一个是DistroClientDataProcessor

ClientServiceIndexesManager监听到事件后,会执行下面逻辑。

private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
    Client client = event.getClient();
    for (Service each : client.getAllSubscribeService()) {
        // 移除这个客户端和订阅的服务的映射关系
        removeSubscriberIndexes(each, client.getClientId());
    }
    for (Service each : client.getAllPublishedService()) {
        // 移除这个客户端和服务的映射关系
        // 这里还会触发ServiceChangedEvent事件
        // 将该服务端的实例变更信息通知给订阅者
        removePublisherIndexes(each, client.getClientId());
    }
}

DistroClientDataProcessor监听到事件后,会执行下面逻辑。

private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        // 将客户端移除同步给其它服务端
        // 主要是维持Distro协议
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

小结一下。Nacos的服务端的健康检查机制如下。

  1. 3秒对通信间断20秒及以上的客户端发起健康探测
  2. 探测不过的客户端就会被注销
  3. 服务端这边会移除这个客户端和订阅的服务的映射关系
  4. 服务端会移除这个客户端和服务的映射关系
  5. 服务端会将实例所属服务的变更信息通知给其订阅者
  6. 服务端会将实例的变更信息同步给其它服务端以维持Distro协议

总结

请直接看第一节和第二节的小结,这里不再重复总结了

注意,本文讨论的是临时实例


总结不易,如果本文对你有帮助,烦请点赞,收藏加关注,谢谢帅气漂亮的你。

✨Nacos2✨心跳与健康检查✨

转载自:https://juejin.cn/post/7380771735681171475
评论
请登录