什么时候做快照
当persister.RaftStateSize()大于maxraftstate时需要打快照,快照由几部分组成:
- KVServer产生快照数据传送给Raft
- Raft记录快照信息
- 当Follower需要从故障中恢复或者AppendEntries需要的Log在LastSnapshotIndex之前时接受InstallSnapshot RPC
快照会改变什么
增加了快照后,会改变原来的直接访问Log下标的方式,需要在原来的Index上减去一个LastSnapshotIndex,所以这里需要几个辅助函数
1 2 3 4 5 6 7 8 9
| func (rf *Raft) getLogByIndex(logIndex int) LogEntry { idx := logIndex - rf.lastIncludedIndex return rf.log[idx] }
func (rf *Raft) getIndex(logIndex int) int { idx := logIndex - rf.lastIncludedIndex return idx }
|
然后需要在所有访问Log的地方将直接的下标访问改成通过这两个函数访问
KVRaft—->Raft
在KVRaft执行每一个Apply操作后,会检测日志文件的大小,如果超过预设值(maxrfstate),会向Raft发送创建Snapshot请求
1 2 3 4 5 6 7 8 9 10 11
| func (kv *KVServer) saveSnapshot(appliedId int) { if kv.maxraftstate == -1 { return } if kv.persister.RaftStateSize() > kv.maxraftstate*9/10 { DPrintf("triggered snapshot") data := kv.encodeSnapshot() kv.rf.SaveSnapshot(appliedId, data) } return }
|
在Raft接收到Snapshot请求后,需要使用Persister保存当前状态,删除Log
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func (rf *Raft) SaveSnapshot(appliedId int, data []byte) { rf.mu.Lock() defer rf.mu.Unlock()
if appliedId <= rf.lastIncludedIndex { return }
lastLog := rf.getLogByIndex(appliedId) rf.log = rf.log[rf.getIndex(appliedId):] rf.lastIncludedIndex = appliedId rf.lastIncludedTerm = lastLog.Term persistState := rf.getPersistState() rf.persister.SaveStateAndSnapshot(persistState, data) }
|
InstallSnapshot
在节点重新接入Raft网络后,此时该节点的Log与当前Log相差很多,所以需要发送InstallSnapshot
在节点接收到InstallSnapshot后,使用Persister保存快照,讲快照请求发送到ApplyCh以通知上层KVRaft
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) { DPrintf("%d get snapshot to %v\n", rf.me, args.LastIncludedIndex) rf.mu.Lock() defer rf.mu.Unlock()
reply.Term = rf.currentTerm if args.Term < rf.currentTerm { return } else if args.Term > rf.currentTerm || rf.role != Folllower { rf.stepDown(args.Term) } rf.resetElectionTimer()
if rf.lastIncludedIndex >= args.LastIncludedIndex { return }
start := args.LastIncludedIndex - rf.lastIncludedIndex if start >= len(rf.log) { rf.log = make([]LogEntry, 1) rf.log[0].Term = args.LastIncludedTerm } else { rf.log = rf.log[start:] }
rf.lastIncludedIndex = args.LastIncludedIndex rf.lastIncludedTerm = args.LastIncludedTerm rf.persister.SaveStateAndSnapshot(rf.getPersistState(), args.Data) }
|
上层KVRaft接收到Snapshot请求后,使用Persister读取数据
1 2 3 4 5 6 7
| if !msg.CommandValid { kv.mu.Lock() kv.readPersist(kv.persister.ReadSnapshot()) kv.mu.Unlock() continue }
|
问题
现在的问题是节点之间通信正常下KVRaft执行操作的时间会特别长…
3B的第二个Test使用了80s才完成,还需要找找实现的问题