
6.824 Lab 2B: Raft Log Replication

Goals

  • Implement log replication
  • Add the corresponding election restriction

Tests

This lab has two fairly tricky tests:

  • TestRejoin2B: two stale leaders rejoin the network one after the other, each holding uncommitted log entries that have to be reconciled

  • TestBackup2B: the network is partially disconnected and reconnected several times while a large number of log entries are submitted (mainly stressing how quickly the cluster catches up)
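
For reference, I run these with the lab's standard harness from the raft package directory (assuming the usual 6.824 course layout):

```
cd src/raft
go test -run 2B -race
```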

TestRejoin2B

The failing scenario: the old leader1 held an uncommitted entry at index=2, but it should have fetched the correct entry at index=2 from the new leader before applying anything.

Some digging showed that in AppendEntries I had forgotten the PrevLogTerm comparison:

```go
if args.PrevLogIndex <= lastLogIndex && args.PrevLogTerm != rf.log[args.PrevLogIndex].Term {
	// The entry at PrevLogIndex exists but its term conflicts with the leader's.
	DPrintf("%v conflict log %v\n", rf.me, lastLogIndex)
	reply.ConflictIndex = args.PrevLogIndex
}
```

TestBackup2B

Next, TestBackup2B failed. At first I assumed AppendEntries handling was simply too slow.

My initial approach was to walk through args.Entries and compare/patch entries one by one. Then I noticed the paper says to

delete the existing entry and all that follow it (§5.3)

so I changed the handler to drop the conflicting entry together with everything after it:

```go
rf.log = rf.log[:args.PrevLogIndex]
```

But the test still failed. Poring over the logs showed that nodes kept calling election() without ever electing a leader: the election-timing code was not implemented correctly, so a candidate would deadlock waiting for all the votes to come back.
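
The fix is to never block the candidate on individual RPCs. A minimal sketch of the pattern (assuming `import "sync"`; names like rf.sendRequestVote and rf.becomeLeader are hypothetical, not my actual code):

```go
// Sketch: collect votes asynchronously. The candidate never waits for
// stragglers, and acts on a majority as soon as it arrives.
func (rf *Raft) startElection() {
	rf.mu.Lock()
	rf.currentTerm++
	rf.votedFor = rf.me
	term := rf.currentTerm
	rf.resetElectionTimer()
	rf.mu.Unlock()

	var mu sync.Mutex
	votes := 1 // our own vote
	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		go func(peer int) {
			// sendRequestVote (hypothetical) returns true iff the vote was granted.
			if rf.sendRequestVote(peer, term) {
				mu.Lock()
				votes++
				if votes > len(rf.peers)/2 {
					rf.becomeLeader(term) // hypothetical; must no-op if the term moved on
				}
				mu.Unlock()
			}
		}(peer)
	}
	// No blocking wait here: if a majority never arrives, the election
	// timer fires again and simply starts the next election.
}
```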

After fixing that, the 2B tests passed, but the run took about 90s, exceeding the required one minute. I guessed this was because I used time.Sleep to wait for RPCs, so I switched to a timer (the select-with-timeout pattern in the sender code below). That still left me at 75s, over the limit.

A related caveat from the Students' Guide to Raft on when truncation is allowed:

The if here is crucial. If the follower has all the entries the leader sent, the follower MUST NOT truncate its log. Any elements following the entries sent by the leader MUST be kept. This is because we could be receiving an outdated AppendEntries RPC from the leader, and truncating the log would mean “taking back” entries that we may have already told the leader that we have in our log.

Analyzing the logs turned up another problem: in TestBackup2B, the AppendEntries RPCs sent after a reconnect hit conflict after conflict, wasting many round trips.

The Students’ Guide to Raft describes how to make the nextIndex search more efficient:

If a follower does not have prevLogIndex in its log, it should return with conflictIndex = len(log) and conflictTerm = None.
Once the conflicting index is found, locate the first entry of the conflicting term:

```go
if args.PrevLogTerm != rf.log[args.PrevLogIndex].Term {
	// Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
	reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
	for i, e := range rf.log {
		if e.Term == reply.ConflictTerm {
			// First index whose entry has term equal to ConflictTerm.
			reply.ConflictIndex = i
			break
		}
	}
	DPrintf("%v conflict log %v\n", rf.me, lastLogIndex)
	rf.log = rf.log[:args.PrevLogIndex]
	return
}
```

If a follower does have prevLogIndex in its log, but the term does not match, it should return conflictTerm = log[prevLogIndex].Term, and then search its log for the first index whose entry has term equal to conflictTerm.
On receiving such a reply, the leader sets nextIndex to the entry just past the last entry of conflictTerm in its own log (a bit convoluted), which cuts the number of round trips:

```go
conflictIndex := reply.ConflictIndex
_, lastLogIndex := rf.lastLogTermIndex()
// Find the index just past the last entry of the conflicting term.
for i := 0; i < lastLogIndex; i++ {
	if rf.log[i].Term == reply.ConflictTerm {
		for i < lastLogIndex && rf.log[i].Term == reply.ConflictTerm {
			i++
		}
		conflictIndex = i
		break
	}
}
rf.nextIndex[server] = conflictIndex
```

Upon receiving a conflict response, the leader should first search its log for conflictTerm. If it finds an entry in its log with that term, it should set nextIndex to be the one beyond the index of the last entry in that term in its log.
If it does not find an entry with that term, it should set nextIndex = conflictIndex.

After adding ConflictTerm and ConflictIndex, the run time dropped to about 65s.

I'm leaving it there for now :(

Selected code

Append Entries RPC

Handler

The handler has to cover every shape the incoming log can take. My if branches here aren't very "orthogonal"; refactoring is left for later.

```go
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	DPrintf("%v at %v response to %v\n", rf.me, rf.currentTerm, args.LeaderId)
	if args.Term < rf.currentTerm {
		reply.Term, reply.Success = rf.currentTerm, false
		return
	}
	rf.role = Follower
	rf.currentTerm = args.Term
	reply.Term = rf.currentTerm
	reply.Success = false
	rf.resetElectionTimer()

	_, lastLogIndex := rf.lastLogTermIndex()

	if args.PrevLogIndex > lastLogIndex {
		// Missing entries: ask the leader to resend from the first one we lack.
		DPrintf("%v miss log %v\n", rf.me, lastLogIndex)
		reply.ConflictIndex = lastLogIndex + 1
		reply.ConflictTerm = -1
		return
	}

	if args.PrevLogTerm != rf.log[args.PrevLogIndex].Term {
		// Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
		reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
		for i, e := range rf.log {
			if e.Term == reply.ConflictTerm {
				// First index whose entry has term equal to ConflictTerm.
				reply.ConflictIndex = i
				break
			}
		}
		DPrintf("%v conflict log %v\n", rf.me, lastLogIndex)
		rf.log = rf.log[:args.PrevLogIndex]
		return
	}

	// Find the first conflicting entry; delete it and everything after it,
	// then append the leader's entries. Entries that already match are kept,
	// so a stale AppendEntries never truncates entries we already have.
	i := 0
	for ; i < len(args.Entries); i++ {
		cur := args.PrevLogIndex + 1 + i
		if cur > lastLogIndex {
			// Our log ends here; append the rest below.
			break
		}
		if rf.log[cur].Term != args.Entries[i].Term {
			// Conflict: delete the existing entry and all that follow it (§5.3).
			rf.log = rf.log[:cur]
			break
		}
	}

	for ; i < len(args.Entries); i++ {
		rf.log = append(rf.log, args.Entries[i])
	}

	if args.LeaderCommit > rf.commitIndex {
		rf.commitIndex = min(args.LeaderCommit, args.PrevLogIndex+len(args.Entries))
		rf.apply()
	}

	// On success, ConflictIndex doubles as the next index the leader should send.
	reply.ConflictIndex = args.PrevLogIndex + len(args.Entries) + 1
	reply.Success = true
	if len(args.Entries) != 0 {
		DPrintf("%v got entries up to %v", rf.me, reply.ConflictIndex-1)
	}
}
```
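
For completeness, the snippets above lean on a couple of small helpers that aren't shown. These are hypothetical reconstructions inferred from how they're used, not the post's actual code (and note that before Go 1.21 there is no builtin min for ints):

```go
// Hypothetical: returns the term and index of the last log entry,
// assuming a sentinel entry sits at index 0 so the log is never empty.
func (rf *Raft) lastLogTermIndex() (int, int) {
	lastIndex := len(rf.log) - 1
	return rf.log[lastIndex].Term, lastIndex
}

// Hypothetical: integer min, needed on Go versions before 1.21.
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}
```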

Sender

```go
func (rf *Raft) sendAppendEntries(server int) bool {
	rf.mu.Lock()
	prevLogIndex := rf.nextIndex[server] - 1
	prevLogTerm := rf.log[prevLogIndex].Term
	args := AppendEntriesArgs{
		Term:         rf.currentTerm,
		LeaderId:     rf.me,
		PrevLogIndex: prevLogIndex,
		PrevLogTerm:  prevLogTerm,
		Entries:      nil,
		LeaderCommit: rf.commitIndex,
	}
	if len(rf.log) > prevLogIndex {
		args.Entries = append([]LogEntry{}, rf.log[prevLogIndex+1:]...)
	}
	// DPrintf("%v append entries prevIndex %v len %v log len %v\n", server, args.PrevLogIndex, len(args.Entries), len(rf.log))
	rf.mu.Unlock()
	reply := AppendEntriesReply{}
	ch := make(chan bool, 1)
	timer := time.NewTimer(RPCTimeout)
	defer timer.Stop()
	go func() {
		ok := rf.peers[server].Call("Raft.AppendEntries", &args, &reply)
		ch <- ok
	}()
	select {
	case r := <-ch:
		if !r {
			return false
		}
		rf.mu.Lock()
		if rf.currentTerm != args.Term {
			// Our term changed while the RPC was in flight; the reply is stale.
			rf.mu.Unlock()
			return false
		}
		if !reply.Success {
			if reply.Term > rf.currentTerm {
				rf.stepDown(reply.Term)
				rf.mu.Unlock()
				return false
			}
			if reply.ConflictTerm == -1 {
				// The follower is missing entries; back up to the end of its log.
				// (A failed reply proves nothing about matching entries,
				// so matchIndex is left alone here.)
				rf.nextIndex[server] = reply.ConflictIndex
			} else {
				// Term conflict: skip past our last entry of the conflicting term.
				conflictIndex := reply.ConflictIndex
				_, lastLogIndex := rf.lastLogTermIndex()
				for i := 0; i < lastLogIndex; i++ {
					if rf.log[i].Term == reply.ConflictTerm {
						for i < lastLogIndex && rf.log[i].Term == reply.ConflictTerm {
							i++
						}
						conflictIndex = i
						break
					}
				}
				rf.nextIndex[server] = conflictIndex
			}
		} else {
			if rf.nextIndex[server] < reply.ConflictIndex {
				rf.nextIndex[server] = prevLogIndex + len(args.Entries) + 1
				rf.matchIndex[server] = rf.nextIndex[server] - 1
				rf.updateCommitIndex()
			}
		}
		rf.mu.Unlock()
		return true
	case <-timer.C:
		return false
	}
}
```

Leader Commit

Whenever matchIndex is updated, check whether commitIndex can advance; if it does, apply the newly committed entries as well.

```go
func (rf *Raft) updateCommitIndex() {
	majority := len(rf.peers)/2 + 1
	n := len(rf.log)
	for i := n - 1; i > rf.commitIndex; i-- {
		// Scan backwards from the last entry.
		replicated := 0
		if rf.log[i].Term == rf.currentTerm {
			// Only commit entries from the current term (§5.4.2).
			// (Assumes matchIndex[rf.me] tracks the leader's own last index.)
			for server := range rf.peers {
				if rf.matchIndex[server] >= i {
					replicated++
				}
			}
		}

		if replicated >= majority {
			DPrintf("commit %v\n", i)
			rf.commitIndex = i
			rf.apply()
			break
		}
	}
}

func (rf *Raft) apply() {
	applied := rf.lastApplied
	logs := append([]LogEntry{}, rf.log[applied+1:rf.commitIndex+1]...)
	rf.lastApplied = rf.commitIndex

	for idx, l := range logs {
		msg := ApplyMsg{
			Command:      l.Command,
			CommandIndex: applied + idx + 1,
			CommandValid: true,
		}
		DPrintf("%v applied log %v\n", rf.me, applied+idx+1)
		rf.applyCh <- msg
	}
}
```

Election Restriction

Remember to implement the election restriction (§5.4.1): deny the vote to any candidate whose log is less up-to-date than ours.

```go
// Deny the vote if our log is more up-to-date than the candidate's (§5.4.1).
if lastLogTerm > args.LastLogTerm || (lastLogTerm == args.LastLogTerm && lastLogIndex > args.LastLogIndex) {
	DPrintf("%v not vote to %v\n", rf.me, args.CandidateId)
	return
}
```
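
For context, here is a minimal sketch of how this check might sit inside the RequestVote handler. The surrounding shape (rf.votedFor, reply.VoteGranted, the timer reset) is an assumption about the handler, not the code from this lab:

```go
// Sketch of the grant path around the restriction above (assumed field names).
lastLogTerm, lastLogIndex := rf.lastLogTermIndex()
upToDate := args.LastLogTerm > lastLogTerm ||
	(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)
if upToDate && (rf.votedFor == -1 || rf.votedFor == args.CandidateId) {
	rf.votedFor = args.CandidateId
	reply.VoteGranted = true
	rf.resetElectionTimer() // only reset the timer when actually granting the vote
}
```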