likes
comments
collection
share

ZooKeeper源码(二)崩溃恢复

作者站长头像
站长
· 阅读数 49

前言

本文基于zk3.5.8源码,分析zk的崩溃恢复流程。

主要包括选主、恢复、心跳三个部分。

没耐心的朋友可以跳过分析部分,直接看总结。

QuorumPeer

ZK中的每个节点,都会启动一个QuorumPeer,QuorumPeer线程根据当前节点状态,决定以何种角色启动。

当节点处于崩溃恢复阶段(比如刚启动),节点都是LOOKING状态,需要找到Leader;当找到Leader后,进入原子广播阶段,变为LEADING或FOLLOWING状态。(忽略OBSERVING)

// QuorumPeer.java
private ServerState state = ServerState.LOOKING;
public enum ServerState {
    LOOKING, FOLLOWING, LEADING, OBSERVING;
}

当处于原子广播阶段,QuorumPeer根据状态会new出来Leader/Follower/Observer实例;当崩溃后,这些实例又会被销毁(shutdown并指向null),重新进入LOOKING状态。

// QuorumPeer.java
public Follower follower;
public Leader leader;
public Observer observer;

集群中每个节点都对应一个id,即myid,这个id来源于dataDir/myid文件。

如果当前节点myid=1,则server.1=127.0.0.1:2222:2001对应当前节点QuorumPeer的通讯地址配置。

// QuorumPeer.java
private long myid;
private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>();

其中当QuorumPeer处于LEADING/FOLLOWING时,走127.0.0.1:2222,当QuorumPeer处于LOOKING时,走127.0.0.1:2001。

// server.1=127.0.0.1:2222:2001
public static final class AddressTuple {
    // 127.0.0.1:2222 原子广播阶段 ip:port
    public final InetSocketAddress quorumAddr;
    // 127.0.0.1:2001 崩溃恢复阶段 ip:port
    public final InetSocketAddress electionAddr;
    public final InetSocketAddress clientAddr;
}

每个节点需要发现集群里的其他节点,zk是通过zoo.cfg配置得到的:

server.1=127.0.0.1:2222:2001
server.2=127.0.0.1:3333:3002
server.3=127.0.0.1:4444:4003

上述配置对应一个QuorumVerifier视图:

// QuorumPeer.java
private QuorumVerifier quorumVerifier;

实现是QuorumMaj(对于提案,投票节点过半通过),其中allMembers代表集群中所有节点,votingMembers代表集群中有投票权利的节点(除了Observer),observingMembers代表集群中Observer节点。

public class QuorumMaj implements QuorumVerifier {
    private Map<Long, QuorumServer> allMembers = new HashMap<Long, QuorumServer>();
    private HashMap<Long, QuorumServer> votingMembers = new HashMap<Long, QuorumServer>();
    private HashMap<Long, QuorumServer> observingMembers = new HashMap<Long, QuorumServer>();
    private long version = 0;
    private int half;
}

每个QuorumServer存储对应节点的选举地址和广播地址,和AddressTuple类似。

// server.1=127.0.0.1:2222:2001
public static class QuorumServer {
    // 127.0.0.1:2222
    public InetSocketAddress addr = null;
    // 127.0.0.1:2001
    public InetSocketAddress electionAddr = null;
}

启动

QuorumPeer初始状态就是LOOKING,启动可以认为是一种特殊的崩溃恢复阶段。

QuorumPeer自身是一个Thread实现类,重写了start方法,在自身运行前做了一些准备工作。

public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
    @Override
    public synchronized void start() {
        // 从磁盘加载DataTree,设定epoch(currentEpoch、acceptedEpoch)
        loadDataBase();
        // 底层NIO通讯开启
        startServerCnxnFactory();
        // leader选举相关io线程
        startLeaderElection();
        // QuorumPeer线程start,leader选举主要逻辑
        super.start();
    }
}

加载数据到内存

从一个简单朴实的角度考虑,在众多leader的候选者中,应该选择数据最全的节点,即拥有zxid最大的节点。

所以为了正常执行选举,每个节点需要先从磁盘加载数据到内存即ZKDatabase#loadDataBase,得到lastProcessedZxid当前节点已经提交的最大事务id。

QuorumPeer#loadDataBase的主要作用除了拿到最大zxid以外,还要确定两个epoch:

1)currentEpoch:当前节点的epoch,持久化位置snapDir/currentEpoch;

2)acceptedEpoch:当前节点已经接受的epoch,一般就是leader的epoch,持久化位置snapDir/acceptedEpoch;

如果epoch没拿到,降级使用zxid的高32位。

