0%

6.824 Lab3A: KVRaft without log compaction

KVRaft架构

KVRaft的架构如图

KVRaft

Client通过RPC向KVServer发送Get/Append/Put的RPC,KVServer通过Start调用Raft记录Log,Raft之间通过AppendEntries等RPC实现Log一致。最终通过ApplyChannel将Log提供给KVServer对数据进行操作。

一致性

在本次实验中有一个一致性问题,具体可以看这里:一个操作可以在操作请求到返回的任一时刻完成,完成后要保证这个操作的一致性:即之后的Get不能返回Stale Data。

KVRaft实现

其实梳理了流程和架构之后实现KVRaft就不是很难了,需要注意的是记录一下每个Client最后执行的操作序号避免执行一个操作多次。

核心就是每个client保持有一个seq,发送请求时seq增加,重复发送时保持原来的seq。Server保留每个client最后一次发送的seq,如果小雨等于则不响应。

Client

结构体

1
2
3
4
5
6
7
type Clerk struct {
servers []*labrpc.ClientEnd
// You will have to modify this struct.
leaderId int
clientId int64
seq int64
}

clientId是初始化时生成的随机ID,seq是发送请求的序号,leaderId记录上一次的ID。

RPC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (ck *Clerk) PutAppend(key string, value string, op string) {
// You will have to modify this function.
DPrintf("client %v %v %v\n", op, key, value)
seq := atomic.AddInt64(&ck.seq, 1)
leaderId := ck.leaderId
for {
args := PutAppendArgs{
Seq: seq,
Key: key,
Value: value,
Op: op,
ClientId: ck.clientId,
}
reply := PutAppendReply{}
ok := ck.servers[leaderId].Call("KVServer.PutAppend", &args, &reply)

if !ok {
leaderId = (leaderId + 1) % len(ck.servers)
continue
}
switch reply.Err {
case OK:
ck.leaderId = leaderId
return
case ErrWrongLeader:
leaderId = (leaderId + 1) % len(ck.servers)
}
}
return
}

如果发送请求不成功则更换一个leaderId,如果请求成功则用成功的leaderId更新当前记录的状态。

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
op := Op{
MsgId: nrand(),
Seq: args.Seq,
Key: args.Key,
Value: args.Value,
Op: args.Op,
ClientId: args.ClientId,
}
res := kv.waitCmdApply(op)
reply.Err = res.Err
return
}

处理RPC的函数核心是等待集群返回的waitCmdApply函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (kv *KVServer) waitCmdApply(op Op) (res NotifyMsg) {
_, _, isLeader := kv.rf.Start(op)
if !isLeader {
res.Err = ErrWrongLeader
return
}
DPrintf("wait cmd %v %v clientid %v seq %v\n", op.Op, op.Key, op.ClientId, op.Seq)

kv.mu.Lock()
ch := make(chan NotifyMsg, 1)
kv.notifyCh[op.MsgId] = ch
kv.mu.Unlock()

t := time.NewTimer(WaitCmdTimeout)
defer t.Stop()
select {
case res = <-ch:
DPrintf("applied cmd %v %v clientid %v seq %v\n", op.Op, op.Key, op.ClientId, op.Seq)
kv.mu.Lock()
delete(kv.notifyCh, op.MsgId)
kv.mu.Unlock()
return
case <-t.C:
DPrintf("timeout wait cmd %v %v clientid %v seq %v\n", op.Op, op.Key, op.ClientId, op.Seq)
res.Err = ErrApplyTimeout
kv.mu.Lock()
delete(kv.notifyCh, op.MsgId)
kv.mu.Unlock()
return
}
}

waitCmdApply函数核心是等待Op成功的出现在applyCh,Op出现在applyCh后即代表log成功的复制到了整个集群中。其中使用了notifyCh

notifyCh是从msgId(每个请求随机生成的ID)到chan的映射。如果对应的chan上出现了新的消息,则证明log成功复制。

为了更新notifyCh,需要后台检测applyCh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (kv *KVServer) applyDeamon() {
for {
select {
case <-kv.stopCh:
return

case msg := <-kv.applyCh:
// index := msg.CommandIndex
op := msg.Command.(Op)

kv.mu.Lock()
isRepeated := op.Seq <= kv.lastSeq[op.ClientId]
if isRepeated {
DPrintf("duplicated %v %v\n", op.Op, op.Key)
}
switch op.Op {
case "Put":
if !isRepeated {
kv.data[op.Key] = op.Value
kv.lastSeq[op.ClientId] = op.Seq
DPrintf("lastSeq %v to %v\n", op.ClientId, kv.lastSeq[op.ClientId])
}
case "Append":
if !isRepeated {
v, _ := kv.data[op.Key]
kv.data[op.Key] = v + op.Value
kv.lastSeq[op.ClientId] = op.Seq
DPrintf("lastSeq %v to %v\n", op.ClientId, kv.lastSeq[op.ClientId])
}
case "Get":
}

if ch, ok := kv.notifyCh[op.MsgId]; ok {
v, _ := kv.data[op.Key]
ch <- NotifyMsg{
Err: OK,
Value: v,
}
}
kv.mu.Unlock()
}
}
}