0%

6.824 Lab2A: Raft Leader选举

Raft

6.824的Lab2是要实现一个Raft共识算法,Raft主要包括leader选举、日志复制、安全性保证三部分,在Lab2的A和B中实现,在C中实现快照功能。

对Go语言不熟悉确实是一个难点…

实验目标

  • 实现Leader选举:在长时间没有收到心跳包后成为Candidate发送投票请求,成为Leader
  • 心跳包:实现Leader与Folllower之间的心跳包

需要注意的

  1. Lab的提示中建议使用time.Sleep来做一些周期性的事件,但是我发现time.Timer有时候更好用(因为不需要手动管理锁)
  2. test限制了每秒不超过十次的心跳包,这里我选择了150ms,选举超时时间为400-800ms
  3. 记得要重置选举计时器

关键代码

Raft结构体

首先是在Raft结构体中添加一些需要的字段,注意在Lab2A中不是所有的figure2A中提到的字段都需要用到,目前只需要添加需要用到的即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.

electionTimer *time.Timer
stopCh chan bool

// Persisten
role role
currentTerm int
votedFor int
log []LogEntry
}

这里设置了electionTimer用来做选举超时的计时器,如果electionTimer超时则进入选举过程

选举

一个节点在超时后进入Candidate后需要将当前的term+1,并且向所有的节点发起RequestVote RPC。

其中有几点需要注意:

  1. 如果RequestVote的返回Term大于当前Candidate的Term,则需要回到Follower角色
  2. 要注意election超时的情况,这里我复用了electionTimer,既作为Follower的超时开始选举,也用作election超时
  3. 发送RequestVote时的Term与统计完票数后的Term可能不同,这一点在提示的Rule5中提到了
  4. 记得释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
func (rf *Raft) election() {
rf.mu.Lock()
if rf.role == Leader {
rf.mu.Unlock()
return
}

rf.role = Candidate
rf.currentTerm++
rf.votedFor = rf.me
lastLogTerm, lastLogIndex := rf.lastLogTermIndex()

DPrintf("%v at %v start election\n", rf.me, rf.currentTerm)

args := RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogTerm: lastLogTerm,
LastLogIndex: lastLogIndex,
}
rf.mu.Unlock()

grantedCount := 1
voteCount := 1
votesCh := make(chan bool, len(rf.peers))
for idx := range rf.peers {
if idx == rf.me {
rf.resetElectionTimer()
continue
}
go func(ch chan bool, idx int) {
reply := RequestVoteReply{}
rf.sendRequestVote(idx, &args, &reply)
ch <- reply.VoteGranted
if reply.Term > args.Term {
rf.mu.Lock()
if reply.Term > rf.currentTerm {
rf.stepDown(reply.Term)
rf.mu.Unlock()
return
}
rf.mu.Unlock()
}
}(votesCh, idx)
}

for {
r := <-votesCh
voteCount++
if r == true {
grantedCount++
}
if voteCount == len(rf.peers) || grantedCount > len(rf.peers)/2 || voteCount-grantedCount > len(rf.peers)/2 {
break
}
}

if grantedCount <= len(rf.peers)/2 {
DPrintf("%v failed become leader\n", rf.me)
rf.mu.Lock()
rf.role = Folllower
rf.mu.Unlock()
return
}

rf.mu.Lock()
if args.Term == rf.currentTerm && rf.role == Candidate {
DPrintf("%v become leader\n", rf.me)
rf.role = Leader
go rf.tick()
}

rf.mu.Unlock()
}

发送RequestVote RPC

这一段代码比较简单,使用了Sleep来等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ch := make(chan bool, 1)
r := RequestVoteReply{}

go func() {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
ch <- ok
}()

time.Sleep(RPCTimeout)
if <-ch {
return true
}

return false
}

RequestVote

收到投票请求后,需要判断下请求投票的节点的Term与当前节点的Term,收到RPC后节点回到Follower角色

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
DPrintf("%v is requested to vote %v at term %v\n", rf.me, args.CandidateId, rf.currentTerm)

reply.Term = rf.currentTerm
reply.VoteGranted = false

if args.Term < rf.currentTerm {
return
} else if args.Term == rf.currentTerm {
if rf.role == Leader {
return
}
if rf.votedFor == args.CandidateId {
reply.VoteGranted = true
return
}
if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
return
}
}

if args.Term > rf.currentTerm {
rf.stepDown(args.Term)
}

rf.currentTerm = args.Term
rf.role = Folllower
rf.votedFor = args.CandidateId
reply.VoteGranted = true
DPrintf("%v vote to %v\n", rf.me, args.CandidateId)
rf.resetElectionTimer()
return
}

心跳包

在Lab2A中,AppendEntries RPC只用来作为心跳包。实现了tick函数,在节点成为leader后调用,定时的向其他节点发送心跳包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (rf *Raft) sendHeartbeat() {
for idx := range rf.peers {
if idx == rf.me {
rf.resetElectionTimer()
continue
}
go rf.sendAppendEntries(idx)
}
}

func (rf *Raft) tick() {
timer := time.NewTimer(HeartbeatTimeout)
for {
select {
case <-timer.C:
if _, isLeader := rf.GetState(); !isLeader {
DPrintf("%v stop tick\n", rf.me)
return
}
DPrintf("Leader %v tick\n", rf.me)
go rf.sendHeartbeat()
timer.Reset(HeartbeatTimeout)
case <-rf.stopCh:
return
}
}
}

运行结果

test2a

Test (2A): initial election …

这一阶段就是在一切正常的情况下能够选出来leader就可以了

test2a_1

Test (2A): election after network failure …

在第二个test中,需要解决网络断开的问题:

  1. 在leader1断开后,剩余的两个节点能够选取出leader2
  2. 在leader1重新加入网络后,因为leader1的term已经落后,所以回到Follower
  3. 断开两个节点后没有任何一个节点能够获得多数票
  4. 接入一个节点后,重新选举出leader

test2a_2