likes
comments
collection
share

Nacos注册中心源码解析(二):AP架构集群

作者站长头像
站长
· 阅读数 29

前言

接上文:Nacos注册中心源码解析(一):服务注册 回到com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put方法: Nacos注册中心源码解析(二):AP架构集群 之前我们看过了onPut()方法,主要是通过异步的方式将实例放入注册表中,接下来我们要看一下在集群的环境下,nacos的数据是怎么同步的。

源码解析

增量同步

distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
        globalConfig.getTaskDispatchPeriod() / 2);

点进去sync方法:

public void sync(DistroKey distroKey, DataOperation action, long delay) {
    // 获得到除了当前机器以外的其他nacos节点
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
        }
    }
}

从配置的集群列表里面找到不包含自己的IP,循环,把每个IP封装成一个DistroKey对象,这个对象里面封装了要同步的机器的IP地址;再封装到DistroDelayTask对象里面,最后调用distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);方法同步数据。点进去addTask()方法:

protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    lock.lock();
    try {
        AbstractDelayTask existTask = tasks.get(key);
        if (null != existTask) {
            newTask.merge(existTask);
        }
        // 放在一个map里面就结束了??--> 后面还有一个定时任务从map里面拿出来做处理的
        tasks.put(key, newTask);
    } finally {
        lock.unlock();
    }
}

addTask()方法很简单,就是把这个DistroDelayTask对象放到一个ConcurrentHashMap里面就结束了。咦?似曾相识啊,难道又要异步处理了??? 是的,在NacosDelayTaskExecuteEngine类的构造器里面有这么一段代码:

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

最主要的是在这里创建了一个定时线程池processingExecutor,并且每100毫秒会执行一次ProcessRunnable任务。(注意:此时主线程已经结束了,也就是服务注册的请求走到这一步已经请求结束了,nacos客户端已经收到了服务器的响应了)。下面我们看看ProcessRunnable任务是怎么做后续的处理的,点进去看看run()方法:

@Override
public void run() {
    try {
        processTasks();
    } catch (Throwable e) {
        getEngineLog().error(e.toString(), e);
    }
}
protected void processTasks() {
    // 从map里面拿到所有任务的key
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        // 通过key拿到一个processor:DistroDelayTaskProcessor
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}

从tasks里面获取到所有的taskKey,使用策略模式获取到key对应的processor,调用process()方法进行数据同步,如果返回false(也就是同步失败)则进行重试,我们先看一下重试的逻辑retryFailedTask(),跳过套娃的代码,会一直调用到到addTask()方法。还记得这个方法吗,往上翻翻,这个方法是把DistroDelayTask对象放到tasks里面等待异步处理的,还有印象吧。好,这是重试的逻辑,说白了,就是重新入队等待下次执行呗~。

下面就真正看一下process()方法里面做了哪些事情,其实现类是DistroDelayTaskProcessor,这个类是在spring初始化DistroTaskEngineHolder这个bean的时候设置进来的:

@Component
public class DistroTaskEngineHolder {
    private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();

    // 构造器注入
    public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
        // 在这里设置了默认的processor
        DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
        delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
    }
}

process()方法:

@Override
public boolean process(NacosTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
        DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
        // 暂存到阻塞队列里面
        distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
        return true;
    }
    return false;
}

咦,这里又调用了一次addTask()方法,难道又要做异步了?点进来看一下:

@Override
public void addTask(Object tag, AbstractExecuteTask task) {
    NacosTaskProcessor processor = getProcessor(tag);
    if (null != processor) {
        processor.process(task);
        return;
    }
    // 获取到一个任务的工作者,用来处理DistroSyncChangeTask
    TaskExecuteWorker worker = getWorker(tag);
    worker.process(task);
}

这里又获取了一次NacosTaskProcessor,返回的是null,最后会获取一个任务执行器TaskExecuteWorker,执行process()方法,先不看process方法,先看一下这个类的其他前置逻辑:

public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {

    private final BlockingQueue<Runnable> queue;
    
    public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
        this.name = name + "_" + mod + "%" + total;
        this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
        this.closed = new AtomicBoolean(false);
        this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
        // 这里创建了一个线程在不停地从queue里面拿任务进行处理
        new InnerWorker(name).start();
    }
}

这个类中有很重要的两个信息,一个是阻塞队列BlockingQueue<Runnable> queue,另外一个是new InnerWorker(name).start();。阻塞队列不需要解释,他就是个阻塞队列,里面放的是Runnable;我们看看InnerWorker是个什么东西:

private class InnerWorker extends Thread {

    @Override
    public void run() {
        while (!closed.get()) {
            try {
                Runnable task = queue.take();
                long begin = System.currentTimeMillis();
                task.run();
                long duration = System.currentTimeMillis() - begin;
                if (duration > 1000L) {
                    log.warn("distro task {} takes {}ms", task, duration);
                }
            } catch (Throwable e) {
                log.error("[DISTRO-FAILED] " + e.toString(), e);
            }
        }
    }
}