private long acceptedEpoch = -1;
private long currentEpoch = -1;
private void loadDataBase() {
    // 从磁盘加载DataTree
    zkDb.loadDataBase();
    long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
    // epochOfZxid = zxid >> 32
    long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
    try {
        // 从snapDir/currentEpoch读
        currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
    } catch(FileNotFoundException e) {
        // 降级使用最大事务id的高32位epoch
        currentEpoch = epochOfZxid;
        writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
    }
    if (epochOfZxid > currentEpoch) {
        throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
    }
    try {
        // 从snapDir/acceptedEpoch读
        acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
    } catch(FileNotFoundException e) {
        // 降级使用最大事务id的高32位epoch
        acceptedEpoch = epochOfZxid;
        writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
    }
    // acceptedEpoch不能小于currentEpoch
    if (acceptedEpoch < currentEpoch) {
        throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
    }
}

启动Leader选举相关线程

QuorumPeer#startLeaderElection:首先初始化自己的Vote选票,认为自己是leader。

Election electionAlg;
/**
 * This is who I think the leader currently is.
 */
volatile private Vote currentVote;
synchronized public void startLeaderElection() {
    // 初始化自己的选票
    // 选自己,zxid=自己最大的zxid,peerEpoch=currentEpoch
    if (getPeerState() == ServerState.LOOKING) {
       currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    }
    // electionType = 3
    this.electionAlg = createElectionAlgorithm(electionType);
}

Vote是个不可变对象,包含下面的属性,初始选票id=myid,zxid=lastProcessedZxid,peerEpoch=currentEpoch。

// Vote.java
// 版本(为了做兼容)
final private int version;
// 节点id
final private long id;
// 最大事务id
final private long zxid;
// 选举轮次
final private long electionEpoch;
// 任期
final private long peerEpoch;
// 节点状态
final private ServerState state;

QuorumPeer#createElectionAlgorithm:选择不同的选举策略,默认使用FastLeaderElection,运行期间都使用同一个FastLeaderElection实例。

private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
    switch (electionAlgorithm) {
    case 3:
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        // 理解为accept线程,接收对端连接
        listener.start();
        FastLeaderElection fle = new FastLeaderElection(this, qcm);
        // 发送和接收的数据报都在FastLeaderElection的内存队列中
        // 最后会交接给io线程或业务线程
        fle.start();
        le = fle;
        break;
    }
    return le;
}

FastLeaderElection依赖于QuorumCnxManager做底层网络通讯,这底层io模型不看了,一个acceptor线程接对端连接(QuorumCnxManager.Listener),每个对端连接开启两个线程,一个负责发数据(QuorumCnxManager.SendWorker),一个负责接数据(QuorumCnxManager.RecvWorker)。

QuorumPeer主线程

加载内存数据和选举通讯线程启动后,QuorumPeer主线程开始运行。

主线程根据当前节点的不同状态,走不同的业务逻辑:

1)LOOKING:崩溃恢复阶段,FastLeaderElection#lookForLeader找Leader,没找到Leader前不会出这个方法;

2)FOLLOWING:原子广播阶段,已经找到Leader了,但是Leader不是自己,Follower#followLeader持续接收Leader请求;

3)LEADING:原子广播阶段,已经找到Leader了,且就是自己,Leader#lead持续与所有Follower通讯;

所以每个节点运行期间就在这几个状态中游走。

while (running) {
    switch (getPeerState()) {
    case LOOKING:
        try {
            // FastLeaderElection#lookForLeader
            // 死循环,直到找到leader,更新当前选票
            setCurrentVote(makeLEStrategy().lookForLeader());
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
            // 任何异常,还是LOOKING状态
            setPeerState(ServerState.LOOKING);
        }                        
        break;
    case FOLLOWING:
        try {
            // 构造并设置Folloer
            setFollower(makeFollower(logFactory));
            // 死循环,与leader通讯
            follower.followLeader();
        } catch (Exception e) {
           LOG.warn("Unexpected exception",e);
        } finally {
           // 关闭,更新当前节点状态
           follower.shutdown();
           setFollower(null);
           updateServerState();
        }
        break;
    case LEADING:
        try {
            // 构造并设置Leader
            setLeader(makeLeader(logFactory));
            // 死循环,与所有follower通讯
            leader.lead();
            setLeader(null);
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e);
        } finally {
            // 关闭,更新当前节点状态
            if (leader != null) {
                leader.shutdown("Forcing shutdown");
                setLeader(null);
            }
            updateServerState();
        }
        break;
    }
}

选主

