【MIT6.5840】Lab5-Sharded Key/Value Service

2025-03-16T22:48:11+08:00 | 14分钟阅读 | 更新于 2025-03-16T22:48:11+08:00

bestzy
【MIT6.5840】Lab5-Sharded Key/Value Service

本文将介绍MIT6.5840的Lab5的核心功能实现,完整代码见Github。

https://github.com/bestzy6/MIT6.5840

整个Lab5中,各部分的关系如下图所示。Shardctrler-Cluster存储了配置情况,ClientServer从中获取配置并完成一系列逻辑操作。

Lab5A

ShardCtrler代码实现

ShardCtrler几乎可以无脑复制Lab4的代码,Client与Server的主要流程是一致的,同样需要注意去重,RPC请求需要增加ClerkIDReqID,但无需考虑快照。此文只讲解核心实现,具体可参考代码。

Op结构体如下:

type Op struct {
    // Your data here.
    Type    string
    Servers map[int][]string //Join
    GIDs    []int            //Leave
    Shard   int              //Move
    GID     int              //Move
    Num     int              //Query
    ClerkId int64
    ReqId   int64
}

相较于Lab4,handleOperation方法中的数据库操作变更为opExecutor,代码如下:

func (sc *ShardCtrler) opExecutor(op Op) (Config, Err) {
    var (
       cfg Config
       err Err
    )
    switch op.Type {
    case Join:
       cfg, err = sc.joinExecutor(op)
       sc.configs = append(sc.configs, cfg)
       return cfg, err
    case Leave:
       cfg, err = sc.leaveExecutor(op)
       sc.configs = append(sc.configs, cfg)
       return cfg, err
    case Move:
       cfg, err = sc.moveExecutor(op)
       sc.configs = append(sc.configs, cfg)
       return cfg, err
    case Query:
       cfg, err = sc.queryExecutor(op)
       return cfg, err
    }
    return Config{}, OK
}

根据任务要求,在进行Join、Leave操作后需要负载均衡处理。以下解释4个操作逻辑。

Join&Leave

Join RPC 由管理员用于添加新的副本组。其参数是一组从唯一且非零的副本组标识符(GIDs)到服务器名称列表的映射。shardctrler 应通过创建包含新副本组的新配置来响应。新配置应尽可能均匀地在所有组之间分配分片,并且应尽可能少地移动分片以实现该目标。如果 GID 不是当前配置的一部分,shardctrler 应允许重新使用 GID(即,应允许 GID 先加入,然后离开,再重新加入)。

Leave RPC 的参数是之前加入的组的 GID 列表。shardctrler 应创建一个不包含这些组的新配置,并将这些组的分片分配给剩余的组。新配置应尽可能均匀地在组之间分配分片,并应尽可能少地移动分片以实现该目标。

选取最后的配置并加入或移除Group,操作后的负载均衡后续进行详解。

需要注意的是,新配置的map必须深拷贝,否则当前的变动会影响到之前的配置。以下给出Join的代码,Leave与Join相似

func (sc *ShardCtrler) joinExecutor(op Op) (Config, Err) {
    var (
       servers = op.Servers //GID -> 服务名称
       cfgNum  = len(sc.configs)
       lastCfg = sc.configs[cfgNum-1]
    )
    // 深拷贝,创建新的Groups
    DPrintf("[Server%d] %v Join, Origin Shards:%v", sc.me, servers, lastCfg.Shards)
    newGroups := make(map[int][]string, len(lastCfg.Groups))
    for gid, srvs := range lastCfg.Groups {
       var (
          newServices = make([]string, 0, len(srvs))         // 服务名称
          ServicesMap = make(map[string]struct{}, len(srvs)) // 服务名称Set,用于去重
       )
       for _, srv := range srvs {
          newServices = append(newServices, srv)
          ServicesMap[srv] = struct{}{}
       }
       if tmpServices, ok := servers[gid]; ok {
          for _, tmpService := range tmpServices {
             // 判断是否重复
             if _, exist := ServicesMap[tmpService]; exist {
                continue
             }
             newServices = append(newServices, tmpService)
          }
          // 删除GID,表示已经添加
          delete(servers, gid)
       }
       newGroups[gid] = newServices
    }
    // 如果servers中的GID在原组中不存在,则servers长度不为0
    for gid, srvs := range servers {
       newGroups[gid] = srvs
    }
    DPrintf("[Server%d] %v Join newGroups: %v", sc.me, servers, newGroups)

    shards := NewBalancer(sc.me).ReBalanceShards(lastCfg.Shards, newGroups)
    DPrintf("[Server%d] %v Join balanced Shards: %v", sc.me, servers, shards)
    return Config{
       Num:    cfgNum,
       Shards: shards,
       Groups: newGroups,
    }, OK
}

