TinyKv project2A流程
本文就是总结写project2a的时候的一些细节点(偏个人)
有参考学习:
在 project2A 中我们需要完善三个文件的代码,分别是 rawnode.go,raft.go,log.go
各部分的作用
log.go
在 Tinykv 中各个阶段的含义:
在 log.go 中定义了 RaftLog 的结构,我们看看他的结构:
// RaftLog manage the log entries, its struct look like:
//
// snapshot/first.....applied....committed....stabled.....last
// --------|------------------------------------------------|
type RaftLog struct {
// storage contains all stable entries since the last snapshot.
storage Storage
// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
committed uint64
// applied is the highest log position that the application has
// been instructed to apply to its state machine.
// Invariant: applied <= committed
applied uint64
// log entries with index <= stabled are persisted to storage.
// It is used to record the logs that are not persisted by storage yet.
// Everytime handling `Ready`, the unstabled logs will be included.
stabled uint64
// all entries that have not yet compact.
entries []pb.Entry
// the incoming unstable snapshot, if any.
// (Used in 2C)
pendingSnapshot *pb.Snapshot
// Your Data Here (2A).
}
代码中的实现:
storage:包含所有快照以来的稳定项
committed:对应论文中的 commitIndex,已知被提交的最高日志条目索引
applied:对应论文中的 lastApplied,应用到状态机的最高日志条目索引号
stable:节点已经持久化的最后一条日志的 index
entries:
snapshot/first.....applied....committed....stabled.....last
--------|------------------------------------------------|
log entries
pendingSnapshot:待处理快照
论文中的描述:
type Storage interface {
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
Entries(lo, hi uint64) ([]pb.Entry, error)
// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
// rest of that entry may not be available.
Term(i uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// FirstIndex returns the index of the first log entry that is
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot; if storage only contains the dummy entry the
// first log entry is not available).
FirstIndex() (uint64, error)
// Snapshot returns the most recent snapshot.
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
Snapshot() (pb.Snapshot, error)
}
我们可以看到在 Storage 中有很多方法,这里 Storage 维护了一个ents []pb.Entry
在 Storgae 里的 FirstIndex 和 LastIndex 都是基于 ents[0].index 得到的
这里 storage.InitialState 返回了保存的HardState和ConfState信息,这里 pb 是个包,pb.HardState包含需要持久化的节点的状态,包括当前期限、提交索引和投票记录,故我们从 HardState 中获取 committed
type HardState struct {
Term uint64 `protobuf:"varint,1,opt,name=term,proto3" json:"term,omitempty"`
Vote uint64 `protobuf:"varint,2,opt,name=vote,proto3" json:"vote,omitempty"`
Commit uint64 `protobuf:"varint,3,opt,name=commit,proto3" json:"commit,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
总的一个数组就呈现这种:
snapshot/first.....applied....committed....stabled.....last
--------|------------------------------------------------|
log entries
从 fisrt 开始存储到 log entries 中,在 storgae 是被 stabled 的部分,只包含了 entries 的一部分,新的 log 不断加入 entries 的末尾
nextEnts
nextEnts 返回所有没有 apply 但是已经 committed
在 go 中,切片是不包含最后一位的
len=9 cap=9 slice=[0 1 2 3 4 5 6 7 8]
numbers == [0 1 2 3 4 5 6 7 8]
numbers[1:4] == [1 2 3]
所以我们在写的时候,直接用l.entries[l.applied-l.firstIndex+1,l.committed-l.firstIndex+1],注意要包含 l.committed
Term
获取对应 log 的 下标 Term,分两种情况即可
分别对应下标是否越界的情况:
type Entry struct {
EntryType EntryType `protobuf:"varint,1,opt,name=entry_type,json=entryType,proto3,enum=eraftpb.EntryType" json:"entry_type,omitempty"`
Term uint64 `protobuf:"varint,2,opt,name=term,proto3" json:"term,omitempty"`
Index uint64 `protobuf:"varint,3,opt,name=index,proto3" json:"index,omitempty"`
Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
下标没有越界的情况就直接去 pb.Entry.Term 获取
下标越界的情况交给 storage.Term 去处理,他有写越界时的逻辑
raft.go
借鉴
中 raft 的实现
Raft 驱动规则
- tick():Raft 内部的逻辑时钟,上层通过不断调用 Tick()来模拟时间的递增。这里我们只需要去实现
- Step():上层接口 rownode 通过调用 step 将 RPC 消息传递给 raft,raft 再通过 msg 的类型和自己的类型回调不同的函数
我们可以看到在 eraftpb.proto 中枚举了所有的 MessageType
Msg 类型
Msg 分为 Local Msg 和 Common Msg。前者是本地发起的 Msg,多为上层传递下来的,比如提交条目、请求心跳、请求选举等等,这些 Msg 不会在节点之间传播,它们的 term 也相应的等于 0。后者就是节点之间发送的 Msg,用来集群之间的同步。
TinyKV 通过 pb.Message 结构定义了所有的 Msg,即共用一个结构体。
const (
// 'MessageType_MsgHup' is a local message used for election. If an election timeout happened,
// the node should pass 'MessageType_MsgHup' to its Step method and start a new election.
MessageType_MsgHup MessageType = 0
// 'MessageType_MsgBeat' is a local message that signals the leader to send a heartbeat
// of the 'MessageType_MsgHeartbeat' type to its followers.
MessageType_MsgBeat MessageType = 1
// 'MessageType_MsgPropose' is a local message that proposes to append data to the leader's log entries.
MessageType_MsgPropose MessageType = 2
// 'MessageType_MsgAppend' contains log entries to replicate.
MessageType_MsgAppend MessageType = 3
// 'MessageType_MsgAppendResponse' is response to log replication request('MessageType_MsgAppend').
MessageType_MsgAppendResponse MessageType = 4
// 'MessageType_MsgRequestVote' requests votes for election.
MessageType_MsgRequestVote MessageType = 5
// 'MessageType_MsgRequestVoteResponse' contains responses from voting request.
MessageType_MsgRequestVoteResponse MessageType = 6
// 'MessageType_MsgSnapshot' requests to install a snapshot message.
MessageType_MsgSnapshot MessageType = 7
// 'MessageType_MsgHeartbeat' sends heartbeat from leader to its followers.
MessageType_MsgHeartbeat MessageType = 8
// 'MessageType_MsgHeartbeatResponse' is a response to 'MessageType_MsgHeartbeat'.
MessageType_MsgHeartbeatResponse MessageType = 9
// 'MessageType_MsgTransferLeader' requests the leader to transfer its leadership.
MessageType_MsgTransferLeader MessageType = 11
// 'MessageType_MsgTimeoutNow' send from the leader to the leadership transfer target, to let
// the transfer target timeout immediately and start a new election.
MessageType_MsgTimeoutNow MessageType = 12
)
MsgHup
'MessageType_MsgHup'是用于选举的本地消息。如果发生了选举超时,节点应将'MessageType_MsgHup'传递给它的Step方法,并开始新的选举。
MsgBeat
'MessageType_MsgBeat'是一个本地消息,它向Leader发送'MessageType_MsgHeartbeat'类型的心跳,给它的follower。
用于告知 Leader 该发送心跳了
当 leader 接收到 MsgBeat 时,向其他所有结点发送心跳,非 leader 接收到 MagBeat 时,忽略
MsgPropose
messagetype_msgproposal是一个本地消息,提议将数据附加到Leader的日志条目中。
用于上层请求 propose 条目
字段 | 描述 |
---|---|
MsgType | 类型 |
Entries | 要提交的条目 |
To | 目标节点 |
MsgAppend
包含了要复制的条目,是 leader 给其它节点同步条目
论文中的描述:
实现:
字段 | 描述 |
---|---|
MsgType | 类型 |
To | 目标节点 id |
From | leader 的 id |
Term | 当前节点的 term |
LogTerm | prevLogIndex 的任期 |
Index | 紧接新条目之前的日志条目索引(当前最大的日志条目索引) |
Entries | 要发送的条目 |
Commit | leader 的 commitIndex |
MagAppendResponse
// 'MessageType_MsgAppendResponse'是对日志复制请求的响应('MessageType_MsgAppend')。
用于告诉 Leader 日志同步是否成功
字段 | 描述 |
---|---|
MsgType | 类型 |
To | leader 的 id |
Term | 当前节点的 Term |
Index | r.RaftLog.LastIndex |
Reject | 是否拒绝 |
From | 当前接待你的 id |
MsgRequestVote
用于 candidate 请求选举投票。
字段 | 描述 |
---|---|
MsgType | 类型 |
To | 别的 candidate 们 |
Term | 当前节点的 Term |
Index | 当前最大的日志条目索引 |
LogTerm | prevLogIndex 的任期 |
From | 当前节点的 id |
MsgRequestVoteResponse
对于 MsgRequestVote 的回应
字段 | 值 |
---|---|
MsgType | 类型 |
To | 目标 id |
From | 当前节点 id |
Reject | 是否拒绝 |
Term | 当前节点 Term |
MsgSnapshot
leader 将快照发送给别的节点
MsgHeartbeat
leader 发送的心跳
MsgHeartbeatResponse
对 MsgHeartbeat 的回应
字段 | 描述 |
---|---|
MsgType | 类型 |
From | 当前节点的 id |
To | 目标节点的 id |
Term | 当前节点的 term |
MsgTransferLeader
用于上层请求转移 Leader
MsgTimeoutNow
节点收到后清空 r.electionElapsed,并即刻发起选举
Raft 结构中的字段
- heartbeatElapsed:心跳计时,只对 leader 有效,到达 heartbeatTimeout 后给所有节点发送维持心跳
- electionElapsed:选举计时,到达electionTimeout (选举超时)后开启选举。这里选举超时指的应该是 follower 在一定的时间段,即选举超时的这个时间端,没有收到任何 leader 的 heartbeat,那么他就会将自己转化为 candidate,开始新的选举
- leadTransferee:值不为 0 时代表的就是 leader 要转移给的那个对象的 id
- Prs:记录所有节点复制条目的情况,是个 map 类型,键为从 1 开始的数值,值为*Progress
type Progress struct {//Match:最后一个lastIndex,Next:Match+1
Match, Next uint64
}
- votes:哪些节点给我们投票了
newRaft 部分
给的参数是 Config 类型的,我们可以看到 Config 类型里有:
type Config struct {
ID uint64 //ID,不能为0
peers []uint64 //所有节点的id
//时间相关
ElectionTick int
HeartbeatTick int
// 持久化存储
Storage Storage
// 之前最后被应用的index
Applied uint64
}
我们直接通过 Config 中的数据来构建一个新的 raft 节点
函数实现
tick()
区分当前节点的不同的状态,分别进入不同的 Tick 函数
这里我们通过 r.State 区分,看结构中 state 是 StateType 类型的数值
我们找到 StateType 的结构:
// StateType represents the role of a node in a cluster.
type StateType uint64
const (
StateFollower StateType = iota
StateCandidate
StateLeader
)
分为三种 type,分别是 follower,candidate 和 leader
所以我们在实现 tick 时也要区分为这三种类型去做不同的逻辑
leader 的情况
当节点作为 leader 时,首先我们肯定要增加心跳计数heartbeatElapsed,并且在心跳超时时向集群发送心跳来维持自己的 leader 地位
这里比较有疑问的是对选举超时的处理,我们从论文中看到,Raft 使用的是随机选举超时,
在下面这种情况下,服务器通过和别的节点进行通信,通过监视来自其他服务器的消息,包括心跳消息、RequestVote RPC 请求等,来了解其他服务器的状态并进行相应的判断。在这个过程中,服务器会比较不同服务器的选举超时时间,并选取其中的最小值作为当前领导者的最小选举超时时间。这个时间将用于判断是否更新自己的任期或给予当前领导者投票。
但是我们考虑到 leader 掉线的情况,需要利用选举超时来判断是否掉线
主要通过心跳的回应数目是否达到一半,不达一半的情况可能就是 leader 掉线了,出现了集群分裂,需要进行重新选举(其实这里这块我还没想清楚)
看 etcd 的实现是通过一个MsgCheckQuorum消息实现,leader将给自己发送一条MsgCheckQuorum消息,对该消息的处理是:检查集群中所有节点的状态,如果超过半数的节点都不活跃了,那么leader也切换到follower状态。
但这里我没想清楚,只增加了心跳超时并做心跳超时的判断也过了()
case StateLeader:
r.heartbeatElapsed++
if r.heartbeatElapsed>=r.heartbeatTimeout{
r.heartbeatElapsed=0
err:=r.Step(pb.Message{MsgType:pb.MessageType_MsgBeat})
if err!=nil{
return
}
}
Step()
从 raft 的驱动规则中我们了解到,上层接口会调用 step 将 RPC 消息传递给 raft,由 step 分角色分消息类型地去执行对应的逻辑
我们在 step 中先分了角色,让不同角色在对应的 step 函数中去做处理
follower 的情况
- MsgHup
follow 收到 MsgHup 后,要变成 candidate,调用 becomeCandidate
如果整个 region 中只有自己这个节点,不用自己投票
- MsgHeartBeat
follower 收到 leader 发送的 heartBeat 后,首先先要判断 Msg 的 Term 是否大于自己的 term,只有大于的情况才更新
将选举计数清零,发送 MsgHeartbeatResponse 回应
- MsgAppend
leader 将要复制的条目交由 followers,当 follower 收到这个 msg 时,走handleAppendEntries 的逻辑
根据论文中的说法:
我们首先比较 m.Term 和当前节点的 term,如果当前节点的 term 更大则返回
如果 prevLogIndex > r.RaftLog.LastIndex(),说明当前节点可能漏了几条 leader 的日志,返回
如果接收者日志中没有包含这样一个条目:即该条目的任期在 prevLogIndex 上能和 prevLogTerm 匹配上,返回
然后我们考虑现有条目与新条目发生冲突的情况,寻找 msg 对应的 index 在当前节点对应的 term。在冲突的情况下删除当前及以后的所有条目,更新 raftLog 上的数据
candidate 的情况
- MsgHup
执行的步骤和 Follower 相同
- MsgRequestVote
当 candidate 接收到 MsgRequestVote 的 message 时,论文中的处理方式如下:
我们在实现上的逻辑就照着论文实现:
首先判断如果 Msg 的 term 小于当前节点的 term,直接拒绝
如果 votedFor 不为空或者不等于 candidateId,直接拒绝
最后判断 candidate 的日志是否和接收者的日志一样新:
- 如果两份日志最后的条目的任期号不同,则任期号大的日志更新
- 如果两份日志最后的条目的任期号相同,则日志长的更新
在满足上述条件后,candidate 就投票给消息的发送者,并且更新自己的字段
- MsgRequestVoteResponse
节点在收到 VoteResponse 后,根据 m.Reject 就可以判断对应的节点是否投票给自己
尔后节点根据 m.Reject 的值更新 r.votes,记录投票结果,如果同意的票数超半,那么当前 candidate 就变成 leader,否则就变成 follower
- MsgHeartBeat
接收到心跳消息后,如果消息的 Term 和当前节点的 Term 相同,则当前节点转化为 follower,接下来的处理方式和 follower 相同
- MsgAppendEntry
接收到复制条目的消息后,他的处理和 MsgHeartBeat 相似,先判断 Term,如果相同就转化为 follower,接下来的处理方式和 follower 相同
- MsgTransferLeader
在收到转移 Leader 的消息时,修改 m 的 To 并且更新自己的 msgs
leader 的情况
- MsgBeat
leader 接收到 MsgBeat 后向其他所有节点发送心跳
- MsgHeartBeatResponse
在收到 HeartBeatResponse 后,说明发送消息的节点是活跃的,则 leader 向其发送 Append 复制消息
leader 通过 r.Prs 来存储每个其它节点的复制情况,其 Next 就是当前节点期望的下一个条目的下标
判断 Next 和 r.RaftLog.lastIndex()
1.小于的情况:
Next==lastIndex+1,此时说明所有条目都复制完毕
别的情况则出错
2.大于的情况:
说明此时条目还没有复制完毕,我们将对应节点还没有复制的条目发送给他
根据不同的情况,要发送的 LogTerm 和 Index 也不同
- MsgRequestVote
和别的情况相同
- MsgPropose
leader 在接收到 Propose 后,先判断 r.leadTransferee 是否为 None,即集群是否在转移 Leader,如果没在转移 Leader,则将 m 提交的 entries 加到自己的 entries 中,然后他将向其他节点发送 MsgAppend,追加日志,如果集群只有他自己一个结点,直接更新自己的 committedIndex
- MsgAppendResponse
只有 Leader 会处理这个消息,如果消息是被 reject 的,那么重置 next,重置的规则为:将旧的 next-1,然后比较 m.Index+1,然后取小的那个复制给 next,重新发送给对应节点。
如果没有 reject,则更新对应节点的 match 和 next,将 next 赋值为 m.Index+1,match 赋值为 next-1
更新 commit,如果存在一个 N,N>commitIndex,大部分的 matchIndex[i]>=N,而且 log[N].term==currentTerm,则设置 commitIndex=N
在更新完后向其余节点发送更新 append 的消息
- MsgAppend
同上
become 部分
follower 的情况
一个节点变成 Follower,要先把自己的状态 State 变成 Follower(StateFollower),然后更改其 Lead 为传进来的 lead,同时 Term 也更改为传进来的 term,将自己的 Vote 投票对象初始为 None
Candidate 的情况
根据论文中说的步骤,先将自己的状态变为 Candidate(StateCandiadate),然后将自己的 Term 增加,将自己的 Lead 置为 None,给自己投票(r.Vote=r.id),初始化对应的 votes 来记录投票情况,同时在记录中更新自己的投票情况
Leader 的情况
首先先将当前节点的 State 转换为 StateLeader,同时更新自己的 Lead,看到代码中也提醒Leader should propose a noop entry on its term,也就是成为 Leader 要提交一个 noop 节点,no-op是和普通的heartbeat不一样,no-op是一个log entry,是一条需要落盘的log,只不过其只有term、index,没有额外的value信息。
所以我们要相应地更新 Prs 和 entries 中的内容,同时将这条新生成的条目发送给别的其它节点来复制
RawNode
RawNode 作为一个信息传递的模块,主要就是上层信息的下传和下层信息的上传。其核心为一个 Ready 结构体。
Ready 结构体
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MessageType_MsgSnapshot message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
}
RawNode 通过生成 Ready 的方式给上层传递信息
- SoftState:不需要被持久化的数据,只用在 HasReady()方法中的判断,如果没有更新就为 nil
- HardState:需要上层进行持久化存储,在发送消息之前,需要保存到稳定存储的节点的当前状态。如果没有更新,HardState将等于空状态。
- entries:在消息发送前需要被存储的 entries
- Snapshot:需要存储的 Snapshot
- CommitedEntries:需要应用的条目
- Message:需要发送的 Msg
根据需要,在 Raft 中分别将 SoftState 和 HardState 对应的字段传给 RawNode
HasReady
RawNode 通过 HasReady()来判断 Raft 模块是否已经有同步完成并且需要上层处理的信息
如果 HasReady()返回 true,那么上层就会调用 Ready()来获取具体要做的事情
在 hasReady 中,我们也只需要判断是否有上述 Ready 结构中出现的情况即可
Ready
Ready 要生成一个要返回给上层的 Ready 结构体,就按照上文的判断哪几种情况并进行相应的更新,最后返回包含更新信息的 Ready 结构体
Advance
Advance 是上层处理完了 Ready,用于通知 RawNode,以推进整个状态机
这里的更新包括提交了却未应用的部分和没有持久化存储的部分,我们需要相应地更新 Raft 中的这两部分
测试
2AA
漏洞百出.jpg
主要 debug 的方法就是去看测试的源码,看他测试的是哪种情况,然后相应地一步步打印信息,看哪步出现问题了,而且测试的内容看函数的名称也多多少少可以知道
第一次:
1.TestFollowerUpdateTermFromMessage2AA
在 Follower 处理handleAppendEntries 时,同时也要更新自己的 Term
2.TestCandidateUpdateTermFromMessage2AA
论文中写当 candidate 如果收到新leader的AppendEntries RPC:转为follower,这里新 leader 的判断,我本来写的是 m.Term==r.Term,但是应该改成>=更妥当,同理,Leader 也进行这样的更改,同时为了保险,直接在 Step 中就加上 if m.Term>r.Term{ r.becomeFollow(m.From,None) } 的逻辑,这里也是我论文看的不够仔细了
3.TestFollowerVote2AA
这里是 Follower 的情况没有处理 MsgRequestVote 的 Message,在 Follower 增加对MsgRequestVote 的处理
4.TestFollowerElectionTimeoutRandomized2AA
这里没有随机超时,因为 Raft 为了防止分裂投票,选举超时在一定范围获得,我们引入 rand 包,来实现随即超时
5.RequestVote
这里是节点在接收到 RequestVote 处理时,也需要对应地更新 term
2AB
1.TestLeaderStartReplication
这里是少了一条 leader 发送给别的节点的,这里把 i<len(r.Prs)改成 i<=len(r.Prs)
2.TestLeaderSyncFollowerLog2AB
我们在对应的消息处理处增加上相应的提示信息:
我们可以看到节点 1 成为 leader 之后向对应的节点都发送了 append 消息,但是还是出现 leader.RaftLog 和 follower.RaftLog 不一致的情况,这里应该是 handleAppendEntries 函数出现问题了
通过它打印出的 diff,我们可以看到就是 committed 不一样,根据打印出的信息,leader 的 committed 已经转化了,就是它同步 committed 的 append 消息没发出去,我们修改 sendAppend 函数,去掉 lastIndex 和 r.Prs[to].Next 之间比较的逻辑,他这个消息可以发出去
3.TestLeaderElectionOverwriteNewerLogs2AB
这里其实 node1 在第一次选举的时候是不会变成 leader 的,因为只有两个人投他(他自己和 node2),node3、4、5 都不会投他,我看了半天,我说我这里 4,5 咋会投 1 节点呢,按照设定,他们的 Vote 属性在初始化的时候就设置为 3 了,后来发现原来是我在初始化的时候把 becomeFollow 写在设置 Vote 后面了,becomeFollow 会把 r.Vote 清除为 None,把他俩顺序转一下就好了
通过了
转载自:https://juejin.cn/post/7355321162531831842