Nacos注册中心15-基于raft 协议的数据一致性
欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
0. 环境
- nacos版本:1.4.1
- Spring Cloud : 2020.0.2
- Spring Boot :2.4.4
- Spring Cloud alibaba: 2.2.5.RELEASE
在一文中我们着重介绍了nacos 对于raft协议实现的落地,介绍了它的选举,它的心跳是怎样运转的,其实重要的还是term 与随机时间这块还有获得半数+1票
,本文就要看看基于这套raft 协议的数据同步是怎样进行的,主要是写操作。
1. 回顾一下服务注册流程
当一个服务注册请求打到nacos服务器的时候,它会根据根据你这个实例的namespace, serviceName,group
信息获取到对应的service
,如果不存在就创建,然后就是进行根据你这个service里面原来的实例列表信息在加上这个新的实例信息,整合出来一个新的实例列表信息,接着就是找到一致性服务(consistencyService
)进行put
操作,这个consistencyService
会根据实例是临时节点(默认就是这个临时节点)还是永久节点选择对应的consistencyService
,其实对于临时节点对应的就是DistroConsistencyServiceImpl
,它节点之间数据同步是peer to peer
的,跟那个eureka
差不多,对于永久节点对应的是PersistentConsistencyServiceDelegateImp
l,这个里面有2个实现一个是RaftConsistencyServiceImpl
的,一个是PersistentServiceProcessor
,RaftConsistencyServiceImpl
是1.4之前版本使用的,也是我们今天要介绍的,后面这个PersistentServiceProcessor
是1.4包括以上的版本使用的(这里我们先不介绍)。
大概就是下图这个样子:
本文主要就是介绍下红框中的基于raft 协议数据同步的部分。
2. 基于raft的写操作同步
我们可以对照着上图红框部分来看看RaftConsistencyServiceImpl# put
:
@Override
public void put(String key, Record value) throws NacosException {
// 检查有没有停止
checkIsStopWork();
try {
// todo 找到这个
raftCore.signalPublish(key, value);
} catch (Exception e) {
Loggers.RAFT.error("Raft put failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
e);
}
}
这里没什么好说的,就是检查下状态,然后找到这个raftCore
的signalPublish
来进行存储,同步。接下来看看signalPublish
这个方法:
public void signalPublish(String key, Record value) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
// 不是leader的话
if (!isLeader()) {
ObjectNode params = JacksonUtils.createEmptyJsonNode();
params.put("key", key);
params.replace("value", JacksonUtils.transferToJsonNode(value));
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
final RaftPeer leader = getLeader();
// todo 发送给leader /raft/datum post请求
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
return;
}
...
}
比较重要的点,判断,如果本机不是leader节点的话,就要封装参数将请求发送给leader节点,leader 节点收到请求后,其实还是走到这个put方法中。来看下这个raftProxy.proxyPostLarge
方法:
public void proxyPostLarge(String server, String api, String content, Map<String, String> headers)
throws Exception {
// do proxy
if (!IPUtil.containsPort(server)) {
server = server + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort();
}
String url = "http://" + server + EnvUtil.getContextPath() + api;
RestResult<String> result = HttpClient.httpPostLarge(url, headers, content);
if (!result.ok()) {
throw new IllegalStateException("leader failed, caused by: " + result.getMessage());
}
}
可以看到,leader结果不ok的话,会抛出异常。
接着raftCore的signalPublish
方法往下看:
public void signalPublish(String key, Record value) throws Exception {
...
// 如果是leader的话
OPERATE_LOCK.lock();
try {
final long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
// 原先没有
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
// 有的话 timestamp+1
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
ObjectNode json = JacksonUtils.createEmptyJsonNode();
// 数据
json.replace("datum", JacksonUtils.transferToJsonNode(datum));
// 来源
json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
// todo 本地存储
onPublish(datum, peers.local());
// 将内容同步给所有的 FOLLOWER节点
final String content = json.toString();
...
}
}
}
这里有2个重要的点,
- 一个是更新数据里面的
timestamp 值,就是自增1
,这个timestamp 值很重要,相当于是个版本
。 - 接着调用
onPublish 方法进行本地存储
。我们来看下onPublish方法干了啥:
public void onPublish(Datum datum, RaftPeer source) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
RaftPeer local = peers.local();
if (datum.value == null) {
Loggers.RAFT.warn("received empty datum");
throw new IllegalStateException("received empty datum");
}
// 不是leader的话 抛异常
if (!peers.isLeader(source.ip)) {
Loggers.RAFT
.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(getLeader()));
throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
}
if (source.term.get() < local.term.get()) {
Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(local));
throw new IllegalStateException(
"out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
}
// 重置 选举时间
local.resetLeaderDue();
// if data should be persisted, usually this is true:
if (KeyBuilder.matchPersistentKey(datum.key)) {
raftStore.write(datum);
}
// 存储到内存
datums.put(datum.key, datum);
if (isLeader()) {
// 如果是leader,就将这个本地的term+100
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
// 如果不是leader
} else {
// 纠正 本地的term 本地的话加100 大于leader的那个term 就纠正一下本地的
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
// 本地的也加100
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
// 将term值写到文件中
raftStore.updateTerm(local.term.get());
// 发送ValueChangeEvent事件
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
这个onPublish
很重要,因为follower
节点收到同步请求之后,也是调用了这个方法,它主要是干了6件事情,
-
重置下选举倒计时,毕竟leader还活着,follower节点收到了leader 同步请求也是执行这个方法,也是需要重置倒计时的。
-
如果是持久节点, 将datum写到文件中。
就是这个样子,里面记录了service详细信息与节点集合信息。
-
将datum存储到内存中,其实就是put到一个map了。
-
更新term值,leader的话+100,然后follower +100的话不超过leader就可以,超过了就使用leader的值。
-
将这个term写到文件中,不然的服务突然挂了,再起来就不知道term是多少了,或者是整个集群挂了,重启的话,不知道以谁的数据为准了,因为选举leader就是看的这个term值大小。
-
发布服务实例改变事件,发布到通知中心中,然后PersistentNotifier 这个类会订阅这个事件,然后进行通知了,其实这里就是异步通知。
好了,onPublish 方法就介绍完了。
接着raftCore的signalPublish
方法往下看:
public void signalPublish(String key, Record value) throws Exception {
...
// 将内容同步给所有的 FOLLOWER节点
final String content = json.toString();
// 只要有半数+1 实例回复ok 就算ok了
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
for (final String server : peers.allServersIncludeMyself()) {
// 是leader 跳过countdown
if (isLeader(server)) {
latch.countDown();
continue;
}
// /raft/datum/commit
final String url = buildUrl(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT
.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, result.getCode());
return;
}
// 说明这个peer 回复ok了
latch.countDown();
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
}
@Override
public void onCancel() {
}
});
}
// 5s后还没有完事 就抛出异常
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
OPERATE_LOCK.unlock();
}
}
这块也是很重要的,这里用了CountDownLatc
h 来计数堵塞超时,数量就是节点数/2+1(节点半数+1)
,接着就是遍历这堆节点了,然后发送数据同步请求,然后每收到一个follower节点的回复,CountDownLatch就减1 一下。
可以看到后面一直等着,5秒超时,如果5s之内follower节点回复数量大于等于 半数+1
,就算成功了,如果没有的话,就抛出异常,然后释放锁资源。
这里可以仔细想一下,leader节点是先将数据保存到本地了,不管是内存还是文件,term也进行了+1 ,更新事件也发布了,这个时候,如果因为网络故障同步节点成功达不到这个半数+1 ,就会造成数据不一致的情况,客户端将请求打到不同的nacos 节点上,拿到的数据是不一致的,所以从这里可以看出来nacos这个raft 协议并不是强一致,强一致的就要么都成功,要么都失败的,它这个整的失败的时候数据,leader与follower数据发生了不一致。其实这个nacos 的raft 协议实现的是个最终一致。接下来我们看下它最终一致是怎样实现的。
3. 它这个raft协议最终一致是怎样实现的
它这个raft协议最终一致实现原理其实很简单,就是leader在发送心跳的时候会将这个数据的key与timestamp(可以理解成版本号,我们上面也介绍过) 带给follower节点
,然后follower节点收到leader 发过来的心跳,会将本地的key ,timestamp 与leader带过来的key,timestamp 进行比较,如果本地少了这个key ,或者是key对应的timestamp 低于leader的话,就会发送请求去leader那拉取不一致的数据:
我们先来看下这个leader发送心跳的时候是怎样将key 与timestamp带过去的。RaftCore#sendBeat
方法:
就是将key 与timestamp 封装到了element中。最后发送请求的时候会带过去。
follower 收到心跳信息,比较key与timestamp 的代码和发送拉取数据请求的太长了我们这里就不看了,我们只需要知道有这么个事就可以了,在Raft#receivedBeat
方法中。
参考文章
nacos-1.4.1源码分析(注释) springcloud-source-study学习github地址 深度解析nacos注册中心 mac系统如何安装nacos
转载自:https://juejin.cn/post/7165292984844419109