Move

Move RPC 的参数是一个分片编号和一个 GID。shardctrler 应创建一个新配置,将分片分配给该组。 Move 的目的是让我们能够测试您的软件。在 Move 之后的 Join 或 Leave 可能会撤销 Move ,因为 Join 和 Leave 会重新平衡。

并没有需要特别注意的点,与Join&Leave一样深拷贝Groups字段即可。

func (sc *ShardCtrler) moveExecutor(op Op) (Config, Err) {
    var (
       shard  = op.Shard
       gid    = op.GID
       cfgNum = len(sc.configs)
       cfg    = sc.configs[cfgNum-1]
    )

    // 深拷贝
    newGroups := make(map[int][]string, len(cfg.Groups))
    for id, srvs := range cfg.Groups {
       newSrvs := make([]string, len(srvs))
       copy(newSrvs, srvs)
       newGroups[id] = newSrvs
    }

    var newShards [NShards]int
    for i := range cfg.Shards {
       if i == shard {
          newShards[i] = gid
          continue
       }
       newShards[i] = cfg.Shards[i]
    }

    newConfig := Config{
       Num:    cfgNum,
       Shards: newShards,
       Groups: newGroups,
    }
    return newConfig, OK
}

Query

Query RPC 的参数是一个配置编号。shardctrler 会回复具有该编号的配置。如果编号为 -1 或大于已知的最大配置编号,shardctrler 应回复最新的配置。 Query(-1) 的结果应反映 shardctrler 在接收 Query(-1) RPC 之前完成处理的每个 Join 、 Leave 或 Move RPC。

用于查询指定编号的配置。根据任务的要求,如果序号传入-1或者大于配置号,则返回最新的配置。

func (sc *ShardCtrler) queryExecutor(op Op) (Config, Err) {
    cfg := sc.configs[len(sc.configs)-1]
    num := op.Num
    if num < 0 || num >= len(sc.configs) {
       return cfg, OK
    }
    return sc.configs[num], OK
}

ShardCtrler负载均衡

Lab5A中的难点是实现负载均衡。假设存在4个Group,以及10个Shard,将Group按照Shard数量倒序排序,此时负载状态如下图所示:

显然Group的平均Shard数为2(10/4=2)。如果我们将Group中大于Avg的Shard截取,再将截取的Shard分配给Shard数量小于等于Avg的Group中,那么以上负载状态将会多出2个Shard,如下图所示:

我们的目标是使Shard尽可能不移动,因此最佳的负载状态是S3和S4保持原样,而S7和S8移动至G3、G4。此时负载状态为:

要使Group达到如上负载均衡,一种朴素的方法是求出Shard平均值,以及Shard取模,基于模长,动态调整Group的最大Shard值,保证各个Group所分配的最大与最小Shard数量差值为1。具体做法如下:

  1. 求出Avg,上述情况为 10 / 4 = 2
  2. 求出Mod,上述情况为 10 % 4 = 2
  3. 将Group序列中前Mod个Group的保留长度设置为Avg + 1,其余Group的保留长度为Avg

Balancer

为保证单一原则,我们将负载均衡逻辑抽离。Balancer结构体(类)是负载均衡处理器。

type Balancer struct {
    me int
}

func NewBalancer(me int) *Balancer {
    return &Balancer{
       me: me,
    }
}

Balancer实现了ReBalanceShards方法,该方法要求传入最后一个配置的Shard数组oldShards,以及最新的Groups。逻辑步骤如下:

  1. 创建Group-Shard链表结构grpShardMap,表示Group负责的Shard序号,其中Key为GID,而Value为Shard切片,注意切片按照Shard序号顺序排序
  2. 将Group按照Shard数量倒序排序,Shard数量一致时按GID顺序排序,以此保证排序结果的稳定性
  3. 筛选出需要休眠的Group,即排序序号大于等于NShards数量的Group。因为这些Group无法分到Shard。
  4. 将“超出Group容量的Shard”、“休眠Group的所有Shard”放置在freeShards中,freeShards属于自定义Set类型,具体参考代码。
  5. freeShards排序,按序列放到未超容量的Group中。
  6. 根据grpShardMap重新生成Shards数组。
