The if here is crucial. If the follower has all the entries the
leader sent, the follower MUST NOT truncate its log. Any elements
following the entries sent by the leader MUST be kept. This is because
we could be receiving an outdated AppendEntries RPC from the leader, and
truncating the log would mean “taking back” entries that we may have
already told the leader that we have in our log.
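
The rule above can be sketched as a small standalone function. This is only a minimal sketch, not the lab's actual handler: `Entry` and `mergeEntries` are hypothetical names, the log is assumed to be a plain slice indexed from 0, and the prevLogIndex/prevLogTerm check is assumed to have already passed.

```go
package main

import "fmt"

// Entry is a hypothetical log entry; only the term matters for this sketch.
type Entry struct {
	Term int
}

// mergeEntries applies the entries of an AppendEntries RPC to log.
// It truncates only at a real conflict (same index, different term) and
// never drops entries that merely follow the ones the leader sent.
func mergeEntries(log []Entry, prevLogIndex int, entries []Entry) []Entry {
	for i, e := range entries {
		cur := prevLogIndex + 1 + i
		if cur >= len(log) {
			// Past the end of the local log: append the remainder.
			return append(log, entries[i:]...)
		}
		if log[cur].Term != e.Term {
			// Conflict: drop this entry and everything after it, then append.
			return append(log[:cur], entries[i:]...)
		}
	}
	// Every entry sent was already present, so the log stays untouched.
	return log
}

func main() {
	log := []Entry{{0}, {1}, {1}, {2}}
	// An outdated RPC that carries only the entry at index 1.
	log = mergeEntries(log, 0, []Entry{{1}})
	fmt.Println(len(log)) // the entries at indexes 2 and 3 are kept
}
```

Returning early on the first missing or conflicting index keeps the "no conflict, no truncation" case as the default path.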
The Students' Guide to Raft describes a way to find the right nextIndex more efficiently:

> If a follower does not have prevLogIndex in its log, it should return with conflictIndex = len(log) and conflictTerm = None.
Once the conflicting index is found, look up the first log entry in the conflicting term:

    if args.PrevLogTerm != rf.log[args.PrevLogIndex].Term {
        // Reply false if log doesn’t contain an entry at prevLogIndex
        // whose term matches prevLogTerm (§5.3)
        reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
        // Report the first index whose entry has ConflictTerm.
        for i, e := range rf.log {
            if e.Term == reply.ConflictTerm {
                reply.ConflictIndex = i
                break
            }
        }
        DPrintf("%v conflict log %v\n", rf.me, lastLogIndex)
        rf.log = rf.log[:args.PrevLogIndex]
        return
    }

If a follower does have prevLogIndex in its log, but the term does
not match, it should return conflictTerm = log[prevLogIndex].Term, and
then search its log for the first index whose entry has term equal to
conflictTerm.
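
The two follower-side cases can be put side by side in one function. This is a hedged sketch: `Entry`, `conflictHint` and the `noTerm` sentinel (standing in for the Guide's "None") are hypothetical names, and the log is assumed to be a plain slice indexed from 0.

```go
package main

import "fmt"

// Entry is a hypothetical log entry; only the term matters for this sketch.
type Entry struct {
	Term int
}

// noTerm stands in for "conflictTerm = None" (an assumption of this sketch).
const noTerm = -1

// conflictHint returns the hint a follower would attach to a rejecting reply.
// ok is true when prevLogIndex/prevLogTerm match and there is no conflict.
func conflictHint(log []Entry, prevLogIndex, prevLogTerm int) (conflictIndex, conflictTerm int, ok bool) {
	if prevLogIndex >= len(log) {
		// The follower does not have prevLogIndex at all.
		return len(log), noTerm, false
	}
	if log[prevLogIndex].Term == prevLogTerm {
		return 0, noTerm, true
	}
	// The term does not match: report the first index holding that term.
	conflictTerm = log[prevLogIndex].Term
	for i, e := range log {
		if e.Term == conflictTerm {
			conflictIndex = i
			break
		}
	}
	return conflictIndex, conflictTerm, false
}

func main() {
	log := []Entry{{0}, {1}, {2}, {2}, {2}}
	fmt.Println(conflictHint(log, 4, 3)) // term mismatch at index 4
	fmt.Println(conflictHint(log, 7, 3)) // index 7 is missing
}
```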
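On receiving the reply, the leader sets nextIndex to the index just after its last log entry with conflictTerm (a bit roundabout), which makes backtracking more efficient: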

    conflictIndex := reply.ConflictIndex
    _, lastLogIndex := rf.lastLogTermIndex()
    for i := 0; i < lastLogIndex; i++ {
        // Find the index just after the last entry with the conflicting term.
        if rf.log[i].Term == reply.ConflictTerm {
            for i < lastLogIndex && rf.log[i].Term == reply.ConflictTerm {
                i++
            }
            conflictIndex = i
            break
        }
    }
    rf.nextIndex[server] = conflictIndex

Upon receiving a conflict response, the leader should first search
its log for conflictTerm. If it finds an entry in its log with that
term, it should set nextIndex to be the one beyond the index of the last
entry in that term in its log. If it does not find an entry with that
term, it should set nextIndex = conflictIndex.
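
The leader-side rule can also be written as a backward scan, which reaches the last entry of conflictTerm directly. This is a sketch under assumptions rather than the lab's code: `Entry` and `nextIndexAfterConflict` are hypothetical names, and a "None" conflictTerm is assumed to be encoded as a value that never appears in the log (here -1).

```go
package main

import "fmt"

// Entry is a hypothetical log entry; only the term matters for this sketch.
type Entry struct {
	Term int
}

// nextIndexAfterConflict returns the nextIndex a leader would use after a
// rejected AppendEntries that carried (conflictIndex, conflictTerm).
func nextIndexAfterConflict(log []Entry, conflictIndex, conflictTerm int) int {
	// Scan backward so the first hit is the last entry with conflictTerm.
	for i := len(log) - 1; i >= 0; i-- {
		if log[i].Term == conflictTerm {
			return i + 1 // one beyond the last entry in that term
		}
	}
	// The leader has no entry with that term: fall back to conflictIndex.
	return conflictIndex
}

func main() {
	leaderLog := []Entry{{0}, {1}, {1}, {3}, {3}}
	fmt.Println(nextIndexAfterConflict(leaderLog, 1, 1))  // term 1 ends at index 2
	fmt.Println(nextIndexAfterConflict(leaderLog, 2, 2))  // term 2 is absent
	fmt.Println(nextIndexAfterConflict(leaderLog, 5, -1)) // follower's log was too short
}
```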
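
    if args.PrevLogTerm != rf.log[args.PrevLogIndex].Term {
        // Reply false if log doesn’t contain an entry at prevLogIndex
        // whose term matches prevLogTerm (§5.3)
        reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
        // Report the first index whose entry has ConflictTerm.
        for i, e := range rf.log {
            if e.Term == reply.ConflictTerm {
                reply.ConflictIndex = i
                break
            }
        }
        DPrintf("%v conflict log %v\n", rf.me, lastLogIndex)
        rf.log = rf.log[:args.PrevLogIndex]
        return
    }

    i := 0
    // Find the first conflicting entry, delete it and everything after it,
    // then overwrite with the new entries.
    // lastLogIndex is assumed to be the index of the last entry in rf.log.
    for ; i < len(args.Entries); i++ {
        cur := args.PrevLogIndex + 1 + i
        e := args.Entries[i]
        if cur <= lastLogIndex && rf.log[cur].Term != e.Term {
            rf.log = rf.log[:cur]
            break
        }
        // Stop only once cur is past the end of the log; stopping at
        // cur == lastLogIndex would append a matching entry a second time.
        if cur > lastLogIndex {
            break
        }
    }

    // Append whatever the log does not already contain.
    for ; i < len(args.Entries); i++ {
        e := args.Entries[i]
        rf.log = append(rf.log, e)
    }
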

    func (rf *Raft) updateCommitIndex() {
        majority := len(rf.peers)/2 + 1
        n := len(rf.log)
        for i := n - 1; i > rf.commitIndex; i-- { // scan backward from the last log entry
            replicated := 0
            if rf.log[i].Term == rf.currentTerm { // only commit entries from the current term
                for server := range rf.peers {
                    if rf.matchIndex[server] >= i {
                        replicated++
                    }
                }
            }

            if replicated >= majority {
                DPrintf("commit %v\n", i)
                rf.commitIndex = i
                rf.apply()
                break
            }
        }
    }