ZooKeeper源码(二)崩溃恢复
前言
本文基于zk3.5.8源码,分析zk的崩溃恢复流程。
主要包括选主、恢复、心跳三个部分。
没耐心的朋友可以跳过分析部分,直接看总结。
QuorumPeer
ZK中的每个节点,都会启动一个QuorumPeer,QuorumPeer线程根据当前节点状态,决定以何种角色启动。
当节点处于崩溃恢复阶段(比如刚启动),节点都是LOOKING状态,需要找到Leader;当找到Leader后,进入原子广播阶段,变为LEADING或FOLLOWING状态。(忽略OBSERVING)
// QuorumPeer.java
private ServerState state = ServerState.LOOKING;
public enum ServerState {
LOOKING, FOLLOWING, LEADING, OBSERVING;
}
当处于原子广播阶段,QuorumPeer根据状态会new出来Leader/Follower/Observer实例;当崩溃后,这些实例又会被销毁(shutdown并指向null),重新进入LOOKING状态。
// QuorumPeer.java
public Follower follower;
public Leader leader;
public Observer observer;
集群中每个节点都对应一个id,即myid,这个id来源于dataDir/myid文件。
如果当前节点myid=1,则server.1=127.0.0.1:2222:2001对应当前节点QuorumPeer的通讯地址配置。
// QuorumPeer.java
private long myid;
private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>();
其中当QuorumPeer处于LEADING/FOLLOWING时,走127.0.0.1:2222,当QuorumPeer处于LOOKING时,走127.0.0.1:2001。
// server.1=127.0.0.1:2222:2001
public static final class AddressTuple {
// 127.0.0.1:2222 原子广播阶段 ip:port
public final InetSocketAddress quorumAddr;
// 127.0.0.1:2001 崩溃恢复阶段 ip:port
public final InetSocketAddress electionAddr;
public final InetSocketAddress clientAddr;
}
每个节点需要发现集群里的其他节点,zk是通过zoo.cfg配置得到的:
server.1=127.0.0.1:2222:2001
server.2=127.0.0.1:3333:3002
server.3=127.0.0.1:4444:4003
上述配置对应一个QuorumVerifier视图:
// QuorumPeer.java
private QuorumVerifier quorumVerifier;
实现是QuorumMaj(对于提案,投票节点过半通过),其中allMembers代表集群中所有节点,votingMembers代表集群中有投票权利的节点(除了Observer),observingMembers代表集群中Observer节点。
public class QuorumMaj implements QuorumVerifier {
private Map<Long, QuorumServer> allMembers = new HashMap<Long, QuorumServer>();
private HashMap<Long, QuorumServer> votingMembers = new HashMap<Long, QuorumServer>();
private HashMap<Long, QuorumServer> observingMembers = new HashMap<Long, QuorumServer>();
private long version = 0;
private int half;
}
每个QuorumServer存储对应节点的选举地址和广播地址,和AddressTuple类似。
// server.1=127.0.0.1:2222:2001
public static class QuorumServer {
// 127.0.0.1:2222
public InetSocketAddress addr = null;
// 127.0.0.1:2001
public InetSocketAddress electionAddr = null;
}
启动
QuorumPeer初始状态就是LOOKING,启动可以认为是一种特殊的崩溃恢复阶段。
QuorumPeer自身是一个Thread实现类,重写了start方法,在自身运行前做了一些准备工作。
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
@Override
public synchronized void start() {
// 从磁盘加载DataTree,设定epoch(currentEpoch、acceptedEpoch)
loadDataBase();
// 底层NIO通讯开启
startServerCnxnFactory();
// leader选举相关io线程
startLeaderElection();
// QuorumPeer线程start,leader选举主要逻辑
super.start();
}
}
加载数据到内存
从一个简单朴实的角度考虑,在众多leader的候选者中,应该选择数据最全的节点,即拥有zxid最大的节点。
所以为了正常执行选举,每个节点需要先从磁盘加载数据到内存即ZKDatabase#loadDataBase,得到lastProcessedZxid当前节点已经提交的最大事务id。
QuorumPeer#loadDataBase的主要作用除了拿到最大zxid以外,还要确定两个epoch:
1)currentEpoch:当前节点的epoch,持久化位置snapDir/currentEpoch;
2)acceptedEpoch:当前节点已经接受的epoch,一般就是leader的epoch,持久化位置snapDir/acceptedEpoch;
如果epoch没拿到,降级使用zxid的高32位。
private long acceptedEpoch = -1;
private long currentEpoch = -1;
private void loadDataBase() {
// 从磁盘加载DataTree
zkDb.loadDataBase();
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
// epochOfZxid = zxid >> 32
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
// 从snapDir/currentEpoch读
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// 降级使用最大事务id的高32位epoch
currentEpoch = epochOfZxid;
writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
}
if (epochOfZxid > currentEpoch) {
throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
}
try {
// 从snapDir/acceptedEpoch读
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// 降级使用最大事务id的高32位epoch
acceptedEpoch = epochOfZxid;
writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
}
// acceptedEpoch不能小于currentEpoch
if (acceptedEpoch < currentEpoch) {
throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
}
}
启动Leader选举相关线程
QuorumPeer#startLeaderElection:首先初始化自己的Vote选票,认为自己是leader。
Election electionAlg;
/**
* This is who I think the leader currently is.
*/
volatile private Vote currentVote;
synchronized public void startLeaderElection() {
// 初始化自己的选票
// 选自己,zxid=自己最大的zxid,peerEpoch=currentEpoch
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
// electionType = 3
this.electionAlg = createElectionAlgorithm(electionType);
}
Vote是个不可变对象,包含下面的属性,初始选票id=myid,zxid=lastProcessedZxid,peerEpoch=currentEpoch。
// Vote.java
// 版本(为了做兼容)
final private int version;
// 节点id
final private long id;
// 最大事务id
final private long zxid;
// 选举轮次
final private long electionEpoch;
// 任期
final private long peerEpoch;
// 节点状态
final private ServerState state;
QuorumPeer#createElectionAlgorithm:选择不同的选举策略,默认使用FastLeaderElection,运行期间都使用同一个FastLeaderElection实例。
private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
switch (electionAlgorithm) {
case 3:
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
QuorumCnxManager.Listener listener = qcm.listener;
// 理解为accept线程,接收对端连接
listener.start();
FastLeaderElection fle = new FastLeaderElection(this, qcm);
// 发送和接收的数据报都在FastLeaderElection的内存队列中
// 最后会交接给io线程或业务线程
fle.start();
le = fle;
break;
}
return le;
}
FastLeaderElection依赖于QuorumCnxManager做底层网络通讯,这底层io模型不看了,一个acceptor线程接对端连接(QuorumCnxManager.Listener),每个对端连接开启两个线程,一个负责发数据(QuorumCnxManager.SendWorker),一个负责接数据(QuorumCnxManager.RecvWorker)。
QuorumPeer主线程
加载内存数据和选举通讯线程启动后,QuorumPeer主线程开始运行。
主线程根据当前节点的不同状态,走不同的业务逻辑:
1)LOOKING:崩溃恢复阶段,FastLeaderElection#lookForLeader找Leader,没找到Leader前不会出这个方法;
2)FOLLOWING:原子广播阶段,已经找到Leader了,但是Leader不是自己,Follower#followLeader持续接收Leader请求;
3)LEADING:原子广播阶段,已经找到Leader了,且就是自己,Leader#lead持续与所有Follower通讯;
所以每个节点运行期间就在这几个状态中游走。
while (running) {
switch (getPeerState()) {
case LOOKING:
try {
// FastLeaderElection#lookForLeader
// 死循环,直到找到leader,更新当前选票
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
// 任何异常,还是LOOKING状态
setPeerState(ServerState.LOOKING);
}
break;
case FOLLOWING:
try {
// 构造并设置Folloer
setFollower(makeFollower(logFactory));
// 死循环,与leader通讯
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
// 关闭,更新当前节点状态
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING:
try {
// 构造并设置Leader
setLeader(makeLeader(logFactory));
// 死循环,与所有follower通讯
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
// 关闭,更新当前节点状态
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
}
选主
当QuorumPeer处于LOOKING状态时,进入FastLeaderElection#lookForLeader。
初始化
FastLeaderElection有四个属性需要初始化:
- logicalclock:选举时钟,初始为0,每次崩溃找leader时+1;
- proposedLeader:自己票选的leader;
- proposedZxid:自己票选的节点的zxid;
- proposedEpoch:自己票选的节点的任期,即QuorumPeer的currentEpoch;
AtomicLong logicalclock = new AtomicLong();
long proposedLeader;
long proposedZxid;
long proposedEpoch;
一开始,proposedLeader=当前节点id,proposedZxid=当前节点最大事务id(lastProcessedZxid),proposedEpoch=当前节点的epoch。
接着,FastLeaderElection#sendNotifications向所有参与选举的节点发送选票。
ToSend(Notification)包含:选举的leader/zxid/epoch,当前节点的选举时钟logicalclock,当前节点状态,整个集群拓扑关系QuorumVerifier。
选票同样会发给自己,但是QuorumCnxManager#toSend在处理过程中,区分了选票是否是给当前节点的,如果是自己则加入接收队列,不走网络通讯。
LOOKING
将自己的初始选票广播之后,FastLeaderElection#lookForLeader进入无限循环,直到找到leader。
广播选票
FastLeaderElection没收到足够对端选票Notification前(对端因为各种原因没能及时发出选票),都会向所有参与投票的节点广播自己的选票选票。
Notification的属性,和ToSend的属性基本一致,除了多了一个version用于做版本兼容。
static public class Notification {
/*
* Format version, introduced in 3.4.6
*/
public final static int CURRENTVERSION = 0x2;
int version;
// 票选leader的serverid
long leader;
// 票选leader的zxid
long zxid;
// 票选leader的epoch
long peerEpoch;
// 对端目前选举轮次
long electionEpoch;
// 对端状态
QuorumPeer.ServerState state;
// 对端serverid
long sid;
// 集群拓扑
QuorumVerifier qv;
}
比大小and更新选票
假设,目前集群没有一个节点选为主,对端过来的Notification都在LOOKING状态。
总方针是根据对端选举轮次electionEpoch和当前节点的选举轮次logicalclock来区分逻辑。
如果对端electionEpoch大于logicalclock,更新自己的logicalclock,totalOrderPredicate比较双方选票,如果返回true则更新自己的选票updateProposal,最后广播自己的选票sendNotifications;
如果对端electionEpoch小于logicalclock,忽视本票;
如果对端electionEpoch等于logicalclock,totalOrderPredicate比较双方选票,如果返回true则更新自己的选票,最后广播自己的选票sendNotifications;
totalOrderPredicate比大小:优先epoch,其次zxid,最次比serverid,都是越大越优先。
特殊的,对于小于当前节点选举轮次的情况,也不能完全不管,否则落后节点可能永远无法重新参与投票,最终无法选出leader。
当前节点有义务告诉他自己的选举轮次,让他进入本轮选举。
FastLeaderElection.Messenger.WorkerReceiver#run:
quorum
接下来FastLeadElection统计收到节点的选票,termPredicate判断是否满足quorum条件。
FastLeaderElection#termPredicate:和第一章写请求leader统计过半ack提交proposal是一样的。
protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}
return voteSet.hasAllQuorums();
}
如果满足quorum过半条件,判断剩余选票recvqueue是否还能改变自己的选择,如果能的话,继续处理这个新的选票,放弃结束选举流程。
如果满足quorum条件,且recvqueue中没有比当前票选leader大的选票,
当前节点根据票选的serverid与自身serverid比较,状态变更为LEADING或FOLLOWING(忽略OBSERVING)。
找到leader之后
如果A节点找到leader,A可能处于FOLLOWING或LEADING。
A仍然可能收到处于LOOKING状态的节点B的选票,针对这种情况,A会将自己认为的leader告知B节点。
举个特殊的例子:一个三节点的集群,A和C两个节点已经组成一个稳定的集群。
B刚加入集群,B广播自己的初始选票,因为A和C都已经处于FOLLOWING或LEADING了,可以告诉B当前的leader,从而让B进入FOLLOWING状态。
FastLeaderElection.Messenger.WorkerReceiver#run:
那么B节点处于LOOKING状态下,会收到处于稳定状态的A选票,选票中的状态是LEADING或FOLLOWING。
FastLeaderElection#lookForLeader:B收到A选票为LEADING或FOLLOWING
一种情况是,A的选票和B的选举处于同一轮,只要recvset中收到的选票满足quorum条件,B就可以结束选举流程。checkLeader是对票选的leader做进一步校验,要确保收到过leader的选票,且leader处于LEADING状态。
这里为什么取outofelection集合做进一步校验,看不懂,不应该是recvset,用本轮的选票做checkLeader吗?
我找了一下变更记录,在新版本做了修复ZOOKEEPER-3537(9f2e2f23)。
另一种情况是,A的选票和B的选举并不处于同一轮,只要outofelection(选举之外)中收到的选票满足quorum条件,B也可以结束选举流程,并将选举轮次和A保持一致。
恢复
恢复阶段,我们按照一个好理解的时序来梳理。
创建Leader
回到QuorumPeer主线程,此时假如当前节点serverid等于最终选票的serverid。
那么这个节点变为LEADING状态,创建Leader对象。
创建Leader会创建ZooKeeperServer的LeaderZooKeeperServer实现。
LeaderZooKeeperServer负责和客户端进行通讯。
这也就是为什么当QuorumPeer处于崩溃阶段无法处理客户端请求,follower也是同理。
因为leader出了lead方法,finally会关闭ZooKeeperServer,也就无法处理客户端请求了。
创建Follower
当前节点serverid不等于最终选票的serverid。
那么这个节点变为FOLLOWING状态,创建Follower对象。
QuorumPeer#makeFollower:创建FollowerZooKeeperServer负责和客户端通讯。
leader与过半节点建立连接
在恢复前,要先确定本轮epoch,一般来说新epoch=epoch+1。
Leader#lead的第一阶段,总共分为几个步骤:
1)zk.loadData:加载磁盘数据到内存;
2)LearnerCnxAcceptor:开启quorum端口,接收follower连接,针对每个follower连接会开启一个LearnerHandler线程;
3)getEpochToPropose:等待最新epoch;
4)初始化zxid:zxid=epoch<<32 | 0;
Follower发送FOLLOWERINFO
Follower#followLeader
1)Follower从QuorumServer中找到CurrentVote对应节点,即为leader。
2)connectToLeader:与leader建立连接,leader会创建LearnerHandler。
3)registerWithLeader:向leader注册。
Learner#registerWithLeader:发送FOLLOWERINFO,告知Leader自己的sid和自己的acceptEpoch<<32|0组成的zxid。
Leader接收FOLLOWERINFO
LearnerHandler#run:Follower与Leader建立连接后,Leader创建LearnerHandler负责与follower通讯。
首先Leader阻塞等待Follower的FOLLOWERINFO数据包,其中包含sid和acceptEpoch,最终也是调用Leader#getEpochToPropose。
public void run() {
// ...
// step1 等待follower发来FOLLOWERINFO
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
// FOLLOWERINFO中的sid
byte learnerInfoData[] = qp.getData();
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
this.sid = bbsid.getLong();
}
// step2 等待过半server连接到leader,并确定最新的epoch
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
}
过半FOLLOWERINFO
Leader#getEpochToPropose
这个方法阻塞等待至过半节点连接至leader,并决定最新的epoch
1)统计每个serverid目前的acceptEpoch:一般来说epoch=leader的acceptEpoch+1,如果有follower的acceptEpoch比当前leader的epoch大,则取follower的acceptEpoch+1作为新epoch;
2)等待过半server与当前leader建立连接:leader在Leader#lead会把自己的sid传入;其他follower会在Follower#followLeader与Leader建立连接,leader在LearnerHandler中会将follower的sid和epoch传入;
long epoch = -1;
// 是否在等待新的epoch
boolean waitingForNewEpoch = true;
// 与leader建立连接的follower
protected final Set<Long> connectingFollowers = new HashSet<Long>();
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
synchronized(connectingFollowers) {
if (!waitingForNewEpoch) {
return epoch;
}
if (lastAcceptedEpoch >= epoch) { // epoch++
epoch = lastAcceptedEpoch+1;
}
if (isParticipant(sid)) { // 参与投票
connectingFollowers.add(sid);
}
QuorumVerifier verifier = self.getQuorumVerifier();
if (connectingFollowers.contains(self.getId()) &&
verifier.containsQuorum(connectingFollowers)) {
// case1 已经有过半server与当前leader建立连接
waitingForNewEpoch = false;
self.setAcceptedEpoch(epoch);
connectingFollowers.notifyAll();
} else {
// case2 还没有过半server与当前leader建立连接,wait直至waitingForNewEpoch=false(case1)
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.getInitLimit()*self.getTickTime();
while(waitingForNewEpoch && cur < end) {
// wait一会,然后再来检查waitingForNewEpoch
connectingFollowers.wait(end - cur);
cur = Time.currentElapsedTime();
}
if (waitingForNewEpoch) {
// 超时没有过半server建立连接,结束leading状态
throw new InterruptedException("Timeout while waiting for epoch from quorum");
}
}
return epoch;
}
}
最新epoch达成一致
经过上述步骤,过半server已经连接至leader。
目前leader的acceptEpoch是所有节点中最大的,但原来leader的currentEpoch和zxid并不一定是最大的。
也可能是其他follower有更大currentEpoch或zxid,选举只需要过半票即可,未必能保证leader拥有最大currentEpoch。
Leader#lead进入第二阶段,这个阶段需要等待过半server同意leader提出的新epoch。
Leader发送LEADERINFO
LearnerHandler#run:
1)针对每个follower,leader发送LEADERINFO,包含自己最新的zxid,即epoch<<32 | 0;
2)阻塞等待Follower回复ACKEPOCH;
3)Leader#waitForEpochAck:等待收到过半epochack;
Follower回复ACKEPOCH
Learner#registerWithLeader:follower读到LEADERINFO,比较自己的acceptEpoch和leader最新提出的epoch:
1)如果leader大于自己,接受新的epoch,更新acceptEpoch,回复ACKEPOCH;
2)如果leader等于自己,回复ACKEPOCH;
3)如果leader小于自己,抛出异常,结束FOLLOWING状态,进入LOOKING;
ACKEPOCH包中,包含follower目前最大的zxid。
过半ACKEPOCH
Leader#waitForEpochAck:等待方式和Leader#getEpochToPropose一致,区别在于这个方法的目的是过半节点同意leader提出的新epoch,如果存在一个follower的epoch/zxid(StateSummary)比自己先前的大,则结束LEADING,进入LOOKING。
要注意这里的leaderStateSummary,对于leader来说,是提出新epoch之前的最大zxid和acceptEpoch。
也就是说,前面Leader#lead一阶段发起新epoch,只是leader接收了所有follower的epoch,并取最大值作为自己的acceptEpoch和zxid,并不代表follower能接受leader的epoch。
在过半ACKEPOCH阶段,leader会意识到自己的原来的epoch或zxid并非集群中的最大值(ss.isMoreRecentThan),只是巧合自己成为leader,那么leader会放弃自己的LEADING状态,抛出异常。
如果遇到这种情况,虽然leader的acceptEpoch增加了,但是实际上currentEpoch还处于原来的大小,没有被实际应用。
数据同步
Leader#lead至此阻塞等待收到过半Leader.ACK,在此之前,leader和follower需要完成数据同步。
Leader同步数据
回到LearnerHandler#run:当epoch达成一致后,leader主动向follower同步数据。
LearnerHandler#syncFollower:针对每个follower当前的zxid,由leader主动发送DIFF/TRUNC/SNAP包给follower。
对于Leader,先要确定自己的commitLog水位线。
ZKDatabase#loadDataBase:在加载磁盘数据到内存DataTree的阶段,CommitLog就被加载到内存了。
对应实现如下,所有磁盘上的事务数据,都被封装为Proposal提案,放入了commitedLog,用于后续同步给follower,避免了同步要走磁盘io。
其中minCommitedLog是committedLog中最小的zxid,maxCommittedLog是最大的,默认情况commitLogCount=500,内存中只有最新的500条事务Proposal。
LearnerHandler#syncFollower:
isPeerNewEpochZxid:
判断follower的低32位是否是0,如果是的话,代表follower只是处于新的一个epoch,还没有发生任何事务;
maxCommittedLog/minCommittedLog:
内存中committedLog的范围,由此可以选择向follower的同步策略,走内存,还是走磁盘;
txnLogSyncEnabled:是否允许走事务日志同步,默认true;
needSnap:决策当前follower是否要走snapshot同步;
case1-已经同步
如果follower的zxid和leader的zxid已经一致,发送一个DIFF包,zxid是当前zxid。
case2-follower的zxid超出maxCommittedLog
如果follower的zxid超出commitLog,leader发送TRUNC包,zxid是最大committedLog。
case3-follower的zxid在commitLog范围内
LearnerHandler#queueCommittedProposals:比较follower和leader的zxid情况,从itr迭代器中获取Proposal发送给follower,首先会发送一个OpPacket,即DIFF/TRUNC,接着是一连串的Proposal+COMMIT。
protected long queueCommittedProposals(Iterator<Proposal> itr,
long peerLastZxid, Long maxZxid, Long lastCommittedZxid) {
boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
long queuedZxid = peerLastZxid;
long prevProposalZxid = -1;
while (itr.hasNext()) {
Proposal propose = itr.next();
long packetZxid = propose.packet.getZxid();
if ((maxZxid != null) && (packetZxid > maxZxid)) {
break;
}
// skip the proposals the peer already has
if (packetZxid < peerLastZxid) {
prevProposalZxid = packetZxid;
continue;
}
if (needOpPacket) {
if (packetZxid == peerLastZxid) {
needOpPacket = false;
continue;
}
if (isPeerNewEpochZxid) {
queueOpPacket(Leader.DIFF, lastCommittedZxid);
needOpPacket = false;
} else if (packetZxid > peerLastZxid ) {
if (ZxidUtils.getEpochFromZxid(packetZxid) !=
ZxidUtils.getEpochFromZxid(peerLastZxid)) {
return queuedZxid;
}
queueOpPacket(Leader.TRUNC, prevProposalZxid);
needOpPacket = false;
}
}
if (packetZxid <= queuedZxid) {
continue;
}
queuePacket(propose.packet); // Proposal
queueOpPacket(Leader.COMMIT, packetZxid); // COMMIT
queuedZxid = packetZxid;
}
if (needOpPacket && isPeerNewEpochZxid) {
queueOpPacket(Leader.DIFF, lastCommittedZxid);
needOpPacket = false;
}
return queuedZxid;
}
case4-follower的zxid小于minCommittedLog
leader分成两段同步:
1)磁盘事务日志找到zxid对应事务,从那之后同步到minCommittedLog;
2)内存commitLog,同步minCommittedLog到maxCommittedLog;
如果磁盘中已经找不到zxid对应事务(txnLogItr.hasNext=false),说明follower的zxid在事务日志里已经找不到了,只能走snapshot同步,即needSnap=true
case5-snapshot同步
比如follower的zxid小于minCommittedLog,且磁盘上也找不到zxid对应的事务日志,则必须走snapshot同步。
LearnerHandler#run:
ZKDatabase#serializeSnapshot:将底层内存中的DataTree序列化后写到输出流。
Follower接收数据
DIFF/SNAP/TRUNC
follower首先会收到一个DIFF或SNAP或TRUNC包,代表leader采用什么样的方式,执行后续同步。
对于DIFF包,说明follower落后于leader,follower不需要创建snapshot,只需要在原有snapshot之上继续写;
对于SNAP包,follower直接读leader的snapshot,恢复内存DataTree;
对于TRUNC包,follower删除超出leader的zxid的事务日志;
Learner#syncWithLeader:
Proposal+Commit
对于DIFF包情况,follower落后于leader,需要将所有leader的Proposal+Commit包跑一遍和写请求一样的流程,写事务日志+写内存。
对于TRUNC和SNAP情况,follower只需要记录已经提交的zxid。
Learner#syncWithLeader:
创建Snapshot
Leader发送一个NEWLEADER给follower。
LearnerHandler#run:
Follower收到NEWLEADER,对于TRUNC和SNAP的情况,将内存DataTree创建snapshot,正式接受最新的epoch,并回复Leader一个Leader.ACK。
过半Leader.ACK
对于leader。
LearnerHandler#run:当waitForNewLeaderAck收到过半Leader.ack,向follower发送Leader.UPTODATE,并启动底层LeaderZooKeeperServer,开始接受客户端请求。
对于follower。
Learner#syncWithLeader退出同步逻辑(break outerloop)。
启动底层FollowerZooKeeperServer,允许处理客户端请求,并处理snapshot到uptodate间的数据。
心跳
当崩溃恢复阶段过去后,Leader和Follower都长时间处于稳定状态。
Leader
Leader#lead:每隔一段时间,向所有follower发送ping。
// ...恢复阶段
// 心跳
boolean tickSkip = true;
while (true) {
synchronized (this) {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.tickTime / 2;
while (cur < end) {
wait(end - cur);
cur = Time.currentElapsedTime();
}
if (!tickSkip) {
self.tick.incrementAndGet();
}
// 校验过半follower的仍然活着tickOfNextAckDeadline
SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
syncedAckSet.addAck(self.getId());
for (LearnerHandler f : getLearners()) {
if (f.synced()) { // 校验逻辑
syncedAckSet.addAck(f.getSid());
}
}
if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
// 如果没有收到过半follower的存活通知
shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
+ syncedAckSet.ackSetsToString() + " ]";
break;
}
tickSkip = !tickSkip;
}
for (LearnerHandler f : getLearners()) {
f.ping();
}
}
if (shutdownMessage != null) {
shutdown(shutdownMessage);
}
在发送ping之前,leader会校验与过半follower是否还能正常通讯。
LearnerHandler#synced:当前线程存活 且 tick<=tickOfNextAckDeadline。
tick在发送心跳前(Leader#lead)会增加。
而tickOfNextAckDeadline在收到follower的任意数据包后更新。
所以只要follower能正常处理leader的ping,或者其他proposal或者commit,就能在synced检查时返回true。
LearnerHandler#run:
LearnerHandler#ping:先检查follower是否在指定时间内返回过对于一个Proposal的ACK。
如果没有,则关闭LearnerHandler;如果有,继续发送ping给follower。
Follower
Follower#followLeader:处理Leader发来的数据包,其中包含ping、proposal、commit等等。
Learner#ping:Follower收到Leader的ping之后,会回复一个ping,把自己的session相关数据同步给leader。
那么Follower如何判断到Leader下线呢?
这个主要是通过通讯层的异常触发。
比如leader连接断开:
比如我在leader发送ping前打个断点,follower读socket超时:
这个超时在哪设置的呢?
Learner#connectToLeader->Learner#createSocket:在follower连接leader的quorum端口时设置的。
其实leader也会对follower的socket做同样的超时处理:
总结
选主
当集群中节点都处于LOOKING状态,选主流程大致如下:
1)广播自己的选票
一开始会选自己,如果没选出主且没有收到其他人的选票,会偶尔广播一下。
2)接收对端的选票(包含自己)
3)比选举轮次
- 比自己大,更新自己的选举轮次=对方选举轮次
- 和自己一样大,ok
- 比自己小,忽略这张选票,回复对端自己的选票,让对方进入最新的选举轮次
4)比zxid和epoch,更新自己的选票,并广播选票
目的是选数据最全的节点做leader
优先选epoch最大的,其次选zxid最大的,再次选sid(myid/serverid)最大的
5)判断是否满足quorum
默认quorum条件就是过半,如果一个节点的选票超过一半,则结束looking
6)leader or follower
如果最终获得过半选票的节点的sid是自己,则自己成为leader,否则自己成为follower
恢复
选主完毕,节点都处于FOLLOWING或LEADING状态。
此时每个节点的数据可能都不相同。
确定最新epoch和zxid
在进行数据同步之前,必须要确定最新的epoch和zxid。
Step1:过半Follower与Leader建立连接
1)Leader:加载磁盘数据到内存
2)Leader:创建LearnerCnxAcceptor接收follower连接
3)Follower:与Leader建立连接,发送FOLLOWERINFO,包含自己的sid和acceptEpoch
4)Leader:对于每个Follower创建一个LearnerHandler负责与之通讯
5)Leader:阻塞等待过半FOLLOWERINFO,或超时进入LOOKING
6)Leader:用收到的最大的acceptEpoch+1作为最新的epoch,并初始化zxid=新epoch<<32 | 0
Step2:过半节点对于最新epoch达成一致
1)Leader:发送LEADERINFO给所有Follower,包含自己最新的zxid,即epoch<<32 | 0;
2)Follower:接收LEADERINFO,检查最新的zxid是否epoch大于等于自己的acceptEpoch;
如果校验通过,回复Leader一个ACKEPOCH(包含自己的最大zxid),并设置acceptEpoch=最新epoch;
如果校验不通过,不会回复ACKEPOCH,抛出异常结束FOLLOWING状态,进入LOOKING;
3)Leader:收到ACKEPOCH,判断自己先前的StateSummary(epoch/zxid),是否大于Follower的epoch和zxid,如果校验不通过,抛出异常,结束LEADING,进入LOOKING
4)Leader:阻塞等待过半ACKEPOCH,或超时进入LOOKING
数据同步
从leader的视角来看,只要过半节点与自己数据一致,就可以开启底层ZooKeeperServer,允许与客户端通讯。
Leader#lead:
Leader针对每个Follower的LearnerHandler处理时序如下:
1)拿到ACKEPOCH里返回的follower的zxid
2)比较zxid,如果zxid与自己一致,代表数据已经同步,走4)
3)zxid与自己不一致,比较zxid是否在commitLog里,采用不同的策略同步数据:
a)zxid大于maxCommitLog
发送TRUNC,期望follower截断数据
b)zxid在commitLog里
follower落后不太多,用内存CommitLog同步数据
c)zxid小于minCommitLog,磁盘事务日志中存在zxid对应事务
用磁盘事务日志+内存CommitLog同步数据
d)zxid小于minCommitLog,磁盘事务日志中找不到zxid对应数据
必须走SNAP快照同步
4)已经将该同步的数据都写给follower了,再发送个NEWLEADER给follower,里面的zxid是新epoch<<32|0
5)阻塞等待过半节点返回ACK
如果过半节点返回ACK,代表过半节点已经接受了最新epoch,且数据已经与leader一致,可以对外提供服务。
1)Leader收到过半ACK后,从阻塞中恢复
2)Leader发送给UPTODATE给Follower,启动ZooKeeperServer可以对外提供服务
3)Follower收到UPTODATE,启动ZooKeeperServer可以对外提供服务
心跳
崩溃恢复完成后,集群处于一个稳定状态。
Leader会定时主动向所有Follower发送PING,Follower会回复Leader一个PING。
Leader如何检测Follower下线?
1)通过网络通讯异常
2)长时间未收到Follower的任何数据包
Follower如何检测Leader下线?
通过网络通讯异常
转载自:https://juejin.cn/post/7203138683226505272