func()

in controller/cluster.go [312:380]


func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedCluster *store.Cluster) {
	log := logger.Get().With(
		zap.String("namespace", c.namespace),
		zap.String("cluster", c.clusterName))

	for i, shard := range clonedCluster.Shards {
		if !shard.IsMigrating() {
			continue
		}

		sourceNodeClusterInfo, err := shard.GetMasterNode().GetClusterInfo(ctx)
		if err != nil {
			log.Error("Failed to get the cluster info from the source node", zap.Error(err))
			return
		}
		if sourceNodeClusterInfo.MigratingSlot == nil {
			log.Error("The source migration slot is empty",
				zap.String("migrating_slot", shard.MigratingSlot.String()),
			)
			return
		}
		if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot) {
			log.Error("Mismatch migrating slot",
				zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()),
				zap.String("migrating_slot", shard.MigratingSlot.String()),
			)
			return
		}
		if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) {
			log.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex))
			return
		}

		switch sourceNodeClusterInfo.MigratingState {
		case "none", "start":
			continue
		case "fail":
			migratingSlot := shard.MigratingSlot
			clonedCluster.Shards[i].ClearMigrateState()
			if err := c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster); err != nil {
				log.Error("Failed to update the cluster", zap.Error(err))
				return
			}
			c.updateCluster(clonedCluster)
			log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String()))
		case "success":
			clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, *shard.MigratingSlot)
			clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
				clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, *shard.MigratingSlot,
			)
			migratedSlot := shard.MigratingSlot
			clonedCluster.Shards[i].ClearMigrateState()
			if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
				log.Error("Failed to update the cluster", zap.Error(err))
			} else {
				log.Info("Migrate the slot successfully", zap.String("slot", migratedSlot.String()))
			}
			c.updateCluster(clonedCluster)
		default:
			clonedCluster.Shards[i].ClearMigrateState()
			if err := c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster); err != nil {
				log.Error("Failed to update the cluster", zap.Error(err))
				return
			}
			c.updateCluster(clonedCluster)
			log.Error("Unknown migrating state", zap.String("state", sourceNodeClusterInfo.MigratingState))
		}
	}
}