likes
comments
collection
share

用go实现Raft(4) - 成员变更篇

作者站长头像
站长
· 阅读数 52

用go实现Raft(4) - 成员变更篇


一个分布式集群在运行过程中不可避免的会出现节点故障,从而需要能够变更集群节点。

  • 一种方法是关闭集群以新的配置启动集群,而这会导致集群在成员变更期间不可用;
  • 也可以使新节点获得旧节点的IP地址,从而取代节点,这需要保证被替代的节点永远不会再启动;

上述两种办法都有缺陷,人工操作也会有操作错误产生,在raft中加入了成员变更自动化将其作为算法的一部分。

为保证安全性,在成员变更期间不能出现在同一个任期选举两个leader,在一次新增或移除多个节点时,集群中不可能一次性切换所有节点到新状态,过度期间集群可能分裂(下图在有3个节点的集群添加两个新节点,在某个时间CnewC_{new}CnewColdC_{old}Cold可能选取出不同leader),我们需要额外的机制来处理这个问题。

用go实现Raft(4) - 成员变更篇

Raft原始论文采用两阶段更新配置,让节点在不同配置间过度,不影响安全使用,并在过度期间提供服务:

  • 首先进入一个过度阶段,称为联合共识,联合共识阶段集群中同时存在两种配置;
    • 日志会被复制到两种配置的节点
    • 新旧配置的节点都能作为leader
    • 共识(选举、日志)需要同时由两种配置的大多数节点达成
  • 当联合共识被提交,集群已过度到新配置;

Raft集群配置使用特殊的日志来保存和通信,这样可以使用已存在的共识机制使得新配置在集群达成共识

  • leader收到成员变更请求(将集群配置从ColdC_{old}Cold更新到CnewC_{new}Cnew),leader将存储配置用于联合共识Cold,newC_{old,new}Cold,new,并将其作为日志同步到集群;
  • follower收到Cold,newC_{old,new}Cold,new日志后,节点将应用配置Cold,newC_{old,new}Cold,new无需等待日志提交,日志提交/leader选举需得到Cold,newC_{old,new}Cold,new中大多数同意,在这期间CnewC_{new}Cnew不能独立决策;
    • 如leader崩溃,新的leader将在ColdC_{old}ColdCold,newC_{old,new}Cold,new中选出;
  • Cold,newC_{old,new}Cold,new日志提交后,ColdC_{old}ColdCnewC_{new}Cnew都能独立进行决策,添加一条新日志CnewC_{new}Cnew
    • 此时leader崩溃也能选举出含有Cold,newC_{old,new}Cold,new日志的新leader,故leader可以安全的添加CnewC_{new}Cnew日志;
  • follower收到CnewC_{new}Cnew日志同样立即生效;
  • CnewC_{new}Cnew日志提交后,不在新集群的节点关机;

用go实现Raft(4) - 成员变更篇

在新的论文中提出了一种更简单的办法,限制raft变化类型,每次只允许添加/删除一个节点,更复杂的变更由一系列单节点变更构成,当每次仅改变一个节点时,旧集群与新集群的大多数是重叠的,这种重叠可以防止集群分裂。

用go实现Raft(4) - 成员变更篇

单节点变更过程如下:

  • eader收到成员变更请求(将集群配置从ColdC_{old}Cold更新到CnewC_{new}Cnew),leader将存储配置CnewC_{new}Cnew,并将其作为日志同步到集群;
  • follower收到CnewC_{new}Cnew日志后,节点将应用配置CnewC_{new}Cnew无需等待日志提交,CnewC_{new}Cnew的大多数用于决定CnewC_{new}Cnew日志的提交;
    • 如需要日志提交后才应用配置,这会导致leader难以得知集群节点是否已应用新配置,需要额外的机制去追踪节点变更状况,这样才能继续后续的变更,而节点收到CnewC_{new}Cnew日志直接采用时,leader可以CnewC_{new}Cnew日志提交时判断集群大多数已切换到新状态,可以安全的进行下一次变更。
  • CnewC_{new}Cnew提交后成员变更完成,此时可以继续下述内容:
    • leader确认成员变更成功完成;
    • 一个节点从集群中移除,该节点可以关闭;
    • 可以开始下一次的成员变更,同时处理多条成员变更日志等价与一次变更多个节点,会导致集群不安全;

