KVRaft架构
KVRaft的架构如图
Client通过RPC向KVServer发送Get/Append/Put的RPC,KVServer通过Start调用Raft记录Log,Raft之间通过AppendEntries等RPC实现Log一致。最终通过ApplyChannel将Log提供给KVServer对数据进行操作。
一致性
在本次实验中有一个一致性问题,具体可以看这里:一个操作可以在操作请求到返回的任一时刻完成,完成后要保证这个操作的一致性:即之后的Get不能返回Stale Data。
KVRaft实现
其实梳理了流程和架构之后实现KVRaft就不是很难了,需要注意的是记录一下每个Client最后执行的操作序号避免执行一个操作多次。
核心就是每个client保持有一个seq,发送请求时seq增加,重复发送时保持原来的seq。Server保留每个client最后一次发送的seq,如果小雨等于则不响应。
Client
结构体
1 2 3 4 5 6 7
| type Clerk struct { servers []*labrpc.ClientEnd leaderId int clientId int64 seq int64 }
|
clientId是初始化时生成的随机ID,seq是发送请求的序号,leaderId记录上一次的ID。
RPC
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func (ck *Clerk) PutAppend(key string, value string, op string) { DPrintf("client %v %v %v\n", op, key, value) seq := atomic.AddInt64(&ck.seq, 1) leaderId := ck.leaderId for { args := PutAppendArgs{ Seq: seq, Key: key, Value: value, Op: op, ClientId: ck.clientId, } reply := PutAppendReply{} ok := ck.servers[leaderId].Call("KVServer.PutAppend", &args, &reply)
if !ok { leaderId = (leaderId + 1) % len(ck.servers) continue } switch reply.Err { case OK: ck.leaderId = leaderId return case ErrWrongLeader: leaderId = (leaderId + 1) % len(ck.servers) } } return }
|
如果发送请求不成功则更换一个leaderId,如果请求成功则用成功的leaderId更新当前记录的状态。
Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { op := Op{ MsgId: nrand(), Seq: args.Seq, Key: args.Key, Value: args.Value, Op: args.Op, ClientId: args.ClientId, } res := kv.waitCmdApply(op) reply.Err = res.Err return }
|
处理RPC的函数核心是等待集群返回的waitCmdApply
函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| func (kv *KVServer) waitCmdApply(op Op) (res NotifyMsg) { _, _, isLeader := kv.rf.Start(op) if !isLeader { res.Err = ErrWrongLeader return } DPrintf("wait cmd %v %v clientid %v seq %v\n", op.Op, op.Key, op.ClientId, op.Seq)
kv.mu.Lock() ch := make(chan NotifyMsg, 1) kv.notifyCh[op.MsgId] = ch kv.mu.Unlock()
t := time.NewTimer(WaitCmdTimeout) defer t.Stop() select { case res = <-ch: DPrintf("applied cmd %v %v clientid %v seq %v\n", op.Op, op.Key, op.ClientId, op.Seq) kv.mu.Lock() delete(kv.notifyCh, op.MsgId) kv.mu.Unlock() return case <-t.C: DPrintf("timeout wait cmd %v %v clientid %v seq %v\n", op.Op, op.Key, op.ClientId, op.Seq) res.Err = ErrApplyTimeout kv.mu.Lock() delete(kv.notifyCh, op.MsgId) kv.mu.Unlock() return } }
|
waitCmdApply
函数核心是等待Op成功的出现在applyCh,Op出现在applyCh后即代表log成功的复制到了整个集群中。其中使用了notifyCh
notifyCh
是从msgId(每个请求随机生成的ID)到chan的映射。如果对应的chan上出现了新的消息,则证明log成功复制。
为了更新notifyCh
,需要后台检测applyCh
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| func (kv *KVServer) applyDeamon() { for { select { case <-kv.stopCh: return
case msg := <-kv.applyCh: op := msg.Command.(Op)
kv.mu.Lock() isRepeated := op.Seq <= kv.lastSeq[op.ClientId] if isRepeated { DPrintf("duplicated %v %v\n", op.Op, op.Key) } switch op.Op { case "Put": if !isRepeated { kv.data[op.Key] = op.Value kv.lastSeq[op.ClientId] = op.Seq DPrintf("lastSeq %v to %v\n", op.ClientId, kv.lastSeq[op.ClientId]) } case "Append": if !isRepeated { v, _ := kv.data[op.Key] kv.data[op.Key] = v + op.Value kv.lastSeq[op.ClientId] = op.Seq DPrintf("lastSeq %v to %v\n", op.ClientId, kv.lastSeq[op.ClientId]) } case "Get": }
if ch, ok := kv.notifyCh[op.MsgId]; ok { v, _ := kv.data[op.Key] ch <- NotifyMsg{ Err: OK, Value: v, } } kv.mu.Unlock() } } }
|