当QuorumPeer处于LOOKING状态时,进入FastLeaderElection#lookForLeader

初始化

FastLeaderElection有四个属性需要初始化:

  • logicalclock:选举时钟,初始为0,每次崩溃找leader时+1;
  • proposedLeader:自己票选的leader;
  • proposedZxid:自己票选的节点的zxid;
  • proposedEpoch:自己票选的节点的任期,即QuorumPeer的currentEpoch;
AtomicLong logicalclock = new AtomicLong();
long proposedLeader;
long proposedZxid;
long proposedEpoch;

一开始,proposedLeader=当前节点id,proposedZxid=当前节点最大事务id(lastProcessedZxid),proposedEpoch=当前节点的epoch。

ZooKeeper源码(二)崩溃恢复

ZooKeeper源码(二)崩溃恢复

接着,FastLeaderElection#sendNotifications向所有参与选举的节点发送选票。

ToSend(Notification)包含:选举的leader/zxid/epoch,当前节点的选举时钟logicalclock,当前节点状态,整个集群拓扑关系QuorumVerifier。

ZooKeeper源码(二)崩溃恢复

选票同样会发给自己,但是QuorumCnxManager#toSend在处理过程中,区分了选票是否是给当前节点的,如果是自己则加入接收队列,不走网络通讯。

ZooKeeper源码(二)崩溃恢复

LOOKING

将自己的初始选票广播之后,FastLeaderElection#lookForLeader进入无限循环,直到找到leader。

ZooKeeper源码(二)崩溃恢复

广播选票

FastLeaderElection没收到足够对端选票Notification前(对端因为各种原因没能及时发出选票),都会向所有参与投票的节点广播自己的选票选票。

ZooKeeper源码(二)崩溃恢复

Notification的属性,和ToSend的属性基本一致,除了多了一个version用于做版本兼容。

static public class Notification {
    /*
     * Format version, introduced in 3.4.6
     */
    public final static int CURRENTVERSION = 0x2;
    int version;
    // 票选leader的serverid
    long leader;
    // 票选leader的zxid
    long zxid;
    // 票选leader的epoch
    long peerEpoch;
    // 对端目前选举轮次
    long electionEpoch;
    // 对端状态
    QuorumPeer.ServerState state;
    // 对端serverid
    long sid;
    // 集群拓扑
    QuorumVerifier qv;
}

比大小and更新选票

假设,目前集群没有一个节点选为主,对端过来的Notification都在LOOKING状态。

总方针是根据对端选举轮次electionEpoch和当前节点的选举轮次logicalclock来区分逻辑。

如果对端electionEpoch大于logicalclock,更新自己的logicalclock,totalOrderPredicate比较双方选票,如果返回true则更新自己的选票updateProposal,最后广播自己的选票sendNotifications;

如果对端electionEpoch小于logicalclock,忽视本票;

如果对端electionEpoch等于logicalclock,totalOrderPredicate比较双方选票,如果返回true则更新自己的选票,最后广播自己的选票sendNotifications;

ZooKeeper源码(二)崩溃恢复

totalOrderPredicate比大小:优先epoch,其次zxid,最次比serverid,都是越大越优先。

ZooKeeper源码(二)崩溃恢复

特殊的,对于小于当前节点选举轮次的情况,也不能完全不管,否则落后节点可能永远无法重新参与投票,最终无法选出leader。

当前节点有义务告诉他自己的选举轮次,让他进入本轮选举。

FastLeaderElection.Messenger.WorkerReceiver#run:

ZooKeeper源码(二)崩溃恢复

quorum

接下来FastLeadElection统计收到节点的选票,termPredicate判断是否满足quorum条件。

ZooKeeper源码(二)崩溃恢复

FastLeaderElection#termPredicate:和第一章写请求leader统计过半ack提交proposal是一样的。

protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey());
        }
    }
    return voteSet.hasAllQuorums();
}

如果满足quorum过半条件,判断剩余选票recvqueue是否还能改变自己的选择,如果能的话,继续处理这个新的选票,放弃结束选举流程。

ZooKeeper源码(二)崩溃恢复

如果满足quorum条件,且recvqueue中没有比当前票选leader大的选票,

当前节点根据票选的serverid与自身serverid比较,状态变更为LEADING或FOLLOWING(忽略OBSERVING)。

ZooKeeper源码(二)崩溃恢复

找到leader之后

如果A节点找到leader,A可能处于FOLLOWING或LEADING。

A仍然可能收到处于LOOKING状态的节点B的选票,针对这种情况,A会将自己认为的leader告知B节点。

举个特殊的例子:一个三节点的集群,A和C两个节点已经组成一个稳定的集群。