func (b *Balancer) ReBalanceShards(oldShards [NShards]int, groups map[int][]string) [NShards]int {
    if len(groups) == 0 {
       return [NShards]int{}
    }
    // 初始情况下,不存在GID的Shard应该归属于0
    // oldShards可能被分配至已经不存在的Group
    grpShardMap, freeShards := b.groupShardMap(oldShards, groups)
    sortedGid := b.sortGroupByShard(grpShardMap)
    sleepGid := b.getSleepGid(sortedGid)
    DPrintf("[Server%d] ReBalanceShards sortedGid:%v ; sleepGid:%v ; freeShards: %v", b.me, sortedGid, sleepGid, freeShards)

    // 计算Shard平均数量,平均值最小为1
    avgShardNum := NShards / (len(sortedGid) - sleepGid.Len())
    modShardNum := NShards % (len(sortedGid) - sleepGid.Len())

    // 将超过平均值的Shard放置在freeShards中
    for i, gid := range sortedGid {
       shards := grpShardMap[gid]
       // 如果GID在sleepGid中,则将Shard放置在freeShards中
       if sleepGid.Contain(gid) {
          freeShards.Add(shards...)
          continue
       }

       // 计算当前GID应该拥有的Shard数量
       var pos int
       if i < modShardNum {
          pos = avgShardNum + 1
       } else {
          pos = avgShardNum
       }

       // 如果Shard数量大于pos值,则将多余的Shard放置在freeShards中
       if len(shards) > pos {
          freeShards.Add(shards[pos:]...)
          grpShardMap[gid] = shards[:pos]
       }
    }
    // 保证每次分配Shard时取出的Shard是有序的,以此确保副本的一致性
    unAsgnShards := freeShards.SortedSlice()

    // 重新分配Shards
    for i, gid := range sortedGid {
       if sleepGid.Contain(gid) {
          continue
       }
       shards := grpShardMap[gid]

       var pos int
       if i < modShardNum {
          pos = avgShardNum + 1
       } else {
          pos = avgShardNum
       }

       // 如果Shard数量小于平均值,则从unAsgnShards中取出Shard进行分配
       for len(shards) < pos {
          if len(unAsgnShards) == 0 {
             break
          }
          shards = append(shards, unAsgnShards[0])
          unAsgnShards = unAsgnShards[1:]
       }
       grpShardMap[gid] = shards

    }
    DPrintf("[Server%d] ReBalanceShards grpShardMap: %v", b.me, grpShardMap)
    shards := b.genNShards(grpShardMap)
    return shards
}

Server核心代码实现

代码整体上与Lab4相同,具体的可参考代码仓库

更新配置

Server的Op结构体如下,我们需要添加Config字段,以便将配置传递给Follower。相同的,我们需要增加*OpTypeConfig*类型来表示当前Op结构体是配置类型

type Op struct {
    // Your definitions here.
    // Field names must start with capital letters,
    // otherwise RPC will break.
    Type    string
    Key     string // OpTypeGet, OpTypePut, OpTypeAppend
    Value   string // OpTypeGet, OpTypePut, OpTypeAppend
    ClerkID int64
    ReqId   int64
    Config  shardctrler.Config // OpTypeConfig
}

handleOperation方法中,也要增加配置类型的处理逻辑,如下:

switch op.Type {
case OpTypeGet:
    val, err = kv.db.Get(op.Key)
case OpTypePut:
    val, err = kv.db.Put(op.Key, op.Value)
case OpTypeAppend:
    val, err = kv.db.Append(op.Key, op.Value)
case OpTypeConfig:
    err = kv.UpdateConfig(op.Config)
}

updateConfig方法的实现如下:

func (kv *ShardKV) UpdateConfig(newCfg shardctrler.Config) Err {
    if kv.config.Num >= newCfg.Num {
       DPrintf("[Server%d-%d] Config Num[%d] >= New Config Num[%d], Do not need to Update", kv.gid, kv.me, kv.config.Num, newCfg.Num)
       return OK
    }
    if newCfg.Num-kv.config.Num != 1 {
       return ErrWrongConfigNum
    }
    if newCfg.Num == 1 {
       DPrintf("[Server%d-%d] Config is NIL, Apply Config, Update :[%v]", kv.gid, kv.me, newCfg)
       kv.config = newCfg
       return OK
    }
    kv.config = newCfg
    DPrintf("[Server%d-%d] Update Config Success:[%v]", kv.gid, kv.me, newCfg)
    return OK
}

监听配置

对于无需每次都查询ShardCtrler,在Lab5B中,任务期望我们以100ms间隔轮询。因此在实现上,我们开启一个协程,每隔100ms获取一次配置,如需要更新配置,则创建Op结构体并发送至Raft。

特别需要注意锁的获取和释放。

func (kv *ShardKV) PollConfig() {
    for !kv.killed() {
       time.Sleep(100 * time.Millisecond)
       if kv.isLeader() {
          update := false
          kv.mu.Lock()
          nextCfg := kv.config.Num + 1
          kv.mu.Unlock()
          cfg := kv.mck.Query(nextCfg)
          kv.mu.Lock()
          if cfg.Num > kv.config.Num {
             update = true
          }
          kv.mu.Unlock()

          if update {
             _, err := kv.opHandler(Op{
                Type:    OpTypeConfig,
                ClerkID: kv.ClerkId,
                ReqId:   kv.nextId(),
                Config:  cfg,
             })
             if err != OK {
                DPrintf("[Server%d-%d] Push Config Op Failed, Err: %v", kv.gid, kv.me, err)
             }
          }
       }
    }
}

易错点

总结一下易错点,如下:

  1. 排序算法的稳定性
    1. 要保证各副本Shard数组的一致性,使用排序时必须保证结果的稳定性。
  2. map迭代顺序的无序性
    1. map本身是无序的,若不考虑这点,可能会导致副本Shard数组不一致。
  3. Group数量与NShard的关系
    1. 若Group数量大于Nshard,那么必然会存在部分Group无法分配到Shard。
  4. 更新Config需要由Leader通过Raft同步至各Follower
    1. 此举才能保证线性一致性。
  5. 必须按配置编号顺序地更新
    1. 不然会出现配置不一致的bug,所以每次向ShardCtrler拉取的都是当前cfg编号+1的配置。

Lab5B

这一部分的主要任务是在控制器更改分片时,在副本组之间移动分片,并以提供线性化 k/v 客户端操作的方式进行。Lab5B 附有两个 challenges,分别要求我们实现

  1. 失效数据的清除(Challenge 1)
  2. 异步变更配置的 shardkv(Challenge 2)。

我们的目标是实现两个Challenge,因此在设计之初就需要考虑。

配置更新策略

每个Group的配置必须按照严格的版本号顺序处理(C1→C2→C3),这确保了系统状态的一致性和可预测性。

而对于Group内部的更新策略,我最初的考虑是这样的:当需要更新的配置与当前配置的差异不与之前的差异冲突时,就允许更新,即“非冲突diff可接受”的原则。这种做法的优点包括:

  1. 提高了并发更新的可能性;
  2. 对于互不影响的配置项变更可以独立进行

但缺点也很明显,实现该策略的复杂性无比巨大,需要考虑多个Group之间的配置同步,而且可能引入难以调试的竞态条件。

于是我转换了策略,即Group必须完全处理完当前配置变更后,才进入下一配置。这种设计也反映了分布式系统设计中的重要原则:有时候更简单的方法虽然看起来效率较低,但实际上能够显著降低出错风险。

Challenge要求已经迁移的分片能立即投入服务,无需等待所有分片迁移完毕。为了满足Challenge的要求,我们需要设置一个Shard状态数组来标明状态,包括ServingSendingReceiving

type ShardStatus uint8

const (
    ShardStatusServing   ShardStatus = iota // 正在提供服务
    ShardStatusSending               = iota // 正在发送
    ShardStatusReceiving             = iota // 正在接收
)

分片迁移策略

假设根据新的配置,某Shard需要从Server1迁移至Server2,那么我们应该如何进行迁移?

网上大多数文章都采用Pull模式,即让Server2主动向Server1发起请求,以获取分片数据。但此方法不能很好的让Server1判断Shard是否迁移完毕,因为Server2主动请求,Server1成功返回数据后,Server1无法确定Server2是否收到数据,因此不能更改自身的状态。那么,Server1就需要主动发送RPC请求至Server2,来判断Server2是否获取数据。

