0%

6.824 Lab3B: KVRaft with log compaction

什么时候做快照

当persister.RaftStateSize()大于maxraftstate时需要打快照,快照由几部分组成:

  1. KVServer产生快照数据传送给Raft
  2. Raft记录快照信息
  3. 当Follower需要从故障中恢复或者AppendEntries需要的Log在LastSnapshotIndex之前时接受InstallSnapshot RPC

快照会改变什么

增加了快照后,会改变原来的直接访问Log下标的方式,需要在原来的Index上减去一个LastSnapshotIndex,所以这里需要几个辅助函数

1
2
3
4
5
6
7
8
9
func (rf *Raft) getLogByIndex(logIndex int) LogEntry {
idx := logIndex - rf.lastIncludedIndex
return rf.log[idx]
}

func (rf *Raft) getIndex(logIndex int) int {
idx := logIndex - rf.lastIncludedIndex
return idx
}

然后需要在所有访问Log的地方将直接的下标访问改成通过这两个函数访问

KVRaft—->Raft

在KVRaft执行每一个Apply操作后,会检测日志文件的大小,如果超过预设值(maxrfstate),会向Raft发送创建Snapshot请求

1
2
3
4
5
6
7
8
9
10
11
func (kv *KVServer) saveSnapshot(appliedId int) {
if kv.maxraftstate == -1 {
return
}
if kv.persister.RaftStateSize() > kv.maxraftstate*9/10 {
DPrintf("triggered snapshot")
data := kv.encodeSnapshot()
kv.rf.SaveSnapshot(appliedId, data)
}
return
}

在Raft接收到Snapshot请求后,需要使用Persister保存当前状态,删除Log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (rf *Raft) SaveSnapshot(appliedId int, data []byte) {
rf.mu.Lock()
defer rf.mu.Unlock()

if appliedId <= rf.lastIncludedIndex {
return
}

lastLog := rf.getLogByIndex(appliedId)
rf.log = rf.log[rf.getIndex(appliedId):]
rf.lastIncludedIndex = appliedId
rf.lastIncludedTerm = lastLog.Term
persistState := rf.getPersistState()
rf.persister.SaveStateAndSnapshot(persistState, data)
}

InstallSnapshot

在节点重新接入Raft网络后,此时该节点的Log与当前Log相差很多,所以需要发送InstallSnapshot

在节点接收到InstallSnapshot后,使用Persister保存快照,讲快照请求发送到ApplyCh以通知上层KVRaft

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
DPrintf("%d get snapshot to %v\n", rf.me, args.LastIncludedIndex)
rf.mu.Lock()
defer rf.mu.Unlock()

reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
return
} else if args.Term > rf.currentTerm || rf.role != Folllower {
rf.stepDown(args.Term)
}
rf.resetElectionTimer()

if rf.lastIncludedIndex >= args.LastIncludedIndex {
return
}

start := args.LastIncludedIndex - rf.lastIncludedIndex
if start >= len(rf.log) {
rf.log = make([]LogEntry, 1)
rf.log[0].Term = args.LastIncludedTerm
} else {
rf.log = rf.log[start:]
}

rf.lastIncludedIndex = args.LastIncludedIndex
rf.lastIncludedTerm = args.LastIncludedTerm
rf.persister.SaveStateAndSnapshot(rf.getPersistState(), args.Data)
}

上层KVRaft接收到Snapshot请求后,使用Persister读取数据

1
2
3
4
5
6
7
if !msg.CommandValid {
//是一个Snapshot
kv.mu.Lock()
kv.readPersist(kv.persister.ReadSnapshot())
kv.mu.Unlock()
continue
}

问题

现在的问题是节点之间通信正常下KVRaft执行操作的时间会特别长…

3B的第二个Test使用了80s才完成,还需要找找实现的问题

result