B刚加入集群,B广播自己的初始选票,因为A和C都已经处于FOLLOWING或LEADING了,可以告诉B当前的leader,从而让B进入FOLLOWING状态。

FastLeaderElection.Messenger.WorkerReceiver#run:

ZooKeeper源码(二)崩溃恢复

那么B节点处于LOOKING状态下,会收到处于稳定状态的A选票,选票中的状态是LEADING或FOLLOWING。

FastLeaderElection#lookForLeader:B收到A选票为LEADING或FOLLOWING

一种情况是,A的选票和B的选举处于同一轮,只要recvset中收到的选票满足quorum条件,B就可以结束选举流程。checkLeader是对票选的leader做进一步校验,要确保收到过leader的选票,且leader处于LEADING状态。

ZooKeeper源码(二)崩溃恢复

这里为什么取outofelection集合做进一步校验,看不懂,不应该是recvset,用本轮的选票做checkLeader吗?

我找了一下变更记录,在新版本做了修复ZOOKEEPER-3537(9f2e2f23)。

ZooKeeper源码(二)崩溃恢复

另一种情况是,A的选票和B的选举并不处于同一轮,只要outofelection(选举之外)中收到的选票满足quorum条件,B也可以结束选举流程,并将选举轮次和A保持一致。

ZooKeeper源码(二)崩溃恢复

恢复

恢复阶段,我们按照一个好理解的时序来梳理。

创建Leader

回到QuorumPeer主线程,此时假如当前节点serverid等于最终选票的serverid。

那么这个节点变为LEADING状态,创建Leader对象。

ZooKeeper源码(二)崩溃恢复

创建Leader会创建ZooKeeperServer的LeaderZooKeeperServer实现。

LeaderZooKeeperServer负责和客户端进行通讯。

ZooKeeper源码(二)崩溃恢复

这也就是为什么当QuorumPeer处于崩溃阶段无法处理客户端请求,follower也是同理。

因为leader出了lead方法,finally会关闭ZooKeeperServer,也就无法处理客户端请求了。

创建Follower

当前节点serverid不等于最终选票的serverid。

那么这个节点变为FOLLOWING状态,创建Follower对象。

ZooKeeper源码(二)崩溃恢复

QuorumPeer#makeFollower:创建FollowerZooKeeperServer负责和客户端通讯。

ZooKeeper源码(二)崩溃恢复

leader与过半节点建立连接

在恢复前,要先确定本轮epoch,一般来说新epoch=epoch+1。

Leader#lead的第一阶段,总共分为几个步骤:

1)zk.loadData:加载磁盘数据到内存;

2)LearnerCnxAcceptor:开启quorum端口,接收follower连接,针对每个follower连接会开启一个LearnerHandler线程;

3)getEpochToPropose:等待最新epoch;

4)初始化zxid:zxid=epoch<<32 | 0;

ZooKeeper源码(二)崩溃恢复

Follower发送FOLLOWERINFO

Follower#followLeader

1)Follower从QuorumServer中找到CurrentVote对应节点,即为leader。

2)connectToLeader:与leader建立连接,leader会创建LearnerHandler。

3)registerWithLeader:向leader注册。

ZooKeeper源码(二)崩溃恢复

Learner#registerWithLeader:发送FOLLOWERINFO,告知Leader自己的sid和自己的acceptEpoch<<32|0组成的zxid。

ZooKeeper源码(二)崩溃恢复

Leader接收FOLLOWERINFO

LearnerHandler#run:Follower与Leader建立连接后,Leader创建LearnerHandler负责与follower通讯。

首先Leader阻塞等待Follower的FOLLOWERINFO数据包,其中包含sid和acceptEpoch,最终也是调用Leader#getEpochToPropose。

 public void run() {
    // ...
    // step1 等待follower发来FOLLOWERINFO
    QuorumPacket qp = new QuorumPacket();
    ia.readRecord(qp, "packet");
    // FOLLOWERINFO中的sid
    byte learnerInfoData[] = qp.getData();
    ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
    if (learnerInfoData.length >= 8) {
        this.sid = bbsid.getLong();
    }
    // step2 等待过半server连接到leader,并确定最新的epoch
    long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
 }

过半FOLLOWERINFO

Leader#getEpochToPropose

这个方法阻塞等待至过半节点连接至leader,并决定最新的epoch

1)统计每个serverid目前的acceptEpoch:一般来说epoch=leader的acceptEpoch+1,如果有follower的acceptEpoch比当前leader的epoch大,则取follower的acceptEpoch+1作为新epoch;

