0%

6.824 Lab2A: Raft Leader 选举

Raft

6.824 的 Lab2 是要实现一个 Raft 共识算法,Raft 主要包括 leader 选举、日志复制、安全性保证三部分,在 Lab2 的 A 和 B 中实现,在 C 中实现快照功能。

对 Go 语言不熟悉确实是一个难点…

实验目标

  • 实现 Leader 选举:在长时间没有收到心跳包后成为 Candidate 发送投票请求,成为 Leader
  • 心跳包:实现 Leader 与 Folllower 之间的心跳包

需要注意的

  1. Lab 的提示中建议使用 time.Sleep 来做一些周期性的事件,但是我发现 time.Timer 有时候更好用(因为不需要手动管理锁)
  2. test 限制了每秒不超过十次的心跳包,这里我选择了 150ms,选举超时时间为 400-800ms
  3. 记得要重置选举计时器

关键代码

Raft 结构体

首先是在 Raft 结构体中添加一些需要的字段,注意在 Lab2A 中不是所有的 figure2A 中提到的字段都需要用到,目前只需要添加需要用到的即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.

electionTimer *time.Timer
stopCh chan bool

// Persisten
role role
currentTerm int
votedFor int
log []LogEntry
}

这里设置了 electionTimer 用来做选举超时的计时器,如果 electionTimer 超时则进入选举过程

选举

一个节点在超时后进入 Candidate 后需要将当前的 term+1,并且向所有的节点发起 RequestVote RPC。

其中有几点需要注意:

  1. 如果 RequestVote 的返回 Term 大于当前 Candidate 的 Term,则需要回到 Follower 角色
  2. 要注意 election 超时的情况,这里我复用了 electionTimer,既作为 Follower 的超时开始选举,也用作 election 超时
  3. 发送 RequestVote 时的 Term 与统计完票数后的 Term 可能不同,这一点在提示的 Rule5 中提到了
  4. 记得释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
func (rf *Raft) election() {
rf.mu.Lock()
if rf.role == Leader {
rf.mu.Unlock()
return
}

rf.role = Candidate
rf.currentTerm++
rf.votedFor = rf.me
lastLogTerm, lastLogIndex := rf.lastLogTermIndex()

DPrintf("%v at %v start election\n", rf.me, rf.currentTerm)

args := RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogTerm: lastLogTerm,
LastLogIndex: lastLogIndex,
}
rf.mu.Unlock()

grantedCount := 1
voteCount := 1
votesCh := make(chan bool, len(rf.peers))
for idx := range rf.peers {
if idx == rf.me {
rf.resetElectionTimer()
continue
}
go func(ch chan bool, idx int) {
reply := RequestVoteReply{}
rf.sendRequestVote(idx, &args, &reply)
ch <- reply.VoteGranted
if reply.Term > args.Term {
rf.mu.Lock()
if reply.Term > rf.currentTerm {
rf.stepDown(reply.Term)
rf.mu.Unlock()
return
}
rf.mu.Unlock()
}
}(votesCh, idx)
}

for {
r := <-votesCh
voteCount++
if r == true {
grantedCount++
}
if voteCount == len(rf.peers) || grantedCount > len(rf.peers)/2 || voteCount-grantedCount > len(rf.peers)/2 {
break
}
}

if grantedCount <= len(rf.peers)/2 {
DPrintf("%v failed become leader\n", rf.me)
rf.mu.Lock()
rf.role = Folllower
rf.mu.Unlock()
return
}

rf.mu.Lock()
if args.Term == rf.currentTerm && rf.role == Candidate {
DPrintf("%v become leader\n", rf.me)
rf.role = Leader
go rf.tick()
}

rf.mu.Unlock()
}

发送 RequestVote RPC

这一段代码比较简单,使用了 Sleep 来等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ch := make(chan bool, 1)
r := RequestVoteReply{}

go func() {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
ch <- ok
}()

time.Sleep(RPCTimeout)
if <-ch {
return true
}

return false
}

RequestVote

收到投票请求后,需要判断下请求投票的节点的 Term 与当前节点的 Term,收到 RPC 后节点回到 Follower 角色

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
DPrintf("%v is requested to vote %v at term %v\n", rf.me, args.CandidateId, rf.currentTerm)

reply.Term = rf.currentTerm
reply.VoteGranted = false

if args.Term < rf.currentTerm {
return
} else if args.Term == rf.currentTerm {
if rf.role == Leader {
return
}
if rf.votedFor == args.CandidateId {
reply.VoteGranted = true
return
}
if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
return
}
}

if args.Term > rf.currentTerm {
rf.stepDown(args.Term)
}

rf.currentTerm = args.Term
rf.role = Folllower
rf.votedFor = args.CandidateId
reply.VoteGranted = true
DPrintf("%v vote to %v\n", rf.me, args.CandidateId)
rf.resetElectionTimer()
return
}

心跳包

在 Lab2A 中,AppendEntries RPC 只用来作为心跳包。实现了 tick 函数,在节点成为 leader 后调用,定时的向其他节点发送心跳包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (rf *Raft) sendHeartbeat() {
for idx := range rf.peers {
if idx == rf.me {
rf.resetElectionTimer()
continue
}
go rf.sendAppendEntries(idx)
}
}

func (rf *Raft) tick() {
timer := time.NewTimer(HeartbeatTimeout)
for {
select {
case <-timer.C:
if _, isLeader := rf.GetState(); !isLeader {
DPrintf("%v stop tick\n", rf.me)
return
}
DPrintf("Leader %v tick\n", rf.me)
go rf.sendHeartbeat()
timer.Reset(HeartbeatTimeout)
case <-rf.stopCh:
return
}
}
}

运行结果

test2a

Test (2A): initial election …

这一阶段就是在一切正常的情况下能够选出来 leader 就可以了

test2a_1

Test (2A): election after network failure …

在第二个 test 中,需要解决网络断开的问题:

  1. 在 leader1 断开后,剩余的两个节点能够选取出 leader2
  2. 在 leader1 重新加入网络后,因为 leader1 的 term 已经落后,所以回到 Follower
  3. 断开两个节点后没有任何一个节点能够获得多数票
  4. 接入一个节点后,重新选举出 leader

test2a_2