The if here is crucial. If the follower has all the entries the leader sent, the follower MUST NOT truncate its log. Any elements following the entries sent by the leader MUST be kept. This is because we could be receiving an outdated AppendEntries RPC from the leader, and truncating the log would mean “taking back” entries that we may have already told the leader that we have in our log.
If a follower does not have prevLogIndex in its log, it should return with conflictIndex = len(log) and conflictTerm = None. 找到冲突的 index 后,找到冲突 term 的第一个 log。
1 2 3 4 5 6 7 8 9 10 11 12
if args.PrevLogTerm != rf.log[args.PrevLogIndex].Term {
	// Reply false if log doesn't contain an entry at prevLogIndex whose term
	// matches prevLogTerm (§5.3).
	reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
	// Report the FIRST index that holds ConflictTerm. Stopping at the first
	// hit matters: without the break, every later entry of the same term
	// overwrites ConflictIndex and the reply ends up carrying the last index
	// of that term instead.
	for i, e := range rf.log {
		if e.Term == reply.ConflictTerm {
			reply.ConflictIndex = i
			break
		}
	}
	DPrintf("%v conflict log %v\n", rf.me, lastLogIndex)
	// Drop the mismatching entry at PrevLogIndex and everything after it.
	rf.log = rf.log[:args.PrevLogIndex]
	return
}
If a follower does have prevLogIndex in its log, but the term does not match, it should return conflictTerm = log[prevLogIndex].Term, and then search its log for the first index whose entry has term equal to conflictTerm. leader 收到 reply 后,将 nextIndex 设置为 conflictTerm 的最后一个 log 之后的一项(有点绕),提高了效率。
1 2 3 4 5 6 7 8 9 10 11 12
// Default to the index suggested by the follower; it is used unchanged when
// the scan below finds no entry with the follower's conflicting term.
conflictIndex := reply.ConflictIndex
_, lastLogIndex := rf.lastLogTermIndex()
for idx := 0; idx < lastLogIndex; idx++ {
	if rf.log[idx].Term != reply.ConflictTerm {
		continue
	}
	// idx is the first entry with the conflicting term; walk to the index
	// just past the run of entries sharing that term (bounded by lastLogIndex).
	end := idx + 1
	for end < lastLogIndex && rf.log[end].Term == reply.ConflictTerm {
		end++
	}
	conflictIndex = end
	break
}
rf.nextIndex[server] = conflictIndex
Upon receiving a conflict response, the leader should first search its log for conflictTerm. If it finds an entry in its log with that term, it should set nextIndex to be the one beyond the index of the last entry in that term in its log. If it does not find an entry with that term, it should set nextIndex = conflictIndex.
if args.PrevLogTerm != rf.log[args.PrevLogIndex].Term {
	// Reply false if log doesn't contain an entry at prevLogIndex whose term
	// matches prevLogTerm (§5.3).
	reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
	// Report the FIRST index that holds ConflictTerm. Stopping at the first
	// hit matters: without the break, every later entry of the same term
	// overwrites ConflictIndex and the reply ends up carrying the last index
	// of that term instead.
	for i, e := range rf.log {
		if e.Term == reply.ConflictTerm {
			reply.ConflictIndex = i
			break
		}
	}
	DPrintf("%v conflict log %v\n", rf.me, lastLogIndex)
	// Drop the mismatching entry at PrevLogIndex and everything after it.
	rf.log = rf.log[:args.PrevLogIndex]
	return
}
i := 0
// Find the first entry that conflicts with the leader's, delete it and
// everything after it, then overwrite with the leader's entries.
// NOTE(review): assumes lastLogIndex is the index of the last entry currently
// in rf.log — confirm against where it is computed.
for ; i < len(args.Entries); i++ {
	cur := args.PrevLogIndex + 1 + i
	if cur > lastLogIndex {
		// Past the end of the local log: Entries[i:] are all new.
		break
	}
	if rf.log[cur].Term != args.Entries[i].Term {
		rf.log = rf.log[:cur]
		break
	}
	// Entry at cur already matches; keep it and move on. Breaking here when
	// cur == lastLogIndex would leave i pointing at an entry the log already
	// holds, and the append below would duplicate it.
}
// Append only the entries the log does not already contain. When every entry
// was matched, Entries[i:] is empty and the log (including any tail beyond
// the leader's entries) is left untouched.
rf.log = append(rf.log, args.Entries[i:]...)
func(rf *Raft) updateCommitIndex() { majority := len(rf.peers)/2 + 1 n := len(rf.log) for i := n - 1; i > rf.commitIndex; i-- { // 从最后的log向前寻找 replicated := 0 if rf.log[i].Term == rf.currentTerm { // 只提交当前Term的log for server := range rf.peers { if rf.matchIndex[server] >= i { replicated++ } } }
if replicated >= majority { DPrintf("commit %v\n", i) rf.commitIndex = i rf.apply() break } } }