in store/cluster.go [198:232]
// MigrateSlot moves slot from the shard that currently owns it to the shard at
// targetShardIdx.
//
// When slotOnly is true, only the SlotRanges of the two shards are rewritten
// (via RemoveSlotFromSlotRanges and AddSlotToSlotRanges) and the method returns
// without contacting any node and without checking for in-flight migrations.
//
// Otherwise the source shard's master node is asked to migrate the slot to the
// target shard's master node. On success the source shard's MigratingSlot and
// TargetShardIndex are set; SlotRanges are not changed on this path.
//
// It returns:
//   - consts.ErrIndexOutOfRange if targetShardIdx is not a valid index into
//     cluster.Shards;
//   - the error from findShardIndexBySlot if the owning shard cannot be resolved;
//   - consts.ErrShardIsSame if the slot already belongs to the target shard;
//   - consts.ErrShardSlotIsMigrating if either shard reports IsMigrating;
//   - consts.ErrNotFound if the source or the target shard has no master node;
//   - the error from the source master's MigrateSlot call, unchanged.
func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetShardIdx int, slotOnly bool) error {
	if targetShardIdx < 0 || targetShardIdx >= len(cluster.Shards) {
		return consts.ErrIndexOutOfRange
	}
	sourceShardIdx, err := cluster.findShardIndexBySlot(slot)
	if err != nil {
		return err
	}
	if sourceShardIdx == targetShardIdx {
		return consts.ErrShardIsSame
	}
	if slotOnly {
		// Metadata-only move: reassign the slot between the two shards' ranges.
		cluster.Shards[sourceShardIdx].SlotRanges = RemoveSlotFromSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, slot)
		cluster.Shards[targetShardIdx].SlotRanges = AddSlotToSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, slot)
		return nil
	}
	if cluster.Shards[sourceShardIdx].IsMigrating() || cluster.Shards[targetShardIdx].IsMigrating() {
		return consts.ErrShardSlotIsMigrating
	}

	// Resolve both masters before sending anything, so a shard without a
	// master yields an error instead of a nil dereference.
	sourceMasterNode := cluster.Shards[sourceShardIdx].GetMasterNode()
	if sourceMasterNode == nil {
		return consts.ErrNotFound
	}
	targetMasterNode := cluster.Shards[targetShardIdx].GetMasterNode()
	if targetMasterNode == nil {
		return consts.ErrNotFound
	}

	// Send the migration command to the source node.
	if err := sourceMasterNode.MigrateSlot(ctx, slot, targetMasterNode.ID()); err != nil {
		return err
	}

	// The data migration proceeds in the background; record the in-flight slot
	// and its destination on the source shard. slot is a by-value parameter,
	// so &slot refers to this call's own copy.
	cluster.Shards[sourceShardIdx].MigratingSlot = &slot
	cluster.Shards[sourceShardIdx].TargetShardIndex = targetShardIdx
	return nil
}