likes
comments
collection
share

Raft 算法选主详解与复现, 完成 MIT 6.824(6.5840) Lab2A

作者站长头像
站长
· 阅读数 31

什么是 选主(Leader Election)?

在 raft 算法中, 分布式系统的节点被分成三类, leader, follower, candidate. 其中 leader 具有一系列重要功能: 包括但不限于:

  1. 日志复制:Leader 负责接收客户端的请求,并将其转化为日志条目。Leader 将这些日志条目复制到其他节点(称为 Followers)上,以实现状态的一致性。Leader 维护着最新的日志副本,并负责将其复制到其他节点。 注意这里的日志和一般意义上的日志不同。 一般意义中的日志是指对运行状态的关键信息做出记录(比如遇到数据库查询错误,grpc连接错误时对现成进行记录而生成的文件, 主要用于排查 bug, 排查线上问题)。 而 raft 算法中的日志是指 leader 对其他节点的"发号施令", leader 对其他 节点下达的一起请求都称为日志。

  2. 一致性决策:在 Raft 中,Leader 负责决定哪些日志条目被接受并追加到日志中。当客户端发送请求时,Leader 会处理该请求,并在日志中追加相应的条目。Leader 的决策是最终的,其他节点必须遵循 Leader 的指导。

  3. 客户端交互:在分布式系统中的所有节点中,只有 Leader 是作为唯一与客户端直接交互,处理客户端的读写请求并返回结果。客户端向 Leader 发送请求,Leader 处理请求后将结果返回给客户端。这简化了客户端与集群的交互,使得整个系统更易于使用。

其他两个角色中,follower 作为 leader 的追随者, 处处听从 leader 指挥(前提是leader确实是有资格,合法的leader, 后面我会详解什么情况下leader不合法)。当 follower 找不到 leader, 或者认为现在的 leader 已不再够资格时 就会转化成 candidate, 然后竞选成为 下一代(raft 中成为下一个 term, 或者理解为任期) leader。candidate 竞选成为 leader的这一过程就成为 选主 (Leader Election).

raft 算法中的leader是有任期的, 任期也成为 term。 一个 term 内有且只有一个 leader。 term在实现中是一个单调递增的变量。每到一个新任期, term 都会+1。后面我们会说到并不是每一个 term 都能选出 leader的。但是, 好的一致性算法应当保证尽可能每一个 term 都能选出leader。 试想如果经常选不出leader, 分布式系统中的节点将处于一个群龙无首的状态, 工作效率将会大打折扣。

正是因为 leader 在分布式系统中的重要性,选主的也成为了各类一致性算法的核心。

leader, follower, candidate 之间的转化关系

上面说完了 leader, follower, candidate 三个角色的分工, 下面详解三者之间如何转换。下图是 raft 算法中的转换图:

Raft 算法选主详解与复现, 完成 MIT 6.824(6.5840) Lab2A

按照转化图, 我们可以简单翻译一下 各个 角色之间如何转换:

  1. 一开始,大家都是 follower
  2. 当 follower 发现选举超时(后面详解什么是选举超时),将会变成 candidate
  3. candidate 向所有节点发起投票, 如果超过半数节点投票给自己, 自己将会成为 leader
  4. candidate 向所有节点发起投票, 如果超过半数节点没有投票给自己,自己仍是 candidate, 等到选举超时后再次发起投票
  5. candidate 如果发现有其他节点的 term 比自己大(原图中的 discovers curent leader 也是通过 term 来判断的后面会说到),则主动让贤,成为 follower
  6. leader 发现其他节点 的 term 更大, 则主动让贤

选主过程 详解

看完上面的转化图,详信你应该对 raft 的选主有个粗略的掌握了。之所以是粗略,那是因为实际中的选主远比上面的转化图要复杂得多。有许多边界情况,许多容错处理的情况, 以及选主的一致性保障,上面的图是没有办法覆盖的。下面我将尽可能由浅入深地讲解raft算法中的角色转换细节, 以及为什么这样转换

一致性保障

所谓一致性,就是尽量确保同一时刻有且仅有一个合法的 leader, 不能多个节点跳出来都声称自己是leader 。为此, raft 算法主要从以下多个维度来实现:

成为 leader的条件尽可能的 苛刻

raft采取投票的方式进行选主, 只有当一个 节点获得超过半数以上的节点投票给自己时, 自己才会宣布成为 leader。 如此一来,一旦选出了 leader, 即使有其他节点"起兵造反" 也会因为最多只能拿到小半数的投票 而老老实实让贤。