2)等待过半server与当前leader建立连接:leader在Leader#lead会把自己的sid传入;其他follower会在Follower#followLeader与Leader建立连接,leader在LearnerHandler中会将follower的sid和epoch传入;

long epoch = -1;
// 是否在等待新的epoch
boolean waitingForNewEpoch = true;
// 与leader建立连接的follower
protected final Set<Long> connectingFollowers = new HashSet<Long>();
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized(connectingFollowers) {
        if (!waitingForNewEpoch) {
            return epoch;
        }
        if (lastAcceptedEpoch >= epoch) { // epoch++
            epoch = lastAcceptedEpoch+1;
        }
        if (isParticipant(sid)) { // 参与投票
            connectingFollowers.add(sid);
        }
        QuorumVerifier verifier = self.getQuorumVerifier();
        if (connectingFollowers.contains(self.getId()) &&
                                        verifier.containsQuorum(connectingFollowers)) {
            // case1 已经有过半server与当前leader建立连接
            waitingForNewEpoch = false;
            self.setAcceptedEpoch(epoch);
            connectingFollowers.notifyAll();
        } else {
            // case2 还没有过半server与当前leader建立连接,wait直至waitingForNewEpoch=false(case1)
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit()*self.getTickTime();
            while(waitingForNewEpoch && cur < end) {
                // wait一会,然后再来检查waitingForNewEpoch
                connectingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (waitingForNewEpoch) {
                // 超时没有过半server建立连接,结束leading状态
                throw new InterruptedException("Timeout while waiting for epoch from quorum");
            }
        }
        return epoch;
    }
}

最新epoch达成一致

经过上述步骤,过半server已经连接至leader。

目前leader的acceptEpoch是所有节点中最大的,但原来leader的currentEpoch和zxid并不一定是最大的。

也可能是其他follower有更大currentEpoch或zxid,选举只需要过半票即可,未必能保证leader拥有最大currentEpoch。

Leader#lead进入第二阶段,这个阶段需要等待过半server同意leader提出的新epoch。

ZooKeeper源码(二)崩溃恢复

Leader发送LEADERINFO

LearnerHandler#run:

1)针对每个follower,leader发送LEADERINFO,包含自己最新的zxid,即epoch<<32 | 0;

2)阻塞等待Follower回复ACKEPOCH;

3)Leader#waitForEpochAck:等待收到过半epochack;

ZooKeeper源码(二)崩溃恢复

Follower回复ACKEPOCH

Learner#registerWithLeader:follower读到LEADERINFO,比较自己的acceptEpoch和leader最新提出的epoch:

1)如果leader大于自己,接受新的epoch,更新acceptEpoch,回复ACKEPOCH;

2)如果leader等于自己,回复ACKEPOCH;

3)如果leader小于自己,抛出异常,结束FOLLOWING状态,进入LOOKING;

ACKEPOCH包中,包含follower目前最大的zxid。

ZooKeeper源码(二)崩溃恢复

过半ACKEPOCH

Leader#waitForEpochAck:等待方式和Leader#getEpochToPropose一致,区别在于这个方法的目的是过半节点同意leader提出的新epoch,如果存在一个follower的epoch/zxid(StateSummary)比自己先前的大,则结束LEADING,进入LOOKING。

ZooKeeper源码(二)崩溃恢复要注意这里的leaderStateSummary,对于leader来说,是提出新epoch之前的最大zxid和acceptEpoch。

ZooKeeper源码(二)崩溃恢复

也就是说,前面Leader#lead一阶段发起新epoch,只是leader接收了所有follower的epoch,并取最大值作为自己的acceptEpoch和zxid,并不代表follower能接受leader的epoch。

在过半ACKEPOCH阶段,leader会意识到自己的原来的epoch或zxid并非集群中的最大值(ss.isMoreRecentThan),只是巧合自己成为leader,那么leader会放弃自己的LEADING状态,抛出异常。

如果遇到这种情况,虽然leader的acceptEpoch增加了,但是实际上currentEpoch还处于原来的大小,没有被实际应用。

ZooKeeper源码(二)崩溃恢复

数据同步

Leader#lead至此阻塞等待收到过半Leader.ACK,在此之前,leader和follower需要完成数据同步。

ZooKeeper源码(二)崩溃恢复

Leader同步数据

回到LearnerHandler#run:当epoch达成一致后,leader主动向follower同步数据。

ZooKeeper源码(二)崩溃恢复

LearnerHandler#syncFollower:针对每个follower当前的zxid,由leader主动发送DIFF/TRUNC/SNAP包给follower。

ZooKeeper源码(二)崩溃恢复