raft中达成共识使用调用者的配置,接收方处理RPC请求时不关心来源是否在接收方的集群配置中。

  • 新节点加入集群时,无任何日志,无法从日志中取得集群配置,如不处理集群外RPC请求,将导致该节点无法处理任何RPC请求,永远无法加入集群;
  • 节点也需要投票给不属于集群的canidate(canidate日志、任期足够新时),以在某些情况下保证集群可用;

在变更成员时,还有三个问题需要解决:

  • 新节点可能没有存储任何日志,集群需要一定时间将日志同步到新节点,在此期间节点如新节点能够在日志同步中投票,可能导致集群在这段时间无法提交新的日志。
    • 故新节点在加入集群时不参与日志投票,直到节点已同步到最新日志
  • 当前leader可能不是新集群的一部分。
    • 原始论文中leader需要在CnewC_{new}Cnew日志提交后下台成为follower,此时出现了非集群节点的leader暂时管理集群;
    • 新论文中中加入了leader转移,在成员变更之前,将leader转移到集群中其他节点;
  • 被移除的节点在没有收到心跳后可能开始选取,扰乱集群运行。
    • 节点在认为领导者存在时需要无视RequestVote请求(即节点在得到当前leader消息后的最短选举时间内收到RequestVote的请求);

联合共识实现

首先添加新的日志类型,用来表示成员变更,成员变更分为:添加节点、移除节点,变更只需要知道节点的编号和地址。

enum EntryType {
  NORMAL = 0;
  MEMBER_CHNAGE = 1;
}

enum MemberChangeType {
  ADD_NODE = 0;
  REMOVE_NODE = 1;
}

message MemberChange {
  MemberChangeType type = 1;
  uint64 id = 2;
  string address = 3;
}

message MemberChangeCol { repeated MemberChange changes = 1; }

当前的实现中raft分为三层:

  • raft server,提供与客户端、其他节点的RPC通信
  • raft node,连接raft与raft server,通过通道将数据读写顺序化,避免产生数据竞争
  • raft,实现协议逻辑

在raft、raft server中都保存着集群信息,raft server中保存集群各节点完整信息,raft中保存集群各节点编号,于是成员变更实际按以下过程来完成:

  • 接收到Cold,newC_{old,new}Cold,new -> raft&raft server中加入新集群配置
  • Cold,newC_{old,new}Cold,new提交 -> leader发送CnewC_{new}Cnew
  • 接收到CnewC_{new}Cnew -> raft&raft server中移除旧集群配置
  • CnewC_{new}Cnew提交 -> 关闭旧节点

将第一阶段leader发送日志Cold,newC_{old,new}Cold,new定义为含有内容的变更日志,第二阶段leader发送日志CnewC_{new}Cnew定义为内容为空的变更日志。

实现raft中成员变更:

  • 将raft中集群信息分为新旧两个部分(incoming为当前集群,outcoming为集群),进行联合共识时会同时存在两种配置,集群共识需要两种配置的集群都达到共识;
  • 第一阶段时先将当前集群信息incoming复制到旧集群信息outcoming,再将成员变更内容更新到当前集群信息incoming;
  • 第二阶段时清除旧集群信息outcoming,如某个节点被移除同时清理对应同步记录;
type Cluster struct {
	incoming           map[uint64]struct{} // 当前/新集群节点
	outcoming          map[uint64]struct{} // 旧集群节点
	pendingChangeIndex uint64              // 未完成变更日志
	inJoint            bool                // 是否正在进行联合共识
	...
}

func (c *Cluster) ApplyChange(changes []*pb.MemberChange) error {
	 if len(changes) == 0 {
		// 成员变更数为0,当前变更为阶段2,清空旧集群数据
		c.outcoming = make(map[uint64]struct{})
		for k := range c.progress {
			_, exsit := c.incoming[k]
			if !exsit {
				delete(c.progress, k)
			}
		}
		c.inJoint = false
		c.logger.Debugf("清理旧集群信息完成, 当前集群成员数量: %d", len(c.incoming))
		return nil
	}
	// 转移集群数据到outcoming
	for k, v := range c.incoming {
		c.outcoming[k] = v
	}

	// 按变更更新成员
	for _, change := range changes {
		if change.Type == pb.MemberChangeType_ADD_NODE {
			c.progress[change.Id] = &ReplicaProgress{
				MatchIndex: 0,
				NextIndex:  1,
			}
			c.incoming[change.Id] = struct{}{}
			c.logger.Debugf("添加集群成员: %s ,新集群成员数量: %d", strconv.FormatUint(change.Id, 16), len(c.incoming))
		} else if change.Type == pb.MemberChangeType_REMOVE_NODE {
			delete(c.incoming, change.Id)
			c.logger.Debugf("移除集群成员: %s ,新集群成员数量: %d", strconv.FormatUint(change.Id, 16), len(c.incoming))
		}
	}
	return nil
}