leader 通过心跳机制维护自己的 leader 身份

leader 一旦成为 leader 以后, 会规律地,定时地向其他节点发送心跳, 其他节点在收到leader心跳后, 会做出回应, 这里的回应可以是服从 leader, 也可以是反对leader

leader不会主动让位

为了尽可能维持系统的稳定性,不至于经常群龙无首, leader的身份应该尽可能地不受干扰。一个 leader 有且只有两种情况下才会被迫退出 leader 回到 follower, 分别是:

  1. 发现超过半数的 节点没有回应。 我们说过 raft的leader 有效性必须得到超过半数节点的同意。如果leader发送心跳时,超过半数的节点没有回应,则此 leader的合法性应该立刻失效。这里的不回应可能是其他节点挂掉了, 也有可能是 leader自己连不上其他节点。无论哪一种情况,都应该主动退出 leader
  2. 发现其他节点的 term 更大。term 表示 节点的任期, term 越大,代表这个 节点的角色越新,越有话语权。 在一个稳定运行的分布式系统中,所有节点的 term 应该保持一致(低term的节点会被leader更新成同样的term), 也就是大家处于同一个阶段, 大家都听从同一个 leader。但是当一个节点迟迟没有收到leader心跳时,就会变成candidate , 此时term 就会加1,此时这个 candidate 向leader 请求投票时, leader 发现 candidate 的 term 更大就会让贤。 之所以 term 会 加一就是为了标识自己 进入了选主阶段,请求大家投票给自己。 或者在 leader 发送心跳时,如果收到 follower 的term 更大 也会让贤。可能你会好奇,follower回复的term为什么会比leader大?其实这也是有可能的,比如leader发送的心跳在有一段时间内到不了follower,follower等不及了,已经过了超时选举的时间,于是成为了candidate。但过了一会网络又通了,leade又发送给follower(leader以为它是follower,但其实已经成了candidate)心跳, 这个candidate将自己的term返回去,leader发现follower(实际上已经变成了candidate)的term比自己更大,于是leader让贤。或者老leader自己断网了,自己的心跳送不出去,其他follower已经选出了新leader,老leader收到了新leader的心跳,发现term更大,于是主动让贤。

leader 只会变成 follower, 而不会成为 candidate

上面说过稳定的系统中,各个节点的 term 应该保持相同。除非 leader 不再合法,有节点跳出来成为 candidate, 既然有新的 candida 跳出来, leader就不能成为下一个 candidate了, 否则就会出现多个 candidate 打架的场面, 你不服我,我也不服你,大家耗着时间选不出来。

鲁棒性保障

所谓 鲁棒性,就是确保即使一小部分(不过半)的节点挂了(包括leader挂了的情况)以后,剩余的节点依旧可以选出合法的 leader来(某些term内可能存在选不出来,但长期来看必然选得出来)。 raft 算法主要通过以下多种方式实现:

超时选举

上面说过 leader的维持需要大多数节点的投票,如果leader心跳过程中没有收到超过半数的节点的同意,那么leader失去合法性。 如果其他节点很长一段时间没有收到leader的心跳的,那么它就会变成 candidate请求大家投票,成为新一个 term的 leader。为了记录有多久没有收到合法leader的心跳,此处应该有一个 timer, 如果 timer 触发了, 则说明很久都没有leader了,此时应该立刻成为 candidate 选主。

事实上这个 timer 不仅仅是记录有多久没有收到 leader 心跳的意义。它应该更通用,应该用来看看过了多久时间是不是应该进行选主了。 如此一来。这个 timer 还可以在以下三个场景都应该被置零

  1. 收到 合法 leader的心跳时重置一下
  2. 成为 candidate 时 重置一下, 否则当选举失败后, 当前candidate 还会继续选主,如果多个 candidate 存在,则会互相竞争,导致长期选不出leader.
  3. 在投票给 candidate 时重置一下,代表我已经投票给你了,即使没有收到leader心跳我也不会选主, 而是全心全意支持你

一个 term 内, 节点只能投票个一个 candidate

上面说过 term的意义是表示节点自己的地位。为了避免一个节点在一个 term 内反复投票,造成所有 candidate都获得同样的选票,raft 规定了一个 term 内只能投一张选票。

随机化的 超时选举

试想一下在某个时刻,leader挂掉了,每一个节点都过了300mS没有收到心跳,于是一起term+1,一起变成candidate,一起发起投票,那么将会出现非常混乱的局面,很有大家都没有收到超过半数的投票,从而选不出leader。为了避免大量节点在短时间内同时触发超时选举,所以超时选举的时间应该随机化。