对于Leader,先要确定自己的commitLog水位线。

ZKDatabase#loadDataBase:在加载磁盘数据到内存DataTree的阶段,CommitLog就被加载到内存了。

ZooKeeper源码(二)崩溃恢复

对应实现如下,所有磁盘上的事务数据,都被封装为Proposal提案,放入了commitedLog,用于后续同步给follower,避免了同步要走磁盘io。

其中minCommitedLog是committedLog中最小的zxid,maxCommittedLog是最大的,默认情况commitLogCount=500,内存中只有最新的500条事务Proposal。

ZooKeeper源码(二)崩溃恢复

LearnerHandler#syncFollower:

isPeerNewEpochZxid:

判断follower的低32位是否是0,如果是的话,代表follower只是处于新的一个epoch,还没有发生任何事务;

maxCommittedLog/minCommittedLog:

内存中committedLog的范围,由此可以选择向follower的同步策略,走内存,还是走磁盘;

txnLogSyncEnabled:是否允许走事务日志同步,默认true;

needSnap:决策当前follower是否要走snapshot同步;

ZooKeeper源码(二)崩溃恢复

case1-已经同步

如果follower的zxid和leader的zxid已经一致,发送一个DIFF包,zxid是当前zxid。

ZooKeeper源码(二)崩溃恢复

case2-follower的zxid超出maxCommittedLog

如果follower的zxid超出commitLog,leader发送TRUNC包,zxid是最大committedLog。

ZooKeeper源码(二)崩溃恢复

case3-follower的zxid在commitLog范围内

ZooKeeper源码(二)崩溃恢复

LearnerHandler#queueCommittedProposals:比较follower和leader的zxid情况,从itr迭代器中获取Proposal发送给follower,首先会发送一个OpPacket,即DIFF/TRUNC,接着是一连串的Proposal+COMMIT。

protected long queueCommittedProposals(Iterator<Proposal> itr,
        long peerLastZxid, Long maxZxid, Long lastCommittedZxid) {
    boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
    long queuedZxid = peerLastZxid;
    long prevProposalZxid = -1;
    while (itr.hasNext()) {
        Proposal propose = itr.next();
        long packetZxid = propose.packet.getZxid();
        if ((maxZxid != null) && (packetZxid > maxZxid)) {
            break;
        }
        // skip the proposals the peer already has
        if (packetZxid < peerLastZxid) {
            prevProposalZxid = packetZxid;
            continue;
        }
        if (needOpPacket) {
            if (packetZxid == peerLastZxid) {
                needOpPacket = false;
                continue;
            }

            if (isPeerNewEpochZxid) {
               queueOpPacket(Leader.DIFF, lastCommittedZxid);
               needOpPacket = false;
            } else if (packetZxid > peerLastZxid  ) {
                if (ZxidUtils.getEpochFromZxid(packetZxid) !=
                        ZxidUtils.getEpochFromZxid(peerLastZxid)) {
                    return queuedZxid;
                }
                queueOpPacket(Leader.TRUNC, prevProposalZxid);
                needOpPacket = false;
            }
        }
        if (packetZxid <= queuedZxid) {
            continue;
        }
        queuePacket(propose.packet); // Proposal
        queueOpPacket(Leader.COMMIT, packetZxid); // COMMIT
        queuedZxid = packetZxid;

    }
    if (needOpPacket && isPeerNewEpochZxid) {
        queueOpPacket(Leader.DIFF, lastCommittedZxid);
        needOpPacket = false;
    }
    return queuedZxid;
}    

case4-follower的zxid小于minCommittedLog

leader分成两段同步:

1)磁盘事务日志找到zxid对应事务,从那之后同步到minCommittedLog;

2)内存commitLog,同步minCommittedLog到maxCommittedLog;

如果磁盘中已经找不到zxid对应事务(txnLogItr.hasNext=false),说明follower的zxid在事务日志里已经找不到了,只能走snapshot同步,即needSnap=true

ZooKeeper源码(二)崩溃恢复

case5-snapshot同步

比如follower的zxid小于minCommittedLog,且磁盘上也找不到zxid对应的事务日志,则必须走snapshot同步。

LearnerHandler#run:

ZooKeeper源码(二)崩溃恢复

ZKDatabase#serializeSnapshot:将底层内存中的DataTree序列化后写到输出流。

ZooKeeper源码(二)崩溃恢复

Follower接收数据

DIFF/SNAP/TRUNC

follower首先会收到一个DIFF或SNAP或TRUNC包,代表leader采用什么样的方式,执行后续同步。