在raft node中添加应用配置方法。

  • 添加一个新的通道用来应用配置,raft实现中无锁,变更配置需要与其他的处理一起顺序执行,防止数据竞争;
  • 在主循环中加入成员变更通道等待,从通道取得变更内容后,调用raft中应用变更方法执行变更;
type RaftNode struct {
	···
	changec    chan []*pb.MemberChange // 变更接收通道
	···
}

func (n *RaftNode) ApplyChange(changes []*pb.MemberChange) {
	n.changec <- changes
}

func (n *RaftNode) Start() {
	go func() {
		...
		for {
			...
			select {
			...
			case changes := <-n.changec:
				n.raft.ApplyChange(changes)
			case <-n.stopc:
				return
			}
		}
	}()
}

实现raft server中成员变更:

  • 检查当前配置与变更请求的差异,判断是否需要进行变更,如当前集群配置与变更后一致,则无需后续处理;
  • 依据日志内容判断为变更的哪个阶段:
    • 在第一阶段时,先更新raft server中集群信息,再更新raft中集群信息;
      • 当变更为新增节点时,加入节点到配置,建立与节点的连接;
      • 当变更为移除节点时,标记节点关闭;
    • 在第二阶段时,更新raft中集群信息,raft server中无需处理,新增的节点已在上阶段配置,删除的节点的移除在CnewC_{new}Cnew日志提交后;
func (s *RaftServer) applyChange(changes []*pb.MemberChange) {
	changeCount := len(changes)
	diffCount := 0
	for _, mc := range changes {
		p := s.peers[mc.Id]
		if mc.Type == pb.MemberChangeType_ADD_NODE {
			if (p == nil && s.id != mc.Id) || (p != nil && p.remote.address != mc.Address) {
				diffCount++
			}
		} else if mc.Type == pb.MemberChangeType_REMOVE_NODE {
			if p != nil || s.id == mc.Id {
				diffCount++
			}
		}
	}
	if diffCount == 0 && changeCount > 0 {
		return
	}
	if changeCount > 0 {
		for _, mc := range changes {
			if mc.Type == pb.MemberChangeType_ADD_NODE {
				if mc.Id != s.id {
					_, exsit := s.peers[mc.Id]
					if !exsit {
						peer := NewPeer(mc.Id, mc.Address, s.node, s.metric, s.logger)
						s.peers[mc.Id] = peer
					}
				}
			} else {
				if mc.Id != s.id {
					s.peers[mc.Id].close = true
				} else {
					s.close = true
				}
			}
		}
		// 先启动相关连接,再更新集群信息,防止无法发送消息
		s.node.ApplyChange(changes)
	} else {
		// 更新集群信息
		s.node.ApplyChange(changes)
	}
}

// 关闭连接
func (s *RaftServer) applyRemove() {
	for k, p := range s.peers {
		if p.close {
			delete(s.peers, k)
			p.Stop()
		}
	}

	if s.close {
		s.Stop()
	}
}

在raft server添加成员变更提案方法,将变更的成员、类型封装为成员变更消息进行提案

  • 变更前检查是否能够进行变更(前一次变更已完成),上次变更未完成则拒绝请求;
  • 按变更更新raft server、raft中的集群信息;
  • 向集群进行变更提案;
func (s *RaftServer) changeMember(peers map[string]string, changeType pb.MemberChangeType) error {

	changes := make([]*pb.MemberChange, 0, len(peers))
	for name, address := range peers {
		id := GenerateNodeId(name)
		change := &pb.MemberChange{
			Type:    changeType,
			Id:      id,
			Address: address,
		}
		changes = append(changes, change)
	}

	if !s.node.CanChange(changes) {
		return fmt.Errorf("前次变更未完成")
	}

	s.applyChange(changes)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	return s.node.ChangeMember(ctx, changes)
}