心跳的间隔应该<< 超时选举间隔

如果心跳时间与超时选举时间相当,那么我们可以想象当网络波动造成心跳间隔变长,那么将会立刻触发不必要的选主,造成资源的浪费。 所以正确的心跳间隔时间应该是明显小于 超时选举的间隔的, 避免一旦心跳延期就立刻选主的问题。在 raft 论文中,心跳间隔是 10mS, 超时选举为 150mS-300mS 之间。

源码详解 (6.824/src/raft/raft.go)

raft 节点成员属性

raft 结构体有如下几个成员属性:

  1. mu, 锁, 用于操作成员变量时使用

  2. peers, 可以理解为操作其他节点的rpc client集合

  3. persister, 选主任务暂时无需理解

  4. me, 自身节点在 peers 中的索引

  5. dead, 用于设置当前节点是否存活

  6. role, 标记当前节点的角色, folloer, candidate 还是 leader

  7. term, 任期号, 一个任期只能有一个 leader, 节点在一个任期内只能投票给一个 candidate

  8. voteFor, 标记当前节点投票给谁

  9. heartBeatInterval, 心跳超时时间, 固定值

  10. electionTimer, 选举超时定时器

  11. heartbeatTimer, 心跳超时定时器

const (
   Leader    Role = "Leader"
   Follower  Role = "Follower"
   Candidate Role = "Candidate "
)

 

// A Go object implementing a single Raft peer.
type Raft struct {
   mu        sync.Mutex          // Lock to protect shared access to this peer's state
   peers     []*labrpc.ClientEnd // RPC end points of all peers
   persister *Persister          // Object to hold this peer's persisted state
   me        int                 // this peer's index into peers[]
   dead      int32               // set by Kill()

   // Your data here (2A, 2B, 2C).
   // Look at the paper's Figure 2 for a description of what
   // state a Raft server must maintain.

   role    Role // leader/follower/candidate
   term    int  // current term index
   voteFor int  // vote for peer's index

 
heartBeatInterval time.Duration // heartbeat internal, fixed value (10ms)

electionTimer   *time.Timer // timer of election timeout, random value
heartbeatsTimer *time.Timer // timer of heartbeat timeout, fixed value (10ms)
 
}

成员属性初始化

一开始,所有成员都是 follower, term 均为0, voteFor设为-1表示不给任何节点投票, heartBeatInterval 心跳间隔设为10ms, 超时选举timer随机化触发, 心跳timer固定10ms触发

func Make(peers []*labrpc.ClientEnd, me int,
   persister *Persister, applyCh chan ApplyMsg) *Raft {
   rf := &Raft{}
   rf.peers = peers
   rf.persister = persister
   rf.me = me

   // Your initialization code here (2A, 2B, 2C).
   rf.role = Follower
   rf.term = 0
   rf.voteFor = -1
   rf.heartBeatInterval = 10 * time.Millisecond

   rf.electionTimer = time.NewTimer(rf.randomTimeout())
   rf.heartbeatTimer = time.NewTimer(rf.heartBeatInterval)

   // initialize from state persisted before a crash
   rf.readPersist(persister.ReadRaftState())

   // start ticker goroutine to start elections
   go rf.ticker()

   return rf
}

一些辅助函数

GetState 返回 节点角色, randomTimeout 返回随机的选主超时时间,


func (rf *Raft) GetState() (int, bool) {

   var term int
   var isleader bool
   // Your code here (2A).
   rf.mu.Lock()
   term = rf.term
   if rf.role == Leader {
      isleader = true
   } else {
      isleader = false
   }
   rf.mu.Unlock()
   return term, isleader
}


// random election timeout (150-300ms)
func (rf *Raft) randomTimeout() time.Duration {
   return time.Duration(150+rand.Int31n(150)) * time.Millisecond
}
 

raft 节点主逻辑

raft 节点主要循环做两件事情:

  1. 一旦触发超时选主timer, 立刻变成candidate选主
  2. 一旦触发心跳timer,立刻发送心跳

func (rf *Raft) ticker() {
   for rf.killed() == false {

      // Your code here to check if a leader election should
      // be started and to randomize sleeping time using
      // time.Sleep().

      select {
      case <-rf.electionTimer.C:
         rf.ElectLeader()
      case <-rf.heartbeatTimer.C:
         rf.HearBeat()
      }

   }
}