对于DIFF包,说明follower落后于leader,follower不需要创建snapshot,只需要在原有snapshot之上继续写;

对于SNAP包,follower直接读leader的snapshot,恢复内存DataTree;

对于TRUNC包,follower删除超出leader的zxid的事务日志;

Learner#syncWithLeader:

ZooKeeper源码(二)崩溃恢复

Proposal+Commit

对于DIFF包情况,follower落后于leader,需要将所有leader的Proposal+Commit包跑一遍和写请求一样的流程,写事务日志+写内存。

对于TRUNC和SNAP情况,follower只需要记录已经提交的zxid。

Learner#syncWithLeader:

ZooKeeper源码(二)崩溃恢复

创建Snapshot

Leader发送一个NEWLEADER给follower。

LearnerHandler#run:

ZooKeeper源码(二)崩溃恢复

Follower收到NEWLEADER,对于TRUNC和SNAP的情况,将内存DataTree创建snapshot,正式接受最新的epoch,并回复Leader一个Leader.ACK。

ZooKeeper源码(二)崩溃恢复

过半Leader.ACK

对于leader。

LearnerHandler#run:当waitForNewLeaderAck收到过半Leader.ack,向follower发送Leader.UPTODATE,并启动底层LeaderZooKeeperServer,开始接受客户端请求。

ZooKeeper源码(二)崩溃恢复

ZooKeeper源码(二)崩溃恢复

对于follower。

Learner#syncWithLeader退出同步逻辑(break outerloop)。

ZooKeeper源码(二)崩溃恢复

启动底层FollowerZooKeeperServer,允许处理客户端请求,并处理snapshot到uptodate间的数据。

ZooKeeper源码(二)崩溃恢复

心跳

当崩溃恢复阶段过去后,Leader和Follower都长时间处于稳定状态。

Leader

Leader#lead:每隔一段时间,向所有follower发送ping。

// ...恢复阶段

// 心跳
boolean tickSkip = true;
while (true) {
    synchronized (this) {
        long start = Time.currentElapsedTime();
        long cur = start;
        long end = start + self.tickTime / 2;
        while (cur < end) {
            wait(end - cur);
            cur = Time.currentElapsedTime();
        }
        if (!tickSkip) {
            self.tick.incrementAndGet();
        }
        // 校验过半follower的仍然活着tickOfNextAckDeadline
        SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
        syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
        syncedAckSet.addAck(self.getId());
        for (LearnerHandler f : getLearners()) {
            if (f.synced()) { // 校验逻辑
                syncedAckSet.addAck(f.getSid());
            }
        }

        if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
            // 如果没有收到过半follower的存活通知
            shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
                    + syncedAckSet.ackSetsToString() + " ]";
            break;
        }
        tickSkip = !tickSkip;
    }
    for (LearnerHandler f : getLearners()) {
        f.ping();
    }
}
if (shutdownMessage != null) {
    shutdown(shutdownMessage);
}

在发送ping之前,leader会校验与过半follower是否还能正常通讯。

LearnerHandler#synced:当前线程存活 且 tick<=tickOfNextAckDeadline。

ZooKeeper源码(二)崩溃恢复

tick在发送心跳前(Leader#lead)会增加。

而tickOfNextAckDeadline在收到follower的任意数据包后更新。

所以只要follower能正常处理leader的ping,或者其他proposal或者commit,就能在synced检查时返回true。

LearnerHandler#run:

ZooKeeper源码(二)崩溃恢复

LearnerHandler#ping:先检查follower是否在指定时间内返回过对于一个Proposal的ACK。

如果没有,则关闭LearnerHandler;如果有,继续发送ping给follower。

ZooKeeper源码(二)崩溃恢复

Follower

Follower#followLeader:处理Leader发来的数据包,其中包含ping、proposal、commit等等。

ZooKeeper源码(二)崩溃恢复

ZooKeeper源码(二)崩溃恢复

Learner#ping:Follower收到Leader的ping之后,会回复一个ping,把自己的session相关数据同步给leader。

ZooKeeper源码(二)崩溃恢复

那么Follower如何判断到Leader下线呢?

这个主要是通过通讯层的异常触发。

比如leader连接断开:

ZooKeeper源码(二)崩溃恢复比如我在leader发送ping前打个断点,follower读socket超时:

ZooKeeper源码(二)崩溃恢复

这个超时在哪设置的呢?

Learner#connectToLeader->Learner#createSocket:在follower连接leader的quorum端口时设置的。

ZooKeeper源码(二)崩溃恢复

其实leader也会对follower的socket做同样的超时处理:

ZooKeeper源码(二)崩溃恢复

总结

选主

ZooKeeper源码(二)崩溃恢复

当集群中节点都处于LOOKING状态,选主流程大致如下:

1)广播自己的选票