继承了Thread,是个线程,在run()方法里不停地从阻塞队列里面拿任务进行处理。好,此时此刻,process()方法虽然还没看,但是我相信用膝盖都能想的到他要干什么,肯定是往queue里面放任务了。不信你看看:

@Override
public boolean process(NacosTask task) {
    if (task instanceof AbstractExecuteTask) {
        putTask((Runnable) task);
    }
    return true;
}

private void putTask(Runnable task) {
    try {
        queue.put(task);
    } catch (InterruptedException ire) {
        log.error(ire.toString(), ire);
    }
}

是不是感觉有点乱呢?好,暂停一下,我给你捋捋:这里其实是涉及到了两次异步:

  • 第一次是在distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);的时候。把要同步的数据的key和目标机器的IP封装到DistroDelayTask对象里,异步把这个对象放到ConcurrentHashMap对象里面;主线程结束。
  • 第二次是有个线程对象ProcessRunnable从ConCurrentHashMap中拿到key和其对应的NacosTaskProcessor,最终又创建出来一个DistroSyncChangeTask对象,把他放到了BlockingQueue里面。

好,这次思路理顺了吧,那就继续往下走,放到阻塞队列里面之后呢?是要直接处理了吗还是又要做异步了呢?还记得上面那个InnerWorker对象吧,再把他的run()方法贴出看看:

@Override
public void run() {
    while (!closed.get()) {
        try {
            // 从队列里面拿出来DistroSyncChangeTask对象
            Runnable task = queue.take();
            long begin = System.currentTimeMillis();
            // 执行run()方法
            task.run();
            long duration = System.currentTimeMillis() - begin;
            if (duration > 1000L) {
                log.warn("distro task {} takes {}ms", task, duration);
            }
        } catch (Throwable e) {
            log.error("[DISTRO-FAILED] " + e.toString(), e);
        }
    }
}

这里主要就是回调DistroSyncChangeTask对象的run()方法,这里就不放全部代码了,只有一行代码是我们需要关心的:

public void run() {
    // 开始请求其他节点的 [POST /distro/datum]接口进行数据同步
    boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
    if (!result) {
        handleFailedTask();
    }
}

ayncData()方法里面调用了NamingProxy.syncData(),看看NamingProxy.syncData()方法: Nacos注册中心源码解析(二):AP架构集群 很明显,在这里以POST的方式调用了/distro/datum接口进行数据同步。下面我们看看这个接口是怎么进行数据同步的。

服务端数据同步接口[PUT /distro/datum] Nacos注册中心源码解析(二):AP架构集群 做一些基本的逻辑校验(是否是临时实例、serviceMap时候存在)之后,把数据封装到DistroHttpData对象里面,调用onReceive()方法,最终会调用到DistroConsistencyServiceImpl#processData()方法: Nacos注册中心源码解析(二):AP架构集群 Nacos注册中心源码解析(二):AP架构集群processData()中有没有注意到一个onPut()方法,这个方法还有印象吗?在 Nacos注册中心源码解析(一):服务注册 这篇文章中我们有提过这个方法,是用来异步更新本地注册表的。这个方法既可以用来处理客户端的注册请求也可以处理其他服务的数据同步。

全量同步

上面提到的是Nacos服务器在运行的情况下进行增量的数据同步的过程。那如果我其中某一台实例挂掉了,又重新启动了,则在启动的时候会做一次全量数据拉取。

有一个很重要的类:DistroProtocol

@Component
public class DistroProtocol {

    public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
            DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        this.distroConfig = distroConfig;
        // 启动一个distro任务,这是全量同步的入口
        startDistroTask();
    }

    private void startDistroTask() {
        if (EnvUtil.getStandaloneMode()) {
            isInitialized = true;
            return;
        }
        startVerifyTask();
        // TODO 启动一个定时任务从其他nacos节点上全量获取数据
        startLoadTask();
    }

    private void startLoadTask() {
        DistroCallback loadCallback = new DistroCallback() {
            @Override
            public void onSuccess() {
                isInitialized = true;
            }

            @Override
            public void onFailed(Throwable throwable) {
                isInitialized = false;
            }
        };
        // 启动一个定时任务开始全量同步数据
        GlobalExecutor.submitLoadDataTask(
                new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
    }

}

