【MIT 6.824】学习笔记 5: 2021 Raft 实现细节
▲ 点击上方"多颗糖"关注公众号
最近一直没更新,就是因为在搞 6.824 的 Raft 实验,2021 年的实验和以往有些不一样,上周终于把 Raft 所有的测试都通过了,记录一下实现过程中的心得。
关于 Raft 的细节不再赘述,还不清楚的读者欢迎点击查看:
之前我把代码放到 github 上,6.824 的老师发邮件给我让我转为私有仓库,说是会影响教学效果,所以这里不再公开源码,有任何实现上的困难可以加入学习群(群二维码在末尾),也请您不要公开您的实现,尊重自己和别人的劳动成果。
整个实验顺序大概是:
先实现选举
实现 log 复制,同时记得要加强选举
实现持久化,这是比较简单的,在需要持久化变量做更改的地方调用持久化函数即可
实现快照,同时要改掉所有日志索引的计算方式,包括选举那部分的
如果你想更有成就感,看到这里你就可以关了。
基本框架
2021 版的实验中,已经提前给出 ticker()
这个 goroutine,并且在实验手册中说明:不要使用 Go 的 time.Timer
或 time.Ticker
,这两个东西非常难以正确使用。
原文:Don't use Go's
time.Timer
ortime.Ticker
, which are difficult to use correctly.
所以,不能照着网上的很多答案,再去写 <-electionTimer.C
这样子的代码了。顺便说一下,我也很讨厌 <-Timer.C
这样的语法,这是个啥?
课程说用 time.Sleep()
来实现,我也喜欢这样的方式,如果后面想用 C++ 再实现一个 Raft(关注吧,我会在未来实现的),你不可能指望在 C++ 找到 <-Timer.C
这样的东西,但 time.Sleep()
是每一种语言都通用的。
所以,基本的框架大概这样(我删掉了一些判断,你千万不要直接拿去用,稍微改改,问题不大):
func (rf *Raft) ticker() {
for rf.killed() == false {
time.Sleep(electionTimeout)
// check heartbeats time
duration := time.Since(rf.getHeartbeatTime())
if duration > electionTimeout {
rf.election()
} else {
continue
}
}
}
最好封装一个角色转换的函数,无论是 becomeLeader(), becomeFollower()
还是 changeState(state int)
,很多情况下需要变更角色。
接着就照着论文中的 Figure 2 实现,但有些容易让人迷惑的地方,我直接在下文中写出来。
每个 server 都要实现
Follower 收到任期不小于自己的心跳 RPC,都要更新自己的心跳计时器;
如果
request.Term > currentTerm
:currentTerm = request.Term
,并且转为 Follower(没有这个条件,在两个节点选举的时候,会一直互相拿不到票)提交日志的时候,判断
commitIndex > lastApplied
;
RequestVote RPC
实现
if
args.Term < currentTerm
: returnfalse
if
votedFor is null or candidateId
: 收到此投票请求的服务器 V 将比较谁的日志更完整,所谓更完整,就是:(lastTermV > lastTermC) ||
将拒绝投票;(即:V 的任期比 C 的任期新,或任期相同但 V 的日志比 C 的日志更完整);
(lastTermV == lastTermC) && (lastIndexV > lastIndexC)
第 2 点在 lab 2A 中可以先不实现,在 lab 2B 必须实现。
AppendEntries RPC
实现
对于比较直观的内容这里就不说了,下面的内容其实都在 Figure 2 中,只不过需要细看,实现的时候容易忘掉或者忽略掉。
记得更新心跳计时器,如果一个操作耗时比较久,多更新两次也没关系。
Figure 2 提到,log 的 first index is 1,所以在初始化 log 的时候先插入一条空的记录,从 index 1 开始,否则在后续的实现过程中,会被边界条件搞到直接放弃。
日志不能直接
append
,遇到同一个index
但term
不一样的,以 Leader 为准,直接覆盖或丢弃 Follower 冲突的日志。怎么判断呢?从args.PrevLogIndex
开始判断就行了。论文中提到,
nextIndex[]
和matchIndex[]
除了Make
的时候要初始化,还要 Reinitialized after election,不要忘了!对于 lab 2B,如果
AppendEntries RPC
返回失败,nextIndex--
即可。
下面是比较关键的两个问题,一个日志复制完了,啥时候提交呢?
Leader 什么时候提交?
我们知道,多数派达成的时候提交,但论文写出的明显没有这么简单:
If there exists an
N
such thatN > commitIndex
, a majority ofmatchIndex[i] ≥ N
, andlog[N].term == currentTerm
: setcommitIndex = N
即,Leader 收到 AppendEntries RPC
的响应后,然后根据上面的算法找得到 N, 就可以提交了。在实验中,就是可以扔 msg 到 applyCh
了。
那为什么不是简单的日志复制到多数派就提交呢?请看《条分缕析 Raft 算法》中的“延迟提交,选出最佳 Leader”章节。
Follower 什么时候提交?
还是原文:
If
leaderCommit > commitIndex
, setcommitIndex =
min(leaderCommit, index of last new entry)
这里就可以提交了。道理很简单,Leader 都提交了,Follower 跟着提交就完事了。
到这里,lab 2B 应该也可以做完了。
快速处理日志冲突
前面说了,lab 2C 是比较简单的,在需要持久化的变量做更改的地方调用持久化函数即可。
但是,6.824 需要快速处理日志冲突,即,实验中如果每次都让 nextIndex--
太慢了。
我们先看论文原文:
If desired, the protocol can be optimized to reduce the number of rejected AppendEntries RPCs. For example, when rejecting an AppendEntries request, the follower can include the term of the conflicting entry and the first index it stores for that term. With this information, the leader can decrement nextIndex to bypass all of the conflicting entries in that term; one AppendEntries RPC will be required for each term with conflicting entries, rather than one RPC per entry. In practice, we doubt this optimization is necessary, since failures happen infrequently and it is unlikely that there will be many inconsistent entries.
论文说,如果因为日志冲突拒绝了 AppendEntries RPC
,可以在响应体中包含冲突的 index 和 term,根据这个信息,Leader 可以快速定位到冲突的位置,一次 RPC 就搞定。否则像之前那样递减 nextIndex
,一次只能判断一个 Entry 有没有问题,可能会好多次 RPC。
但作者也提到,在实践中,他怀疑这个优化是否是必须的,因为这样的大规模不一致其实是比较少见的。
不过既然实验要我们实现,我们就去实现。
简单来说,考虑三种情况(例子来自讲义:6.824 2020 Lecture 7: Raft (2)):
Case 1 | Case 2 | Case 3 | |
---|---|---|---|
S1 | 4 5 5 | 4 4 4 | 4 |
S2 | 4 6 6 6 | 4 6 6 6 | 4 6 6 6 |
S2 是任期 6 的 Leader,S1 刚重启加入集群,S2 发送给 S1 的 AppendEntries RPC
中 prevLogTerm=6
我们用三个变量来快速处理日志冲突:
XTerm:冲突 entry 的任期,如果存在的话
XIndex:XTerm 的第一条 entry 的 index
XLen:缺失的 log 长度,case 3 中 S1 的 XLen 为 1
上面三种情况对应:
Case 1:Leader 中没有 XTerm(即 5),
nextIndex = XIndex
Case 2:Leader 有 XTerm(即 4),
nextIndex = leader's last entry for XTerm
Case 3:Follower 的日志太久没追上,
nextIndex = XLen
(这里是按照讲义上写的,勘误:确实是nextIndex = XLen
)
快照
为啥需要快照?不能放任 log 一直膨胀,我们的存储空间有限,并且会影响重启时日志重放时间。
先确定哪些日志不能做快照?
没应用的
没提交的
宕机重启后执行顺序是怎样的?
从磁盘读取快照
从磁盘读取持久化的日志
将 Raft 的
lastApplied
设为lastIncludedIndex
,避免重新执行已经执行的日志(记得,这条也很重要!)
问题:如果 Leader 要将已经压缩的日志传给滞后的 Follower 怎么办?
此时可能
nextIndex
< log start index,不能再用AppendEntries RPC
来修复(这句话还已经暗示了,这个条件要在sendAppendEntries
里做判断)所以我们需要
InstallSnapshot RPC
来传递快照
实验提供了两个函数:
Snapshot(index int, snapshot []byte)
:生成index
之前包括index
的 log 的快照,Raft 截断这部分日志,只保存尾部的 log;CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool
:判断是否要安装快照,如果这里一直返回true
,你将获得满分。(这里我按照一直返回true
实现的,如果你实现困难,可以先在这里改改降低难度。)
You are free to implement Raft in a way that CondInstallSnapShot can always return true; if your implementation passes the tests, you receive full credit.
一些 Tips:
不用实现
offset
和done
,一次性发完所有快照,简单很多;Raft 不能再使用 log 的位置或长度来确定日志的 index,所以最好有个独立于日志位置的索引。(当然你非要每次都基于
includedIndex
计算也可以,不推荐,打日志的你怎么一眼看出index
是什么?);即使 log 被截断,
AppendEntries RPC
还是需要发送之前一条日志的index
和term
,所以我们是不是需要持久化lastIncludedTerm/lastIncludedIndex
呢?(独立思考)InstallSnapshot RPC
按照 Figure 13 实现就行了,记得做完快照以后,还是要在 log 索引 0 位置插一条空的日志,让 index 从 1 开始。
其它没什么好说的,最关键的是,要把之前直接获取日志索引的地方都改掉,所以你最好写几个获取日志的函数,例如 getLastIndex()
等等。
一个坑:Go 竞态条件下的 slice 处理
AppendEntries RPC
需要传 entries[]
参数,这里一开始我直接写成:entries: rf.log[nextIndex:]
但这样如果多跑几遍测试,就会发现有问题。简单来说,Go 中 slice 是引用类型,传递的是值的拷贝,修改看起来不同的 slice 但其实是指向同一个地址空间。需要用 copy()
做深拷贝。
详见:https://stackoverflow.com/questions/38923237/goroutines-sharing-slices-trying-to-understand-a-data-race
相关阅读
【MIT 6.824】学习笔记 1:MapReduce
【MIT 6.824】学习笔记 2:RPC and Threads
【MIT 6.824】学习笔记 3: GFS
【MIT 6.824】学习笔记4: 主从复制(Primary/Backup Replication)
加群学习
欢迎关注我的公众号: