in controller/cluster.go [312:380]
// tryUpdateMigrationStatus walks the shards of clonedCluster and, for each
// shard that reports IsMigrating, compares the migration recorded on the shard
// with the cluster info returned by the shard's master node, then acts on the
// migrating state that node reports:
//
//   - "none", "start": nothing to reconcile yet; the next shard is examined.
//   - "fail": the shard's migrate state is cleared, the cluster is written
//     with SetCluster, the checker's cluster is replaced through
//     updateCluster, and a warning is logged.
//   - "success": the migrating slot is removed from the source shard's slot
//     ranges and added to the target shard's, the migrate state is cleared,
//     the cluster is written with UpdateCluster, and updateCluster is called.
//   - any other value: handled like "fail", but logged as an unknown state at
//     error level.
//
// Nothing is returned; every problem is only logged. The whole pass stops (the
// remaining shards are not examined) when the cluster info cannot be fetched,
// when the source node reports no migrating slot or a different one, when the
// target shard index is out of range, or when SetCluster fails.
func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedCluster *store.Cluster) {
	lg := logger.Get().With(
		zap.String("namespace", c.namespace),
		zap.String("cluster", c.clusterName),
	)

	for idx, shard := range clonedCluster.Shards {
		if !shard.IsMigrating() {
			continue
		}

		srcInfo, err := shard.GetMasterNode().GetClusterInfo(ctx)
		if err != nil {
			lg.Error("Failed to get the cluster info from the source node", zap.Error(err))
			return
		}

		// Sanity checks, evaluated top to bottom; the first one that fails
		// aborts the pass.
		switch {
		case srcInfo.MigratingSlot == nil:
			lg.Error("The source migration slot is empty",
				zap.String("migrating_slot", shard.MigratingSlot.String()),
			)
			return
		case !srcInfo.MigratingSlot.Equal(shard.MigratingSlot):
			lg.Error("Mismatch migrating slot",
				zap.String("source_migrating_slot", srcInfo.MigratingSlot.String()),
				zap.String("migrating_slot", shard.MigratingSlot.String()),
			)
			return
		case shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards):
			lg.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex))
			return
		}

		switch state := srcInfo.MigratingState; state {
		case "none", "start":
			// Applies to the enclosing for loop: go on to the next shard.
			continue

		case "fail":
			// Remember the slot for the log line before the migrate state is cleared.
			slot := shard.MigratingSlot
			clonedCluster.Shards[idx].ClearMigrateState()
			if err := c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster); err != nil {
				lg.Error("Failed to update the cluster", zap.Error(err))
				return
			}
			c.updateCluster(clonedCluster)
			lg.Warn("Failed to migrate the slot", zap.String("slot", slot.String()))

		case "success":
			// Remember the slot before the migrate state is cleared; it is
			// needed both for moving the slot and for the log line.
			slot := shard.MigratingSlot
			targetIdx := shard.TargetShardIndex

			clonedCluster.Shards[idx].SlotRanges = store.RemoveSlotFromSlotRanges(
				clonedCluster.Shards[idx].SlotRanges, *slot,
			)
			clonedCluster.Shards[targetIdx].SlotRanges = store.AddSlotToSlotRanges(
				clonedCluster.Shards[targetIdx].SlotRanges, *slot,
			)
			clonedCluster.Shards[idx].ClearMigrateState()

			// NOTE(review): unlike the other branches, this one persists with
			// UpdateCluster rather than SetCluster, and a persistence failure
			// neither returns nor skips updateCluster below. Confirm that this
			// asymmetry is intentional.
			if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err == nil {
				lg.Info("Migrate the slot successfully", zap.String("slot", slot.String()))
			} else {
				lg.Error("Failed to update the cluster", zap.Error(err))
			}
			c.updateCluster(clonedCluster)

		default:
			clonedCluster.Shards[idx].ClearMigrateState()
			if err := c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster); err != nil {
				lg.Error("Failed to update the cluster", zap.Error(err))
				return
			}
			c.updateCluster(clonedCluster)
			lg.Error("Unknown migrating state", zap.String("state", state))
		}
	}
}