这个类是由spring管理的bean,这个bean在被创建的同时在其内部绑定了一个定时任务DistroLoadDataTask,找到这个类的run()方法: Nacos注册中心源码解析(二):AP架构集群run()方法里面调用了load()方法: Nacos注册中心源码解析(二):AP架构集群 全量同步的核心逻辑都在loadAllDataSnapshotFromRemote()方法里面,从名字就可以看出来,这是从远端全量加载所有的快照数据的,下面点进去看看他是怎么加载的,其实我们不妨大胆猜一下,在增量更新的时候是通过请求其他节点的接口实现的,而现在全量拉取的时候是不是也要请求其他节点的接口呢: Nacos注册中心源码解析(二):AP架构集群 可以看到,从配置文件对象中获取到除了自己以外的其他的节点的地址,循环调用了transportAgent.getDatumSnapshot(each.getAddress())dataProcessor.processSnapshot(distroData)方法,先看看transportAgent.getDatumSnapshot方法: Nacos注册中心源码解析(二):AP架构集群 Nacos注册中心源码解析(二):AP架构集群 这里请求的地址就是: [GET http://ip:port/nacos/v1/ns/distro/datums],那我们找到这个接口看看呗,全局搜索"datums"Nacos注册中心源码解析(二):AP架构集群 Nacos注册中心源码解析(二):AP架构集群 Nacos注册中心源码解析(二):AP架构集群 在这里把DataStore的所有数据返回回去。再看看dataProcessor.processSnapshot(distroData)这个方法会更新本地的注册表;全量拉取结束。

定时同步

不论是增量同步还是全量同步,都是使用定时任务的方式去实现的,在实际的生产环境当中会面临很多的场景,比如实例有更新,实例被强制下线,新增加了一个服务等等各种情况导致多个nacos节点之间数据的不一致,针对这种情况肯定要有一个补偿的方案。在DistroProtocol这个类中刚才看了全量同步的过程,在全量同步之前还有一个定时任务DistroVerifyTaskNacos注册中心源码解析(二):AP架构集群 Nacos注册中心源码解析(二):AP架构集群 点进去到DistroVerifyTask类的run()方法看看: Nacos注册中心源码解析(二):AP架构集群 Nacos注册中心源码解析(二):AP架构集群 从本地的DataStore里面拿到所有的数据key,循环请求其他节点的 [PUT /distro/checksum] 接口,找到这个接口看一下: Nacos注册中心源码解析(二):AP架构集群 Nacos注册中心源码解析(二):AP架构集群 Nacos注册中心源码解析(二):AP架构集群

public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
    // 存放要更新(增删改)的数据Key
    List<String> toUpdateKeys = new ArrayList<>();
    // 存放要删除的数据Key
    List<String> toRemoveKeys = new ArrayList<>();
    for (Map.Entry<String, String> entry : checksumMap.entrySet()) {

        // 如果当前机器没有其他机器的数据key,则要放到toUpdateKeys集合里等待更新
        if (!dataStore.contains(entry.getKey()) || dataStore.get(entry.getKey()).value == null || !dataStore
                .get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
            toUpdateKeys.add(entry.getKey());
        }
    }

    for (String key : dataStore.keys()) {

        // 如果其他节点的集合里面没有我这个数据Key,则要放到toRemoveKeys集合里面等待删除
        if (!checksumMap.containsKey(key)) {
            toRemoveKeys.add(key);
        }
    }

    // 删除多余出来的数据Key
    for (String key : toRemoveKeys) {
        onRemove(key);
    }

    DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
            server);
    distroKey.getActualResourceTypes().addAll(toUpdateKeys);
    // 通过数据Key从其他机器上获取对应的数据
    DistroData remoteData = distroProtocol.queryFromRemote(distroKey);
    if (null != remoteData) {
        // 把数据存到本地的DataStore
        processData(remoteData.getContent());
    }
}

核心的逻辑都在上面的onReceiveChecksums()方法里面,主要逻辑是用其他服务器传来的数据Key和本地的数据Key对比,如果本地多出来了就删掉,如果本地有增删改的场景则请求对向服务器的 [GET /distro/datum] 接口,通过Key获取要更新的数据。是不是又感觉有点绕了?别急,我给你画个流程图捋捋: Nacos注册中心源码解析(二):AP架构集群

心跳处理

在集群的模式下,每个节点都会存储相同的数据,比如有一个uams的服务在节点1和节点2都有存储,那么对于心跳检测来说是要在两个节点都去检测吗,如果真要这么做了那这个心跳的检测必然会出现问题,因为每个节点的启动的时间不一样,心跳的定时任务执行的时机也不一样。那么对于这种情况,看看在nacos中是怎么做的。找到心跳检测的那个定时任务:ClientBeatCheckTask,看他的run()方法: Nacos注册中心源码解析(二):AP架构集群 Nacos注册中心源码解析(二):AP架构集群DistroMapper#responsible()方法里对心跳检测做了一个限制,每次只能有一个机器执行,不会出现多个机器同时执行的情况。

总结

在AP架构下,nacos的数据同步主要还是依赖异步队列+定时任务,多多少少会有些延迟和短暂的数据不一致的问题,所以还会搭配补偿重试机制。但是优点就是在并发很高的情况下仍然能正常的对外提供服务并且性能还不错。这也是使用nacos集群很常用的一种集群模式。