引言
MIT 6.824的系列实验是构建一个容错Key/Value存储系统,实验2是这个系列实验的第一个。在实验2(Lab 2)中我们将实现Raft这个基于复制状态机(replicated state machine)的共识协议。本文将详细讲解Lab 2B。Lab 2A在这里!
正文
Lab 2B的任务是实现日志复制(log replication),对应论文的5.3和5.4.1章节。我们的代码要能够选举出“合法”的leader,通过AppendEntries RPC复制日志,已提交(committed)的日志意味着复制到了多数派server,随后要将其正确地返回给上层应用执行。
数据结构
1 | go复制代码type Raft struct { |
在Raft的数据结构中,我们为Lab 2B新增了LogEntry(日志项)的结构体。根据论文的要求,每一个LogEntry要包含对应的命令,以及leader接收该命令时的term。
根据Figure 2,还要定义以下几个变量:
- commitIndex: 已知被提交的最高日志项对应的index。当日志项被提交(committed)了,意味着该日志项已经成功复制到了集群中的多数派server上,属于“集体记忆”了。如果当前的leader宕机再次发生选举,只有拥有完整已提交日志的server才能够获得多数派选票,才能被选举为leader。根据Leader完整性(Leader Completeness),如果一个日志项在某个term被提交了,则该Entry会存在于所有更高term的leader日志中。
- lastApplied: 应用(apply)给状态机的最高日志项的index,也就是上层应用“消费”到Raft日志项的最新index。
Leader使用nextIndex和matchIndex两个数组来维护集群中其它server的日志状态。在实现上有一点区别的是,论文中数组从1开始,我们在代码中从0开始(包括log)。 - nextIndex[]: 每个server分别对应着数组中的一个值。下一次要发给对应server的日志项的起始index。
- matchIndex[]: 每个server分别对应着数组中的一个值。已知成功复制到该server的最高日志项的index。
nextIndex可以被看作是乐观估计,值一开始被设置为日志的最高index,随着AppendEntry RPC返回不匹配而逐渐减小。matchIndex是保守估计,初始时认为没有日志项匹配(对应我们代码中的-1,论文中的0),必须AppendEntry RPC匹配上了才能更新值。这样做是为了数据安全:只有当某个日志项被成功复制到了多数派,leader才能更新commitIndex为日志项对应的index。
新Leader在选举成功后要重新初始化nextIndex和matchIndex这两个数组,然后通过AppendEntry RPC收集其它server的日志状态,具体细节我们在下面的日志复制(AppendEntries) 小节配合代码详细讲解。
除了论文中的这些状态,在Lab 2B的代码实现中,我们会单独使用一个goroutine(appMsgApplier),负责不断将已经被提交的日志项返回给上层应用,所以还需要额外添加以下几个变量用于goroutine同步:
- applyCh: 由实验提供,通过该channel将ApplyMsg发送给上层应用。
- moreApply: 示意有更多的日志项已经被提交,可以apply。
- applyCond: apply时用于多goroutine之间同步的Condition。
1 | go复制代码type RequestVoteArgs struct { |
这里RequestVote RPC的结构体相比于Lab 2A,新增了最后一个日志项的信息。LastLogIndex是 candidate最后一个日志项的index,而LastLogTerm是candidate最后一个日志项的term。这两个参数将用于下文中选举限制(election restriction)的判断。
1 | go复制代码type AppendEntryArgs struct { |
除了Term和LeaderId,在Lab 2B中AppendEntryArgs结构体新增了如下几个参数:
- Entries[]: 发送给对应server的新日志,如果是心跳则为空。这里要发送给对应server日志的index,是从nextIndex到最后一个日志项的index,注意也可能为空。
- PrevLogIndex: 紧跟在新日志之前的日志项的index,是leader认为follower当前可能已经同步到了的最高日志项的index。对于第i个server,就是nextIndex[i] - 1。
- PrevLogTerm: prevLogIndex对应日志项的term。
- LeaderCommit: leader已经提交的commit index。用于通知follower更新自己的commit index。
AppendEntryReply结构体新增了XTerm、XIndex和XLen几个变量用于nextIndex的快速回退(back up)。我们知道,论文中的nextIndex在AppendEntry RPC返回不匹配后,默认只是回退一个日志项(nextIndex[i]=PrevLogIndex)。如果follower能够返回更多信息,那么leader可以根据这些信息使对应server的nextIndex快速回退,减少AppendEntry RPC通信不匹配的次数,从而加快同步日志的步伐。这几个变量的具体含义: - XLen: 当前follower所拥有的的日志长度。
- XTerm: 当前follower的日志中,PrevLogIndex所对应日志项的term。可能为空。
- XIndex: 当前follower的日志中,拥有XTerm的日志项的最低index,可能为空。
主要函数
1 | go复制代码func Make(peers []*labrpc.ClientEnd, me int, |
Make函数是创建Raft server实例的入口,此处我们初始化Raft实例的各个变量。除了在goroutine中开始选主计时,我们还额外增加了一个appMsgApplier用于各个Raft实例apply已提交的日志给各自的上层应用。
1 | go复制代码// |
上层应用接收来自客户端的请求,通过Start函数对将要追加到Raft日志的command发起共识。注意读写要上锁,如果server不是leader则返回false。如果是leader的话,那么将command组装成LogEntry后追加到自己的日志中。此处要同时更新leader自己的matchIndex和nextIndex,目的是防止下面更新commitIndex时对多数派的判断出错。由于我们的日志数组index是从0开始,而论文是从1开始,因此我们返回的index要在原有基础上加一。
1 | go复制代码func (rf *Raft) convertToLeader() { |
convertToLeader函数,在原有Lab 2A的基础上,需要重新初始化nextIndex[]和matchIndex[],由调用者负责上锁。
选举限制(Election Restriction)
在Lab 2B中,我们需要为选主环节额外添加一些参数(lastLogIndex和lastLogTerm),确保满足「只有拥有完整已提交日志的server才能够被选举为leader」的选举限制。
1 | go复制代码func (rf *Raft) kickOffLeaderElection() { |
kickOffLeaderElection()中正式开始选主,我们在发送RequestVote RPC请求投票的实现中,额外增加了lastLogIndex和lastLogTerm。这里lastLogIndex是candidate最后一个日志项的index,如果日志为空那么为-1。lastLogTerm初始为-1,要在lastLogIndex大于等于零的情况下才能赋值,防止数组越界。计票部分和Lab 2A相同,不再赘述。
下面先看一下RequestVote请求投票这个RPC额外新增了哪些逻辑:
1 | go复制代码func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { |
根据论文中的选举限制,我们在投票时要额外判断:
- 是否没投票或者投给的是这个candidate。
- candidate的log是否至少和接受者的log一样新(up-to-date)。
当全部满足条件才能够投票。
Raft是通过比较两个server日志的最后一个日志项的index和term,来判别哪个更up-to-date的:
- 如果两个server的日志的最后一个日志项的term不同,那么拥有更晚term日志项的server的日志更up-to-date。
- 如果最后一个日志项的term相同,那么日志更长的更up-to-date。
在判别up-to-date的实现中,我们还要额外考虑当前的接受者日志为空的情况。
日志复制(AppendEntries)
在Lab 2A中,我们实现了server选举晋升为leader后,立即并周期性的通过AppendEntry发送心跳。而在Lab 2B中,我们同样要通过AppendEntry RPC进行日志复制,这也是本个实验的重点。
operateLeaderHeartbeat()中,我们还是在新的goroutine中发送AppendEntry RPC。在Lab 2B中新增了prevLogIndex、prevLogTerm、entries和leaderCommit几个参数:
- prevLogIndex,对于第i个server,就是rf.nextIndex[i] - 1,是紧跟在要发送给server[i]日志之前的日志项的index,用于接收者判别日志的同步情况。
- prevLogTerm是prevLogIndex对应日志项的term,为避免数组越界要判断prevLogIndex是否大于等于0。
- entries是发送给对应server的新日志,从rf.nextIndex[i]到最后一个日志项的index,注意也可能为空。
根据论文的日志匹配性质(Log Matching Property):
- 如果来自不同日志的两个日志项有相同的index和term,那么它们存储了相同的command。
- 如果来自不同日志的两个日志项有相同的index和term,那么它们前面的日志完全相同。
因此PrevLogIndex和PrevLogTerm与follower的日志部分匹配,就能确保follower的PrevLogIndex前的日志一致了。
1 | go复制代码func (rf *Raft) operateLeaderHeartbeat() { |
在通过sendAppendEntry()发送AppendEntry RPC并收到对应server响应后,首先判断返回的term看是否降级为follower。
接下来要很重要的一点,由于RPC在网络中可能乱序或者延迟,我们要确保当前RPC发送时的term、当前接收时的currentTerm以及RPC的reply.term三者一致,丢弃过去term的RPC,避免对当前currentTerm产生错误的影响。
当reply.Success为true,说明follower包含了匹配prevLogIndex和prevLogTerm的日志项,更新nextIndex[serverTo]和matchIndex[serverTo]。这里只能用prevLogIndex和entries来更新,而不能用nextIndex及len(log),因为后两者可能已经被别的RPC更新了,进而导致数据不一致。正确的更新方式应该是:rf.nextIndex[serverTo] = prevLogIndex + len(entries) + 1
,rf.matchIndex[serverTo] = prevLogIndex + len(entries)
。
由于matchIndex发生了变化,我们要检查是否更新commitIndex。根据论文,如果存在一个N,这个N大于commitIndex,多数派的matchIndex[i]都大于等于N,并且log[N].term等于currentTerm,那么更新commitIndex为N。这里必须注意,日志提交是有限制的,Raft从不提交过去term的日志项,即使已经复制达到了多数派。如果要更新commitIndex为N,那么N所对应的日志项的term必须是当前currentTerm。
论文的Figure 8仔细讲解了「leader只能提交term为curretTerm的日志项」的问题。在(c)中S1的currentTerm为4,不能提交即使已经复制到多数派的term为2的日志项,原因是可能会如(d)所示被term为3的日志项覆盖。但如(e)所示,如果term为4的日志项被复制到了多数派,那么此时S1可以将日志提交。因为S1作为leader,它的currentTerm是当前的最高term,当该currentTerm的日志项被复制到多数派后,根据up-to-date规则,不会再有较低term的server在选举获得多数派选票而成为leader,也就不再会有像(d)中覆盖的情况发生。
在检查是否更新commitIndex的实现上,我们将matchIndex复制到了matches数组中,通过sort升序排序。那么在majority := (len(rf.peers) - 1) / 2
时,大于一半的matchIndex大于等于matches[majority],因此rf.log[matches[majority]]恰好被复制到了多数派server。以majority为初始值自减遍历i,如果rf.log[matches[i]].Term == rf.currentTerm
,那么说明满足日志提交限制,找到了上述最大的“N”,随后调用sendApplyMsg(),通知有更多的日志项已经被提交,可以apply。循环的停止条件为i < 0 || matches[i] <= rf.commitIndex
,则说明没有找到更大的commitIndex。
当reply.Success为false,说明follower的日志不包含在prevLogIndex处并匹配prevLogTerm的日志项,要将nextIndex缩减。此处更新不宜采用自减的方式更新,因为RPC可能会重发,正确的方式是rf.nextIndex[serverTo] = prevLogIndex
。
我们在AppendEntryReply中增加了几个变量,以使nextIndex能够快速回退(back up)。如果接下来要尝试匹配的prevLogIndex比follower当前所拥有的的日志长度(XLen)还要大,那么显然直接从XLen尝试匹配即可。如果接下来要尝试匹配的prevLogIndex在XLen以内,因为我们已经知道了follower的日志从XIndex到当前prevLogIndex的日志项的term都是XTerm,那么我们可以直接在leader侧遍历匹配一遍,而无需多次往返RPC通信。
1 | go复制代码func (rf *Raft) AppendEntry(args *AppendEntryArgs, reply *AppendEntryReply) { |
在处理AppendEntry RPC的代码中,我们新增了日志匹配的逻辑。如果日志在prevLogIndex处不包含term为prevLogTerm的日志项,那么返回false。这里有两层意思,一个是接收者的日志没有index为prevLogIndex的日志项,另一个是有对应index的日志项但是term不匹配。同时,根据上面所说的快速回退机制,额外返回XLen、XTerm和XIndex。
此外还要注意prevLogIndex可能为-1,意味着日志全都没有匹配上,或者leader此刻还没有日志,此时接收者就要完全服从。
接下来是PreLogIndex与PrevLogTerm匹配到的情况,还要额外检查新同步过来的日志和已存在的日志是否存在冲突。如果一个已经存在的日志项和新的日志项冲突(相同index但是不同term),那么要删除这个冲突的日志项及其往后的日志,并将新的日志项追加到日志中。这里要注意的一个容易出错的地方是不先进行检查,将全部新日志直接追加到了已有日志上。 这样做一旦有旧的AppendEntry RPC到来,RPC的args.Entries的日志项是旧的,一旦直接把args.Entries追加到日志中,就会出现新数据丢失的不安全问题。
最后,根据论文,如果leaderCommit > commitIndex,说明follower的commitIndex也需要更新。为了防止越界,commitIndex取min(leaderCommit, index of last new entry)
。
日志Apply
我们单独使用一个goroutine(appMsgApplier),负责不断将已经被提交的日志项返回给上层应用。
Leader在将日志项复制到多数派后更新commitIndex的同时,要调用sendApplyMsg()。Follower在AppendEntry RPC收到LeaderCommit的更新时,也要调用sendApplyMsg()。
sendApplyMsg()改变rf.moreApply为true,示意有更多的日志项已经被提交,可以apply,并使用applyCond广播通知appMsgApplier。
1 | go复制代码func (rf *Raft) sendApplyMsg() { |
appMsgApplier在for循环中,如果没有需要apply的新日志项,则不断rf.applyCond.Wait()
等待通知。否则,由于应用消费日志项是一个耗时的过程,我们要快速释放锁,主要先将commitIndex拷贝,moreApply置为false,意味着目前的日志项apply工作已经接手,随后释放锁。
在接下来i从lastApplied + 1到commitIndex的循环中,我们组装好ApplyMsg,通过applyCh向上层应用提供日志项,在消费后上锁更新lastApplied。此时如果rf.commitIndex又有更新,sendApplyMsg()会被调用,moreApply又会变为true,所以appMsgApplier会在接下来的循环处理新的待apply的日志项。
总结
本文讲解了MIT 6.824 Lab 2B。按照实验要求讲解了选举限制、日志复制、快速回退和日志Apply,其中也有很多自己的感悟和思考,仅供参考。后续将在Lab 2C中继续讲解持久化。
🏆 技术专题第五期 | 聊聊分布式的那些事……
参考文献
- In Search of an Understandable Consensus Algorithm
- 2020 MIT 6.824 分布式系统
- 6.824 - Spring 2020
- Students’ Guide to Raft
- raft.github.io
- baidu/braft
- Mit6.824分布式系统学习(Lab2PartA)
本文转载自: 掘金