在raft node添加方法将变更包装为日志提案。

func (n *RaftNode) ChangeMember(ctx context.Context, changes []*pb.MemberChange) error {
	changeCol := &pb.MemberChangeCol{Changes: changes}
	data, err := proto.Marshal(changeCol)
	if err != nil {
		n.logger.Errorf("序列化变更成员信息失败: %v", err)
		return err
	}
	return n.Propose(ctx, []*pb.LogEntry{{Type: pb.EntryType_MEMBER_CHNAGE, Data: data}})
}

当leader追加日志时,检查是否为成员变更请求,当在进行成员变更时记录该日志编号,标记联合共识开始。

func (r *Raft) AppendEntry(entries []*pb.LogEntry) {
	lastLogIndex, _ := r.raftlog.GetLastLogIndexAndTerm()
	for i, entry := range entries {
		if entry.Type == pb.EntryType_MEMBER_CHNAGE {
			r.cluster.pendingChangeIndex = entry.Index
			r.cluster.inJoint = true
		}
		entry.Index = lastLogIndex + 1 + uint64(i)
		entry.Term = r.currentTerm
	}
	...
}

当follower收到Cold,newC_{old,new}Cold,new日志时,立即调用相关方法执行变更。

  • 过滤成员变更类型的日志,解码为成员变更信息,调用raft server应用变更方法执行变更。
func (s *RaftServer) process(msg *pb.RaftMessage) (err error) {
	defer func() {
		if reason := recover(); reason != nil {
			err = fmt.Errorf("处理消息 %s 失败:%v", msg.String(), reason)
		}
	}()
	if msg.MsgType == pb.MessageType_APPEND_ENTRY {
		for _, entry := range msg.GetEntry() {
			if entry.Type == pb.EntryType_MEMBER_CHNAGE {
				var changeCol pb.MemberChangeCol
				err := proto.Unmarshal(entry.Data, &changeCol)
				if err != nil {
					s.logger.Warnf("解析成员变更日志失败: %v", err)
				}
				s.applyChange(changeCol.Changes)
			}
		}
	}
	return s.node.Process(context.Background(), msg)
}

leader在处理日志响应时,检查联合共识是否已经完成进行第二阶段。

  • 在提交日志后,检查本次提交是否包含了成员变更日志(提交前编号小于成员变更编号,提交后编号大于成员变更编号);
  • 如提交含成员变更日志则该进入成员变更的第二阶段,发送一条空内容的变更日志;
func (r *Raft) ReciveAppendEntriesResult(from, term, lastLogIndex uint64, success bool) {
	leaderLastLogIndex, _ := r.raftlog.GetLastLogIndexAndTerm()
	if success {
		r.cluster.AppendEntryResp(from, lastLogIndex)
		if lastLogIndex > r.raftlog.commitIndex {
			// 取已同步索引更新到lastcommit
			if r.cluster.CheckCommit(lastLogIndex) {
				...
				// 检查联合共识是否完成
				if r.cluster.inJoint && prevApplied < r.cluster.pendingChangeIndex && lastLogIndex >= r.cluster.pendingChangeIndex {
					r.AppendEntry([]*pb.LogEntry{{Type: pb.EntryType_MEMBER_CHNAGE}})
					lastIndex, _ := r.raftlog.GetLastLogIndexAndTerm()
					r.cluster.pendingChangeIndex = lastIndex
				}
			}
		}
		...
	} 
	...
}

在日志提交后添加通道通知raft server变更日志提交

  • CnewC_{new}Cnew日志提交后,关闭连接,如当前节点已移除则关闭节点进程;
type RaftStorage struct {
	...
	notifyc             chan []*pb.MemberChange // 变更提交通知通道
}

func (s *RaftServer) handle() {
	go func() {
		for {
			select {
			case <-s.stopc:
				return
			case msgs := <-s.node.SendChan():
				s.sendMsg(msgs)
			case msg := <-s.incomingChan:
				s.process(msg)
			case changes := <-s.storage.NotifyChan():
				if len(changes) == 0 {
					go s.applyRemove()
				}
			}
		}
	}()
}

完整代码

参考:

转载自:https://juejin.cn/post/7239238662692569148
评论
请登录