6.824 Lab2B: Raft Log Replication

Lab Goals

  • Implement log replication
  • Add the corresponding election restriction

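Tests

Two tests in this lab are rather tricky:

  • TestRejoin2B: a stale leader rejoins the network on two occasions, each time still holding uncommitted log entries, and has to be resynchronized

  • TestBackup2B: the network is partially disconnected and reconnected many times while a large number of log entries are submitted (mainly a test of how quickly the cluster catches up)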

TestRejoin2B

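What I ran into: the old leader1 held an uncommitted entry at index=2. It should have fetched the correct index=2 entry from the new leader before applying anything.

It turned out that the AppendEntries handler never compared the term of the previous log entry (PrevLogTerm):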

if args.PrevLogIndex <= lastLogIndex && args.PrevLogTerm != rf.log[args.PrevLogIndex].Term {
	DPrintf("%v conflict log %v\n", rf.me, lastLogIndex)
	reply.ConflictIndex = args.PrevLogIndex
}
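
To make the check concrete, here is a minimal standalone sketch. It is not the lab code: `prevLogMatches` and the stripped-down `LogEntry` are hypothetical names, and it assumes index 0 holds a dummy entry so that slice positions equal Raft log indices.

```go
package main

import "fmt"

// LogEntry is a hypothetical minimal entry; only Term matters for this check.
type LogEntry struct {
	Term int
}

// prevLogMatches reports whether log has an entry at prevLogIndex whose term
// equals prevLogTerm. Assumption: log[0] is a dummy entry with term 0.
func prevLogMatches(log []LogEntry, prevLogIndex, prevLogTerm int) bool {
	if prevLogIndex < 0 || prevLogIndex >= len(log) {
		return false // the follower does not have this entry at all
	}
	return log[prevLogIndex].Term == prevLogTerm
}

func main() {
	// A stale leader whose entry at index 2 was written in term 1 and never committed.
	stale := []LogEntry{{Term: 0}, {Term: 1}, {Term: 1}}
	// The new leader wrote index 2 in term 2, so its RPC for index 3
	// arrives with PrevLogIndex=2, PrevLogTerm=2.
	fmt.Println(prevLogMatches(stale, 2, 2)) // false
	fmt.Println(prevLogMatches(stale, 1, 1)) // true
}
```

Without the term comparison, the first call would be accepted and the stale entry at index 2 would survive.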

TestBackup2B

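Next, TestBackup2B failed. At first I assumed the AppendEntries handler was simply too slow.

My first version iterated over args.Entries, comparing and patching the entries one by one. Then I noticed this in the paper: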

delete the existing entry and all that follow it (§5.3)
So I changed the handler to drop the conflicting entry and everything after it in one step, which is cheaper:

rf.log = rf.log[:args.PrevLogIndex]

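That still did not pass. After reading the debug log more carefully, I found that the nodes kept calling election() without ever electing a leader. Looking at the code, I had not implemented the election timing correctly: a node taking part in an election would deadlock while waiting for votes.

After fixing that, test2b passed, but it took 90s, which is over the required one minute. I suspected my use of time.Sleep to wait for RPCs, so I switched to a timer. That brought the run down to 75s, still over the limit.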
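
A minimal sketch of waiting for a call with a timer instead of sleeping. All names here (`callWithTimeout`, `fastCall`, `slowCall`, `shortTimeout`) are hypothetical, and the RPC is replaced by a plain function so the example runs on its own.

```go
package main

import (
	"fmt"
	"time"
)

// shortTimeout is an illustrative value, not the lab's RPCTimeout.
const shortTimeout = 50 * time.Millisecond

// callWithTimeout runs call in its own goroutine and waits at most timeout.
// finished is false when the timer fired first; ok is then meaningless.
func callWithTimeout(call func() bool, timeout time.Duration) (ok bool, finished bool) {
	done := make(chan bool, 1) // buffered, so a late result never blocks the goroutine
	go func() { done <- call() }()

	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer as soon as we return

	select {
	case ok = <-done:
		return ok, true
	case <-timer.C:
		return false, false
	}
}

// fastCall and slowCall stand in for an RPC that answers quickly or hangs.
func fastCall() bool { return true }
func slowCall() bool { time.Sleep(300 * time.Millisecond); return true }

func main() {
	fmt.Println(callWithTimeout(fastCall, shortTimeout))
	fmt.Println(callWithTimeout(slowCall, shortTimeout))
}
```

The caller wakes up as soon as the reply arrives rather than at the next sleep tick, which is where a sleep-based loop tends to lose time.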

The if here is crucial. If the follower has all the entries the leader sent, the follower MUST NOT truncate its log. Any elements following the entries sent by the leader MUST be kept. This is because we could be receiving an outdated AppendEntries RPC from the leader, and truncating the log would mean “taking back” entries that we may have already told the leader that we have in our log.
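
The rule quoted above can be sketched as a standalone function. This is an illustration under assumptions, not the lab code: `mergeEntries` and the simplified `LogEntry` are hypothetical, index 0 is a dummy entry, and the prevLogIndex/prevLogTerm check is assumed to have passed already.

```go
package main

import "fmt"

// LogEntry is a hypothetical minimal entry for this example.
type LogEntry struct {
	Term    int
	Command string
}

// mergeEntries merges the entries of one AppendEntries RPC into log.
// It truncates only when an existing entry really conflicts (same index,
// different term); entries that are already present are left untouched.
func mergeEntries(log []LogEntry, prevLogIndex int, entries []LogEntry) []LogEntry {
	for i, e := range entries {
		idx := prevLogIndex + 1 + i
		if idx >= len(log) {
			// Past the end of the local log: append whatever is left.
			return append(log, entries[i:]...)
		}
		if log[idx].Term != e.Term {
			// Conflict: drop this entry and all that follow, then append.
			// The three-index slice forces append to copy instead of overwriting.
			return append(log[:idx:idx], entries[i:]...)
		}
		// Same index and term: already stored, keep going.
	}
	// Every entry was already present, so nothing is truncated.
	return log
}

func main() {
	log := []LogEntry{{0, ""}, {1, "a"}, {1, "b"}, {2, "c"}}

	// An outdated RPC that carries only entry 1 must not shorten the log.
	out := mergeEntries(log, 0, []LogEntry{{1, "a"}})
	fmt.Println(len(out)) // 4

	// A conflicting entry at index 2 replaces index 2 and everything after it.
	out = mergeEntries(log, 1, []LogEntry{{3, "x"}})
	fmt.Println(len(out), out[2].Term) // 3 3
}
```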

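Analyzing the log turned up another problem: in TestBackup2B, once the partitions are reconnected, the AppendEntries RPCs hit conflict after conflict, which causes a lot of redundant round trips.

The Students’ Guide to Raft describes a way to find the right nextIndex faster: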

If a follower does not have prevLogIndex in its log, it should return with conflictIndex = len(log) and conflictTerm = None.
After locating the conflicting index, find the first log entry of the conflicting term:

if args.PrevLogTerm != rf.log[args.PrevLogIndex].Term {
	// Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
	reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
	for i, e := range rf.log {
		if e.Term == reply.ConflictTerm {
			reply.ConflictIndex = i
			break // stop at the first entry of ConflictTerm, not the last
		}
	}
	DPrintf("%v conflict log %v\n", rf.me, lastLogIndex)
	rf.log = rf.log[:args.PrevLogIndex]
	return
}

If a follower does have prevLogIndex in its log, but the term does not match, it should return conflictTerm = log[prevLogIndex].Term, and then search its log for the first index whose entry has term equal to conflictTerm.
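
The two follower-side rules quoted above fit in one small function. This is a sketch under assumptions: `conflictHint` is a hypothetical helper, -1 stands for "conflictTerm = None", index 0 is a dummy entry, and terms never decrease along the log (which Raft guarantees), so walking backwards from prevLogIndex reaches the first entry of the term.

```go
package main

import "fmt"

// LogEntry is a hypothetical minimal entry; only Term matters here.
type LogEntry struct {
	Term int
}

// conflictHint returns the (conflictIndex, conflictTerm) a follower would
// report when it rejects an AppendEntries RPC at prevLogIndex.
func conflictHint(log []LogEntry, prevLogIndex int) (conflictIndex, conflictTerm int) {
	if prevLogIndex >= len(log) {
		// The follower does not have prevLogIndex at all.
		return len(log), -1
	}
	conflictTerm = log[prevLogIndex].Term
	conflictIndex = prevLogIndex
	// Walk back to the first entry that carries conflictTerm.
	for conflictIndex > 0 && log[conflictIndex-1].Term == conflictTerm {
		conflictIndex--
	}
	return conflictIndex, conflictTerm
}

func main() {
	log := []LogEntry{{Term: 0}, {Term: 1}, {Term: 1}, {Term: 2}, {Term: 2}, {Term: 2}}
	fmt.Println(conflictHint(log, 5)) // 3 2
	fmt.Println(conflictHint(log, 9)) // 6 -1
}
```

Walking backwards costs time proportional to the length of the conflicting term rather than the whole log.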
When the leader receives the reply, it sets nextIndex to the entry right after the last entry of ConflictTerm in its own log (a bit convoluted), which speeds things up:

conflictIndex := reply.ConflictIndex
_, lastLogIndex := rf.lastLogTermIndex()
for i := 0; i <= lastLogIndex; i++ { // find the index right after the last entry of ConflictTerm
	if rf.log[i].Term == reply.ConflictTerm {
		// <= so that a term running up to the last entry still yields "one beyond"
		for i <= lastLogIndex && rf.log[i].Term == reply.ConflictTerm {
			i++
		}
		conflictIndex = i
		break
	}
}
rf.nextIndex[server] = conflictIndex

Upon receiving a conflict response, the leader should first search its log for conflictTerm. If it finds an entry in its log with that term, it should set nextIndex to be the one beyond the index of the last entry in that term in its log.
If it does not find an entry with that term, it should set nextIndex = conflictIndex.
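
The leader-side rule can be sketched in the same style. Again an illustration under assumptions: `nextIndexAfterConflict` is a hypothetical helper, -1 means the follower reported no conflict term, and index 0 is a dummy entry. It scans from the end of the log so the first hit is already the last entry of the term.

```go
package main

import "fmt"

// LogEntry is a hypothetical minimal entry; only Term matters here.
type LogEntry struct {
	Term int
}

// nextIndexAfterConflict returns the nextIndex a leader would pick after a
// rejected AppendEntries, given the follower's conflict hint.
func nextIndexAfterConflict(log []LogEntry, conflictIndex, conflictTerm int) int {
	if conflictTerm != -1 {
		// Scan backwards: the first match is the last entry of conflictTerm.
		for i := len(log) - 1; i > 0; i-- {
			if log[i].Term == conflictTerm {
				return i + 1 // one beyond the last entry of that term
			}
		}
	}
	// The term is unknown to the leader, or the follower was just missing entries.
	return conflictIndex
}

func main() {
	leader := []LogEntry{{Term: 0}, {Term: 1}, {Term: 1}, {Term: 3}, {Term: 3}}
	fmt.Println(nextIndexAfterConflict(leader, 3, 2))  // 3: leader has no term-2 entry
	fmt.Println(nextIndexAfterConflict(leader, 1, 1))  // 3: last term-1 entry is at index 2
	fmt.Println(nextIndexAfterConflict(leader, 4, -1)) // 4: follower was missing entries
}
```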

After adding ConflictTerm and ConflictIndex, the running time dropped to about 65s.

I'm done tinkering, this will have to do :(

Selected Code

Append Entries RPC

Handler

The handler has to cover every shape the incoming log can take. My if branches here are not very "orthogonal"; I will refactor them later.

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	DPrintf("%v at %v response to %v\n", rf.me, rf.currentTerm, args.LeaderId)
	if args.Term < rf.currentTerm {
		reply.Term, reply.Success = rf.currentTerm, false
		return
	}
	rf.role = Follower
	rf.currentTerm = args.Term
	reply.Success = false
	rf.resetElectionTimer()

	_, lastLogIndex := rf.lastLogTermIndex()

	if args.PrevLogIndex > lastLogIndex {
		// Missing entries
		DPrintf("%v miss log %v\n", rf.me, lastLogIndex)
		reply.ConflictIndex = lastLogIndex + 1 // ask the leader to resend from the first missing entry
		reply.ConflictTerm = -1
		return
	}

	if args.PrevLogTerm != rf.log[args.PrevLogIndex].Term {
		// Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
		reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
		for i, e := range rf.log {
			if e.Term == reply.ConflictTerm {
				reply.ConflictIndex = i
				break // stop at the first entry of ConflictTerm, not the last
			}
		}
		DPrintf("%v conflict log %v\n", rf.me, lastLogIndex)
		rf.log = rf.log[:args.PrevLogIndex]
		return
	}

	i := 0
	// Find the first conflicting entry, drop it and everything after it, then overwrite with the new entries
	for ; i < len(args.Entries); i++ {
		cur := args.PrevLogIndex + 1 + i
		e := args.Entries[i]
		if cur <= lastLogIndex && rf.log[cur].Term != e.Term {
			rf.log = rf.log[:cur]
			break
		}
		// Strictly greater: an entry at lastLogIndex that matches is already stored
		// and must be skipped, otherwise it would be appended a second time.
		if cur > lastLogIndex {
			break
		}
	}

	// Append whatever the local log does not hold yet
	for ; i < len(args.Entries); i++ {
		e := args.Entries[i]
		rf.log = append(rf.log, e)
	}

	if args.LeaderCommit > rf.commitIndex {
		// Never commit past the last entry this RPC is known to cover
		rf.commitIndex = min(args.LeaderCommit, args.PrevLogIndex+len(args.Entries))
		rf.apply()
	}

	reply.ConflictIndex = args.PrevLogIndex + len(args.Entries) + 1
	reply.Success = true
	if len(args.Entries) != 0 {
		DPrintf("%v got entries up to %v", rf.me, reply.ConflictIndex-1)
	}
}

Sender

func (rf *Raft) sendAppendEntries(server int) bool {
	rf.mu.Lock()
	prevLogIndex := rf.nextIndex[server] - 1
	prevLogTerm := rf.log[prevLogIndex].Term
	args := AppendEntriesArgs{
		Term:         rf.currentTerm,
		LeaderId:     rf.me,
		PrevLogIndex: prevLogIndex,
		PrevLogTerm:  prevLogTerm,
		Entries:      nil,
		LeaderCommit: rf.commitIndex,
	}
	if len(rf.log) > prevLogIndex {
		// Copy the entries so the RPC does not share memory with rf.log
		args.Entries = append([]LogEntry{}, rf.log[prevLogIndex+1:]...)
	}
	// DPrintf("%v append entries prevIndex %v len %v log len %v\n", server, args.PrevLogIndex, len(args.Entries), len(rf.log))
	rf.mu.Unlock()
	reply := AppendEntriesReply{}
	ch := make(chan bool, 1)
	timer := time.NewTimer(RPCTimeout)
	defer timer.Stop()
	go func() {
		ok := rf.peers[server].Call("Raft.AppendEntries", &args, &reply)
		ch <- ok
	}()
	select {
	case r := <-ch:
		if !r {
			return false
		}
		rf.mu.Lock()
		if !reply.Success {
			if reply.Term > rf.currentTerm {
				rf.stepDown(reply.Term)
				rf.mu.Unlock()
				return false
			}
			if reply.ConflictTerm == -1 {
				// Follower is missing entries. Only nextIndex moves here:
				// matchIndex must advance on success only, or unreplicated entries could be counted as committed.
				rf.nextIndex[server] = reply.ConflictIndex
			} else {
				// Log conflict
				conflictIndex := reply.ConflictIndex
				_, lastLogIndex := rf.lastLogTermIndex()
				for i := 0; i <= lastLogIndex; i++ { // find the index right after the last entry of ConflictTerm
					if rf.log[i].Term == reply.ConflictTerm {
						for i <= lastLogIndex && rf.log[i].Term == reply.ConflictTerm {
							i++
						}
						conflictIndex = i
						break
					}
				}
				rf.nextIndex[server] = conflictIndex
			}

		} else {
			if rf.nextIndex[server] < reply.ConflictIndex {
				// DPrintf("update %v nextindex %v\n", server, reply.NextIndex)
				rf.nextIndex[server] = prevLogIndex + len(args.Entries) + 1
				rf.matchIndex[server] = rf.nextIndex[server] - 1
				rf.updateCommitIndex()
			}
		}
		rf.mu.Unlock()
		return true
	case <-timer.C:
		return false
	}
}

Leader Commit

Whenever matchIndex is updated, check whether commitIndex should advance as well; if it does, apply the newly committed entries right away.

func (rf *Raft) updateCommitIndex() {
	majority := len(rf.peers)/2 + 1
	n := len(rf.log)
	for i := n - 1; i > rf.commitIndex; i-- {
		// Search backwards from the last log entry
		replicated := 0
		if rf.log[i].Term == rf.currentTerm {
			// Only entries from the current term are committed by counting replicas
			for server := range rf.peers {
				if rf.matchIndex[server] >= i {
					replicated++
				}
			}
		}

		if replicated >= majority {
			DPrintf("commit %v\n", i)
			rf.commitIndex = i
			rf.apply()
			break
		}
	}
}

func (rf *Raft) apply() {
	applied := rf.lastApplied
	// Copy the newly committed entries before handing them to applyCh
	logs := append([]LogEntry{}, rf.log[applied+1:rf.commitIndex+1]...)
	rf.lastApplied = rf.commitIndex

	for idx, l := range logs {
		msg := ApplyMsg{
			Command:      l.Command,
			CommandIndex: applied + idx + 1,
			CommandValid: true,
		}
		DPrintf("%v applied log %v\n", rf.me, applied+idx+1)
		rf.applyCh <- msg
	}
}
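
For comparison, the same commit rule can be computed by sorting matchIndex instead of scanning the log backwards. This is an alternative sketch, not the code above: `majorityMatchIndex` and `nextCommitIndex` are hypothetical helpers, and the sketch assumes matchIndex is non-empty and includes the leader's own last index.

```go
package main

import (
	"fmt"
	"sort"
)

// LogEntry is a hypothetical minimal entry; only Term matters here.
type LogEntry struct {
	Term int
}

// majorityMatchIndex returns the largest index N such that a majority of
// the values in matchIndex are >= N. matchIndex must not be empty.
func majorityMatchIndex(matchIndex []int) int {
	sorted := append([]int(nil), matchIndex...) // copy, keep the caller's slice intact
	sort.Ints(sorted)
	// With n peers, the element at (n-1)/2 has at least n/2+1 values >= it.
	return sorted[(len(sorted)-1)/2]
}

// nextCommitIndex advances commitIndex only to an entry of the current term.
func nextCommitIndex(log []LogEntry, matchIndex []int, commitIndex, currentTerm int) int {
	n := majorityMatchIndex(matchIndex)
	if n > commitIndex && n < len(log) && log[n].Term == currentTerm {
		return n
	}
	return commitIndex
}

func main() {
	log := []LogEntry{{Term: 0}, {Term: 1}, {Term: 1}, {Term: 2}, {Term: 2}}
	matchIndex := []int{4, 4, 3, 1, 0} // five peers, leader included

	fmt.Println(nextCommitIndex(log, matchIndex, 1, 2)) // 3
	fmt.Println(nextCommitIndex(log, matchIndex, 1, 3)) // 1: index 3 is not from term 3
}
```

Because terms never decrease along the log, checking only the majority index is enough: if that entry is not from the current term, no earlier entry can be either.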

Election Restriction

Remember to implement the election restriction:

if lastLogTerm > args.LastLogTerm || (lastLogTerm == args.LastLogTerm && lastLogIndex > args.LastLogIndex) {
	DPrintf("%v not vote to %v\n", rf.me, args.CandidateId)
	return
}
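
The same comparison, turned around into a positive predicate, is easier to test in isolation. A minimal sketch: `candidateUpToDate` is a hypothetical helper, not part of the lab skeleton.

```go
package main

import "fmt"

// candidateUpToDate reports whether the candidate's log is at least as
// up-to-date as the voter's: the later last term wins, and with equal
// last terms the longer log wins.
func candidateUpToDate(candLastTerm, candLastIndex, myLastTerm, myLastIndex int) bool {
	if candLastTerm != myLastTerm {
		return candLastTerm > myLastTerm
	}
	return candLastIndex >= myLastIndex
}

func main() {
	fmt.Println(candidateUpToDate(2, 1, 1, 5)) // true: later term beats a longer log
	fmt.Println(candidateUpToDate(1, 5, 2, 1)) // false
	fmt.Println(candidateUpToDate(2, 3, 2, 3)) // true: identical logs
	fmt.Println(candidateUpToDate(2, 2, 2, 3)) // false: same term, shorter log
}
```

A voter grants its vote only when this returns true, which is the negation of the condition in the snippet above.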