likes
comments
collection
share

Nacos注册中心15-基于raft 协议的数据一致性

作者站长头像
站长
· 阅读数 25

欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

0. 环境

  • nacos版本:1.4.1
  • Spring Cloud : 2020.0.2
  • Spring Boot :2.4.4
  • Spring Cloud alibaba: 2.2.5.RELEASE

测试代码:github.com/hsfxuebao/s…

一文中我们着重介绍了nacos 对于raft协议实现的落地,介绍了它的选举,它的心跳是怎样运转的,其实重要的还是term 与随机时间这块还有获得半数+1票,本文就要看看基于这套raft 协议的数据同步是怎样进行的,主要是写操作。

1. 回顾一下服务注册流程

当一个服务注册请求打到nacos服务器的时候,它会根据根据你这个实例的namespace, serviceName,group信息获取到对应的service,如果不存在就创建,然后就是进行根据你这个service里面原来的实例列表信息在加上这个新的实例信息,整合出来一个新的实例列表信息,接着就是找到一致性服务(consistencyService)进行put操作,这个consistencyService 会根据实例是临时节点(默认就是这个临时节点)还是永久节点选择对应的consistencyService ,其实对于临时节点对应的就是DistroConsistencyServiceImpl,它节点之间数据同步是peer to peer 的,跟那个eureka 差不多,对于永久节点对应的是PersistentConsistencyServiceDelegateImpl,这个里面有2个实现一个是RaftConsistencyServiceImpl的,一个是PersistentServiceProcessorRaftConsistencyServiceImpl 是1.4之前版本使用的,也是我们今天要介绍的,后面这个PersistentServiceProcessor 是1.4包括以上的版本使用的(这里我们先不介绍)。 大概就是下图这个样子:

Nacos注册中心15-基于raft 协议的数据一致性

本文主要就是介绍下红框中的基于raft 协议数据同步的部分。

2. 基于raft的写操作同步

我们可以对照着上图红框部分来看看RaftConsistencyServiceImpl# put:

@Override
public void put(String key, Record value) throws NacosException {
    // 检查有没有停止
    checkIsStopWork();
    try {
        // todo 找到这个
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                e);
    }
}

这里没什么好说的,就是检查下状态,然后找到这个raftCoresignalPublish 来进行存储,同步。接下来看看signalPublish 这个方法:

public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    // 不是leader的话
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);

        final RaftPeer leader = getLeader();

        // todo 发送给leader    /raft/datum post请求
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }

   ...
}

比较重要的点,判断,如果本机不是leader节点的话,就要封装参数将请求发送给leader节点,leader 节点收到请求后,其实还是走到这个put方法中。来看下这个raftProxy.proxyPostLarge 方法:

public void proxyPostLarge(String server, String api, String content, Map<String, String> headers)
        throws Exception {
    // do proxy
    if (!IPUtil.containsPort(server)) {
        server = server + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort();
    }
    String url = "http://" + server + EnvUtil.getContextPath() + api;
    
    RestResult<String> result = HttpClient.httpPostLarge(url, headers, content);
    if (!result.ok()) {
        throw new IllegalStateException("leader failed, caused by: " + result.getMessage());
    }
}

可以看到,leader结果不ok的话,会抛出异常。 接着raftCore的signalPublish 方法往下看:

public void signalPublish(String key, Record value) throws Exception {
    ...
    // 如果是leader的话
    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        // 原先没有
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            // 有的话 timestamp+1
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }

        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        // 数据
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        // 来源
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));

        // todo 本地存储
        onPublish(datum, peers.local());

        // 将内容同步给所有的 FOLLOWER节点
        final String content = json.toString();

        ...
        }
    }
}

这里有2个重要的点,

  • 一个是更新数据里面的timestamp 值,就是自增1,这个timestamp 值很重要,相当于是个版本
  • 接着调用onPublish 方法进行本地存储。我们来看下onPublish方法干了啥:
public void onPublish(Datum datum, RaftPeer source) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    RaftPeer local = peers.local();
    if (datum.value == null) {
        Loggers.RAFT.warn("received empty datum");
        throw new IllegalStateException("received empty datum");
    }

    // 不是leader的话 抛异常
    if (!peers.isLeader(source.ip)) {
        Loggers.RAFT
                .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                        JacksonUtils.toJson(getLeader()));
        throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
    }

    if (source.term.get() < local.term.get()) {
        Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                JacksonUtils.toJson(local));
        throw new IllegalStateException(
                "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
    }

    // 重置 选举时间
    local.resetLeaderDue();

    // if data should be persisted, usually this is true:
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);
    }
    // 存储到内存
    datums.put(datum.key, datum);

    if (isLeader()) {
        // 如果是leader,就将这个本地的term+100
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    // 如果不是leader
    } else {
        // 纠正  本地的term 本地的话加100  大于leader的那个term  就纠正一下本地的
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            // 本地的也加100
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    // 将term值写到文件中
    raftStore.updateTerm(local.term.get());
    // 发送ValueChangeEvent事件
    NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}

这个onPublish 很重要,因为follower节点收到同步请求之后,也是调用了这个方法,它主要是干了6件事情,

  • 重置下选举倒计时,毕竟leader还活着,follower节点收到了leader 同步请求也是执行这个方法,也是需要重置倒计时的。

  • 如果是持久节点, 将datum写到文件中。 Nacos注册中心15-基于raft 协议的数据一致性 就是这个样子,里面记录了service详细信息与节点集合信息。

  • 将datum存储到内存中,其实就是put到一个map了。

  • 更新term值,leader的话+100,然后follower +100的话不超过leader就可以,超过了就使用leader的值。

  • 将这个term写到文件中,不然的服务突然挂了,再起来就不知道term是多少了,或者是整个集群挂了,重启的话,不知道以谁的数据为准了,因为选举leader就是看的这个term值大小。

  • 发布服务实例改变事件,发布到通知中心中,然后PersistentNotifier 这个类会订阅这个事件,然后进行通知了,其实这里就是异步通知。

好了,onPublish 方法就介绍完了。 接着raftCore的signalPublish 方法往下看:

public void signalPublish(String key, Record value) throws Exception {
        ...
        // 将内容同步给所有的 FOLLOWER节点
        final String content = json.toString();

        // 只要有半数+1 实例回复ok 就算ok了
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            // 是leader 跳过countdown
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            // /raft/datum/commit
            final String url = buildUrl(server, API_ON_PUB);
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT
                                .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                        datum.key, server, result.getCode());
                        return;
                    }
                    // 说明这个peer 回复ok了
                    latch.countDown();
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }

                @Override
                public void onCancel() {

                }
            });

        }

        // 5s后还没有完事 就抛出异常
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }

        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}

这块也是很重要的,这里用了CountDownLatch 来计数堵塞超时,数量就是节点数/2+1(节点半数+1),接着就是遍历这堆节点了,然后发送数据同步请求,然后每收到一个follower节点的回复,CountDownLatch就减1 一下。

可以看到后面一直等着,5秒超时,如果5s之内follower节点回复数量大于等于 半数+1,就算成功了,如果没有的话,就抛出异常,然后释放锁资源。

这里可以仔细想一下,leader节点是先将数据保存到本地了,不管是内存还是文件,term也进行了+1 ,更新事件也发布了,这个时候,如果因为网络故障同步节点成功达不到这个半数+1 ,就会造成数据不一致的情况,客户端将请求打到不同的nacos 节点上,拿到的数据是不一致的,所以从这里可以看出来nacos这个raft 协议并不是强一致,强一致的就要么都成功,要么都失败的,它这个整的失败的时候数据,leader与follower数据发生了不一致。其实这个nacos 的raft 协议实现的是个最终一致。接下来我们看下它最终一致是怎样实现的。

3. 它这个raft协议最终一致是怎样实现的

它这个raft协议最终一致实现原理其实很简单,就是leader在发送心跳的时候会将这个数据的key与timestamp(可以理解成版本号,我们上面也介绍过) 带给follower节点,然后follower节点收到leader 发过来的心跳,会将本地的key ,timestamp 与leader带过来的key,timestamp 进行比较,如果本地少了这个key ,或者是key对应的timestamp 低于leader的话,就会发送请求去leader那拉取不一致的数据:

Nacos注册中心15-基于raft 协议的数据一致性

我们先来看下这个leader发送心跳的时候是怎样将key 与timestamp带过去的。RaftCore#sendBeat方法:

Nacos注册中心15-基于raft 协议的数据一致性

就是将key 与timestamp 封装到了element中。最后发送请求的时候会带过去。 follower 收到心跳信息,比较key与timestamp 的代码和发送拉取数据请求的太长了我们这里就不看了,我们只需要知道有这么个事就可以了,在Raft#receivedBeat 方法中。

参考文章

nacos-1.4.1源码分析(注释) springcloud-source-study学习github地址 深度解析nacos注册中心 mac系统如何安装nacos