一开始会选自己,如果没选出主且没有收到其他人的选票,会偶尔广播一下。

2)接收对端的选票(包含自己)

3)比选举轮次

  • 比自己大,更新自己的选举轮次=对方选举轮次
  • 和自己一样大,ok
  • 比自己小,忽略这张选票,回复对端自己的选票,让对方进入最新的选举轮次

4)比zxid和epoch,更新自己的选票,并广播选票

目的是选数据最全的节点做leader

优先选epoch最大的,其次选zxid最大的,再次选sid(myid/serverid)最大的

5)判断是否满足quorum

默认quorum条件就是过半,如果一个节点的选票超过一半,则结束looking

6)leader or follower

如果最终获得过半选票的节点的sid是自己,则自己成为leader,否则自己成为follower

恢复

选主完毕,节点都处于FOLLOWING或LEADING状态。

此时每个节点的数据可能都不相同。

确定最新epoch和zxid

ZooKeeper源码(二)崩溃恢复

在进行数据同步之前,必须要确定最新的epoch和zxid。

Step1:过半Follower与Leader建立连接

1)Leader:加载磁盘数据到内存

2)Leader:创建LearnerCnxAcceptor接收follower连接

3)Follower:与Leader建立连接,发送FOLLOWERINFO,包含自己的sid和acceptEpoch

4)Leader:对于每个Follower创建一个LearnerHandler负责与之通讯

5)Leader:阻塞等待过半FOLLOWERINFO,或超时进入LOOKING

6)Leader:用收到的最大的acceptEpoch+1作为最新的epoch,并初始化zxid=新epoch<<32 | 0

Step2:过半节点对于最新epoch达成一致

1)Leader:发送LEADERINFO给所有Follower,包含自己最新的zxid,即epoch<<32 | 0;

2)Follower:接收LEADERINFO,检查最新的zxid是否epoch大于等于自己的acceptEpoch;

如果校验通过,回复Leader一个ACKEPOCH(包含自己的最大zxid),并设置acceptEpoch=最新epoch;

如果校验不通过,不会回复ACKEPOCH,抛出异常结束FOLLOWING状态,进入LOOKING;

3)Leader:收到ACKEPOCH,判断自己先前的StateSummary(epoch/zxid),是否大于Follower的epoch和zxid,如果校验不通过,抛出异常,结束LEADING,进入LOOKING

4)Leader:阻塞等待过半ACKEPOCH,或超时进入LOOKING

数据同步

ZooKeeper源码(二)崩溃恢复

从leader的视角来看,只要过半节点与自己数据一致,就可以开启底层ZooKeeperServer,允许与客户端通讯。

Leader#lead:

ZooKeeper源码(二)崩溃恢复

Leader针对每个Follower的LearnerHandler处理时序如下:

1)拿到ACKEPOCH里返回的follower的zxid

2)比较zxid,如果zxid与自己一致,代表数据已经同步,走4)

3)zxid与自己不一致,比较zxid是否在commitLog里,采用不同的策略同步数据:

a)zxid大于maxCommitLog

发送TRUNC,期望follower截断数据

b)zxid在commitLog里

follower落后不太多,用内存CommitLog同步数据

c)zxid小于minCommitLog,磁盘事务日志中存在zxid对应事务

用磁盘事务日志+内存CommitLog同步数据

d)zxid小于minCommitLog,磁盘事务日志中找不到zxid对应数据

必须走SNAP快照同步

4)已经将该同步的数据都写给follower了,再发送个NEWLEADER给follower,里面的zxid是新epoch<<32|0

5)阻塞等待过半节点返回ACK

ZooKeeper源码(二)崩溃恢复

如果过半节点返回ACK,代表过半节点已经接受了最新epoch,且数据已经与leader一致,可以对外提供服务。

1)Leader收到过半ACK后,从阻塞中恢复

2)Leader发送给UPTODATE给Follower,启动ZooKeeperServer可以对外提供服务

3)Follower收到UPTODATE,启动ZooKeeperServer可以对外提供服务

心跳

崩溃恢复完成后,集群处于一个稳定状态。

Leader会定时主动向所有Follower发送PING,Follower会回复Leader一个PING。

Leader如何检测Follower下线?

1)通过网络通讯异常

2)长时间未收到Follower的任何数据包

Follower如何检测Leader下线?

通过网络通讯异常

转载自:https://juejin.cn/post/7203138683226505272
评论
请登录