超时选主

ElectLeader 作为选主的主要逻辑入口, 一进来就判断角色, 只有 Candidate和 follower 才能进行选主。随后立即成为 candidate,重置选主超时 timer, 自己投票给自己,然后使用go routine 向所有节点发送 投票请求。之所以使用 go routine 是为了尽可能地减少阻塞时间,避免某一个节点来不及响应而导致时间浪费,迟迟选不出leader。特别地是成为 leader的条件, 如果发现投票数超过大多数节点数 且仍为 candidate ,则立即成为 leader 并发送心跳,而无需等待其他 go routine 完成。

func (rf *Raft) ElectLeader() {

   rf.mu.Lock()
   if !(rf.role == Candidate || rf.role == Follower) {
      rf.mu.Unlock()
      return
   }

   // become candidate
   rf.electionTimer = time.NewTimer(rf.randomTimeout())
   rf.role = Candidate
   rf.term++
   rf.voteFor = rf.me
   voteReceived := 1
   req := RequestVoteArgs{
      CandidateId: rf.me,
      Term:        rf.term,
   }
   rf.mu.Unlock()

   // request all nodes for voting me
   for i := 0; i < len(rf.peers); i++ {

      if i == rf.me {
         continue
      }

      rf.mu.Lock()
      if rf.role != Candidate {
         rf.mu.Unlock()
         break
      }
      rf.mu.Unlock()

      go func(i int) {

         reply := RequestVoteReply{}

         // if this peer doesn't response, continue request next peer
         ok := rf.sendRequestVote(i, &req, &reply)
         if !ok {
            return
         }

         rf.mu.Lock()
         if reply.VoteGranted {
            voteReceived++
            // if most of peers vote me, I will be the leader
            if voteReceived > len(rf.peers)/2 && rf.role == Candidate {
               rf.role = Leader
               rf.voteFor = -1
               rf.mu.Unlock()
               go rf.HearBeat()
               return
            }
            rf.mu.Unlock()
         } else if reply.Term > rf.term { // if find other peer's term greater than me, become follower immediately
            rf.role = Follower
            rf.term = reply.Term
            rf.voteFor = -1
            rf.mu.Unlock()
            return
         } else {
            rf.mu.Unlock()
         }

      }(i)

   }

}

RequestVote 作为响应投票请求的函数, 逻辑也非常简单。只有在 请求节点的 term 大于自己的 term 或者 term 相等且没有投过票时才会投票(重置超时选主 timer,切换角色,标记投票给谁),否则拒绝投票。


func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
   // Your code here (2A, 2B).

   rf.mu.Lock()
   if args.Term > rf.term || (args.Term == rf.term && rf.voteFor == -1) {
      rf.role = Follower
      rf.term = args.Term
      rf.voteFor = args.CandidateId
      rf.electionTimer = time.NewTimer(rf.randomTimeout())
      reply.VoteGranted = true
      reply.Term = rf.term
      rf.mu.Unlock()
      return
   } else {
      reply.VoteGranted = false
      reply.Term = rf.term
      rf.mu.Unlock()
      return
   }

}

发送心跳

HeartBeat 函数作为发送心跳的主逻辑,一进来就对 节点角色进行判断,若不为 leader 则立即退出。随后重置 心跳 timer, 向所有节点发送心跳。一旦有节点的 term 更大,则立即让贤变成 follower。特别的是,用了一个 heartBeatNoResp 来记录多少节点没有响应,如果超半数节点没有响应,则应该立即退出 leader。这里必须用 waitGroup 进行等候,否则则可能go routine 因为网络被阻塞了而来不及结束,而 HearBeat 却早早结束了,导致非法的leader没有让出位置。