因此,我采用的策略是Push方法,使用Push方法会比较方便的同步Server的Shard状态,发送一次请求即可修改Server1、Server2的Shard状态,时序图如下。

至于两种方法的难易程度,我个人认为两者的难易程度相当。

时序图的场景是正常情况,但我们不应该假设网络都是正常的,因此上述2、4步骤很有可能发生错误。

  1. 步骤2发生错误时,Server1进行重传
  2. 步骤4发生错误时,Server1进行重传,但Server2需要额外的判断来保证不会出错。
    1. 当Server1 RPC的ConfigNum 小于 Server2的ConfigNum时,Server返回OK,因为默认数据迁移完毕后才能更新ConfigNum;
    2. 当Server1 RPC的ConfigNum 等于 Server2的ConfigNum时,检查Shard状态,如果为Serving,则返回OK。
    3. 当Server1 RPC的ConfigNum 大于 Server2的ConfigNum时,返回Err。

迁移分片后的去重处理

我在实现时,遇到了移动分片后无法去重的问题,如下图所示:

  1. Client发起Append "AB"请求,此时请求由G1处理;
  2. G1处理完成,但超时。Client将会重新发起请求;
  3. Client还未发起请求,发生分片迁移,“AB"迁移至G2处理;
  4. Client发起Append请求,此时由G2处理,G2重复处理后返回成功信息给Client。

对Client而言,只成功提交了一次,但是对于Server而言成功Append了两次,使得结果出错。

处理方法

在迁移数据时,将已经应用的日志信息AppliedLog也进行迁移,新的Group收到AppliedLog后合并至自身AppliedLog。

分片迁移数据丢失问题

发生分片迁移之后,似乎原先的数据丢失了?分片迁移时似乎没有把最新的数据迁移过来。

img

问题:G1收到了新的Config,正在发生迁移Shard1至G2,此时Shard1的状态为“sending”,当Shard1由G1发送至G2之后,G1将Shard1的状态改为Serving,当修改Shard1的请求到来时,检查到Shard1的状态为Serving,则修改Shard1了,导致错误!

引起问题的原因如下:

  1. G1几乎同时更新ConfigAppend Shard1消息,ConfigAppend Shard1都进入Raft等待提交;
  2. G1收到更新Config命令,将Shard1迁移至G2,此时Shard1的状态修改为Sending;
  3. G1将Shard1迁移完毕,Shard1状态修改为Serving;
  4. G1收到Raft提交的Append Shard1命令,执行成功后返回至客户端,产生线性不一致问题

在步骤4中,G1收到Raft提交的Append Shard1命令后,不应该执行命令,应该返回错误。

处理方法

handleOperation中,除了处理重复值外,还需要判断Shard的分配状态。

核心代码实现

分片迁移

分片迁移的引入会改变handleOperation方法的逻辑,可以参考仓库代码

迁移的RPC的请求响应结构体如下,在请求中,除去必要的Shard、ConfigNum和KVs以外,ClerkId和ReqId用于去重,Applied用于迁移后的去重。

type MigrateShardArgs struct {
    Shard     int
    ConfigNum int
    KVs       []KV
    Applied   AppliedLog

    ClerkId int64
    ReqId   int64
}

type MigrateShardReply struct {
    ConfigNum int
    Err       Err
}

以下是MigrateShardRPC方法实现。

func (kv *ShardKV) MigrateShard(args *MigrateShardArgs, reply *MigrateShardReply) {
    if !kv.isLeader() {
       DPrintf("[Server%d-%d] MigrateShard Shard:[%d] not Leader", kv.gid, kv.me, args.Shard)
       reply.Err = ErrWrongLeader
       return
    }

    kv.mu.Lock()
    reply.ConfigNum = kv.config.Num

    if kv.config.Num < args.ConfigNum {
       DPrintf("[Server%d-%d] MigrateShard Shard:[%d] Current ConfigNum:[%d] < ConfigNum:[%d]",
          kv.gid, kv.me, args.Shard, kv.config.Num, args.ConfigNum)
       reply.Err = ErrWrongConfigNum
       kv.mu.Unlock()
       return
    }
    if kv.config.Num > args.ConfigNum {
       DPrintf("[Server%d-%d] MigrateShard Shard:[%d] Current ConfigNum:[%d] > ConfigNum:[%d]",
          kv.gid, kv.me, args.Shard, kv.config.Num, args.ConfigNum)
       reply.Err = OK
       kv.mu.Unlock()
       return
    }
    if kv.ShardsStatus[args.Shard] == ShardStatusServing {
       DPrintf("[Server%d-%d] MigrateShard Shard:[%d] Already Serving", kv.gid, kv.me, args.Shard)
       reply.Err = OK
       kv.mu.Unlock()
       return
    }
    kv.mu.Unlock()

    op := Op{
       Type:       OpTypeAddShard,
       ClerkID:    args.ClerkId,
       ReqId:      args.ReqId,
       Shard:      args.Shard,
       ShardData:  args.KVs,
       ConfigNum:  args.ConfigNum,
       AppliedLog: args.Applied,
    }
    _, err := kv.opHandler(op)
    if err == OK {
       DPrintf("[Server%d-%d] MigrateShard Shard:[%d] Success", kv.gid, kv.me, args.Shard)
    }
    reply.Err = err
}

SendShard有感知到自身Shard发生变化的Leader主动调用,将Shard数据发送至新的Server。只有当RPC响应成功后,才会将数据库中的Shard数据删除。

func (kv *ShardKV) SendShard(shard, configNum int, applied AppliedLog, reqId int64) {
    if !kv.isLeader() {
       DPrintf("[Server%d-%d] SendShard %d not Leader", kv.gid, kv.me, shard)
       return
    }

    var reply MigrateShardReply
    args := MigrateShardArgs{
       Shard:     shard,
       ConfigNum: configNum,
       Applied:   applied,
       KVs:       nil,
       ClerkId:   kv.ClerkId,
       ReqId:     reqId,
    }

    kv.mu.Lock()
    args.KVs = kv.db.GetByShard(shard)
    gid := kv.config.Shards[shard]
    servers, ok := kv.config.Groups[gid]
    if !ok {
       kv.mu.Unlock()
       return
    }
    kv.mu.Unlock()

    for i := range servers {
       srv := kv.make_end(servers[i])
       DPrintf("[Server%d-%d] SendShard %d To %s", kv.gid, kv.me, shard, servers[i])
       ok = srv.Call("ShardKV.MigrateShard", &args, &reply)
       if !ok {
          DPrintf("[Server%d-%d] SendShard %d RPC failed", kv.gid, kv.me, shard)
          continue
       }
       switch reply.Err {
       case OK:
          DPrintf("[Server%d-%d] SendShard %d To %s Success", kv.gid, kv.me, shard, servers[i])
          op := Op{
             Type:      OpTypeRemoveShard,
             ClerkID:   args.ClerkId,
             ReqId:     args.ReqId,
             Shard:     shard,
             ConfigNum: configNum,
          }
          _, err := kv.opHandler(op)
          if err != OK {
             DPrintf("[Server%d-%d] handle Op failed, Err: %s", kv.gid, kv.me, err)
          } else {
             DPrintf("[Server%d-%d] SendShard %d Success", kv.gid, kv.me, shard)
          }
       case ErrWrongLeader:
          continue
       default:
          DPrintf("[Server%d-%d] SendShard %d failed,Err:%s", kv.gid, kv.me, shard, reply.Err)
       }
    }
}

分片监听

同理,我们启动一个协程,每50ms监听一次Shard状态,如果存在需要Sending的Shard,则主动发送RPC请求。

func (kv *ShardKV) ShardWatcher() {
    for !kv.killed() {
       time.Sleep(50 * time.Millisecond)
       if !kv.isLeader() {
          continue
       }

       kv.mu.Lock()
       if kv.config.Num == 0 {
          kv.mu.Unlock()
          continue
       }

       for shard, status := range kv.ShardsStatus {
          if status == ShardStatusSending {
             DPrintf("[Server%d-%d] ShardWatcher is Preparing to Send Shard", kv.gid, kv.me)
             kv.NextReqId++
             go kv.SendShard(shard, kv.config.Num, kv.applied.Copy(), kv.NextReqId)
          }
       }
       kv.mu.Unlock()
    }
}

易错点

  1. 只有所有Shards不存在处于Send的状态时,可以更新Config;
  2. 迁移分片后要保证幂等性;
  3. 旧配置的Server不允许处理新配置的请求。

© 2025 Bestzy's Blog

🌱 Powered by Hugo with theme Dream.

About Me

👋 Hi, This is Zheng Yi.