func (rf *Raft) HearBeat() {

   for !rf.killed() {
      // once the peer is dead or it's not leader, the peer will stop sending heartbeat to others
      rf.mu.Lock()
      if rf.role != Leader {
         rf.mu.Unlock()
         return
      }
      rf.heartbeatTimer.Reset(rf.heartBeatInterval)
      args := AppendEntriesArgs{Term: rf.me}

      rf.mu.Unlock()

      wg := sync.WaitGroup{}
      hearBeatNoResp := 0

      // sending heartbeat to other peers
      for i := 0; i < len(rf.peers); i++ {
         if i == rf.me {
            rf.mu.Lock()
            rf.electionTimer = time.NewTimer(rf.randomTimeout())
            rf.mu.Unlock()
            continue
         }

         // once the peer is dead or it's not leader, the peer will stop sending heartbeat to others
         rf.mu.Lock()
         if rf.role != Leader {
            rf.mu.Unlock()
            break
         }
         rf.mu.Unlock()

         wg.Add(1)

         go func(i int) {
            defer wg.Done()
            reply := AppendEntriesReply{}
            ok := rf.peers[i].Call("Raft.AppendEntries", &args, &reply)
            if !ok {
               rf.mu.Lock()
               hearBeatNoResp++
               rf.mu.Unlock()
               return
            }

            // if find other peer's term greater than me, become follower immediately
            rf.mu.Lock()
            if reply.Term > rf.term {
               rf.term = reply.Term
               rf.role = Follower
               rf.voteFor = -1
            }
            rf.mu.Unlock()
         }(i)

      }

      wg.Wait()
      rf.mu.Lock()
      if hearBeatNoResp > len(rf.peers)/2 && rf.role == Leader {
         rf.role = Follower
         rf.voteFor = -1
         rf.mu.Unlock()
         return
      }
      rf.mu.Unlock()
   }
}

AppendEntries 作为响应leader心跳的函数会对 leader合法性进行校验,若term更小则不会更新自己的选主超时 timer,也就是不接受此次心跳。同时也会把自己的term返回回去,若leader发现别的节点term更大则会退出leader。

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {

   rf.mu.Lock()
   reply.Term = rf.term
   // only update my election timer when it's a valid leader
   if args.Term >= rf.term {
      rf.electionTimer = time.NewTimer(rf.randomTimeout())
   }
   rf.mu.Unlock()
}
 

测试函数解读

lab2A 有三个测试用例, 全部位于 6.824/src/raft/test_test.go 中, 分别是:

  1. TestInitialElection2A,最简单的测试用例,初始化3个节点,一段时间后查看是否有唯一的leader选出,term是否一致。
  2. TestReElection2A, 稍微复杂的测试用例,初始化3个节点,随后查看是否有唯一的leader选出, 若选出leader则把leader给 disconnect掉,查看能否继续选出leader,随后再将 disconnect的leader重新上线,再次查看是否有唯一的leader被选出。
  3. TestManyElections2A, 最复杂的测试用例,初始化7个节点,10次循环。每个循环内会查看是否有唯一的leader选出,再随机disconnect掉3个节点,看看能否选出唯一的leader,随后再把3个节点上线,如此反复。

在6.824/src/raft/目录下运行以下命令就能测试选主了

 go test -run 2A

踩坑经验

目前为止,已经完全可以实现lab2a要求,回想我实现起来踩过不少的坑,一些踩坑经验和 debug 经验分享给大家:

  1. 一定要使用定时器来进行选主超时判断和心跳超时判断,不能用一个时间戳来记录,不能用 time.sleep 来规律性地发送心跳!!! 这是我踩过最大的坑。原因很简单,超时选主的主逻辑耗时不止300ms, 如果使用时间戳来进行超时判断,则会存在一个频繁超时,不停地选主的情况。同理,心跳的主逻辑也不止300ms, 如果用 time.sleep(10ms)也远远达不到心跳的频率,会导致节点迟迟收不到心跳而不停地进行选主,或者选出来的主因为发送心跳不及时而草草退出。

  2. 严格按照论文中的示意图进行角色转换,不能模棱两可。比如进入选主时,只能是follower和candidate才有资格,leader是不可以的, leader只能退化成 follower而不能直接进行选主。

  3. 尽可能把每一次角色转换的条件变得苛刻。比如若超过半数节点未响应leader,则应该立刻退出leader;发送心跳时若不为leader则应该立刻停止;接受心跳返回时,若发现follower心跳更大,应该立刻退出leader。

  4. debug 时,建议直接 fmt.print 直接打印关键信息,需要胆大心细,观察是什么用例不过,是选不出主子,还是选出了多个主?比如选不出主,则可以打印下每一个节点收到的投票情况,看看节点是否有返回。 或者看看某个节点相邻的心跳时间为多少,是不是一直超时导致频繁选主。若有多个leader,则打印一下退出leader的条件是否触发,比如没有收到哪一个节点的响应。记住,debug信息要始终围绕,选主超时,心跳超时,角色转换的时机而展开

巨人的肩膀

  1. In Search of an Understandable Consensus Algorithm | USENIX
  2. thesquareplanet.com/blog/studen…
  3. nil.csail.mit.edu/6.824/2020/…
转载自:https://juejin.cn/post/7286133122211037244
评论
请登录