func()

in pkg/controllers/updaterun/initialization.go [276:392]


func (r *Reconciler) computeRunStageStatus(
	ctx context.Context,
	scheduledBindings []*placementv1beta1.ClusterResourceBinding,
	updateRun *placementv1beta1.ClusterStagedUpdateRun,
) error {
	updateRunRef := klog.KObj(updateRun)
	updateStrategyName := updateRun.Spec.StagedUpdateStrategyName

	// Map to track clusters and ensure they appear in one and only one stage.
	allSelectedClusters := make(map[string]struct{}, len(scheduledBindings))
	allPlacedClusters := make(map[string]struct{})
	for _, binding := range scheduledBindings {
		allSelectedClusters[binding.Spec.TargetCluster] = struct{}{}
	}
	stagesStatus := make([]placementv1beta1.StageUpdatingStatus, 0, len(updateRun.Status.StagedUpdateStrategySnapshot.Stages))

	// Apply the label selectors from the ClusterStagedUpdateStrategy to filter the clusters.
	for _, stage := range updateRun.Status.StagedUpdateStrategySnapshot.Stages {
		if err := validateAfterStageTask(stage.AfterStageTasks); err != nil {
			klog.ErrorS(err, "Failed to validate the after stage tasks", "clusterStagedUpdateStrategy", updateStrategyName, "stage name", stage.Name, "clusterStagedUpdateRun", updateRunRef)
			// no more retries here.
			invalidAfterStageErr := controller.NewUserError(fmt.Errorf("the after stage tasks are invalid, clusterStagedUpdateStrategy: %s, stage: %s, err: %s", updateStrategyName, stage.Name, err.Error()))
			return fmt.Errorf("%w: %s", errInitializedFailed, invalidAfterStageErr.Error())
		}

		curStageUpdatingStatus := placementv1beta1.StageUpdatingStatus{StageName: stage.Name}
		var curStageClusters []clusterv1beta1.MemberCluster
		labelSelector, err := metav1.LabelSelectorAsSelector(stage.LabelSelector)
		if err != nil {
			klog.ErrorS(err, "Failed to convert label selector", "clusterStagedUpdateStrategy", updateStrategyName, "stage name", stage.Name, "labelSelector", stage.LabelSelector, "clusterStagedUpdateRun", updateRunRef)
			// no more retries here.
			invalidLabelErr := controller.NewUserError(fmt.Errorf("the stage label selector is invalid, clusterStagedUpdateStrategy: %s, stage: %s, err: %s", updateStrategyName, stage.Name, err.Error()))
			return fmt.Errorf("%w: %s", errInitializedFailed, invalidLabelErr.Error())
		}
		// List all the clusters that match the label selector.
		var clusterList clusterv1beta1.MemberClusterList
		if err := r.Client.List(ctx, &clusterList, &client.ListOptions{LabelSelector: labelSelector}); err != nil {
			klog.ErrorS(err, "Failed to list clusters for the stage", "clusterStagedUpdateStrategy", updateStrategyName, "stage name", stage.Name, "labelSelector", stage.LabelSelector, "clusterStagedUpdateRun", updateRunRef)
			// list err can be retried.
			return controller.NewAPIServerError(true, err)
		}

		// Intersect the selected clusters with the clusters in the stage.
		for _, cluster := range clusterList.Items {
			if _, ok := allSelectedClusters[cluster.Name]; ok {
				if _, ok := allPlacedClusters[cluster.Name]; ok {
					// a cluster can only appear in one stage.
					dupErr := controller.NewUserError(fmt.Errorf("cluster `%s` appears in more than one stages", cluster.Name))
					klog.ErrorS(dupErr, "Failed to compute the stage", "clusterStagedUpdateStrategy", updateStrategyName, "stage name", stage.Name, "clusterStagedUpdateRun", updateRunRef)
					// no more retries here.
					return fmt.Errorf("%w: %s", errInitializedFailed, dupErr.Error())
				}
				if stage.SortingLabelKey != nil {
					// interpret the label values as integers.
					if _, err := strconv.Atoi(cluster.Labels[*stage.SortingLabelKey]); err != nil {
						keyErr := controller.NewUserError(fmt.Errorf("the sorting label `%s:%s` on cluster `%s` is not valid: %s", *stage.SortingLabelKey, cluster.Labels[*stage.SortingLabelKey], cluster.Name, err.Error()))
						klog.ErrorS(keyErr, "Failed to sort clusters in the stage", "clusterStagedUpdateStrategy", updateStrategyName, "stage name", stage.Name, "clusterStagedUpdateRun", updateRunRef)
						// no more retries here.
						return fmt.Errorf("%w: %s", errInitializedFailed, keyErr.Error())
					}
				}
				curStageClusters = append(curStageClusters, cluster)
				allPlacedClusters[cluster.Name] = struct{}{}
			}
		}

		// Check if the stage is empty.
		if len(curStageClusters) == 0 {
			// since we allow no selected bindings, a stage can be empty.
			klog.InfoS("No cluster is selected for the stage", "clusterStagedUpdateStrategy", updateStrategyName, "stage name", stage.Name, "clusterStagedUpdateRun", updateRunRef)
		} else {
			// Sort the clusters in the stage based on the SortingLabelKey and cluster name.
			sort.Slice(curStageClusters, func(i, j int) bool {
				if stage.SortingLabelKey == nil {
					return curStageClusters[i].Name < curStageClusters[j].Name
				}
				labelI, _ := strconv.Atoi(curStageClusters[i].Labels[*stage.SortingLabelKey])
				labelJ, _ := strconv.Atoi(curStageClusters[j].Labels[*stage.SortingLabelKey])
				if labelI != labelJ {
					return labelI < labelJ
				}
				return curStageClusters[i].Name < curStageClusters[j].Name
			})
		}

		// Record the clusters in the stage.
		curStageUpdatingStatus.Clusters = make([]placementv1beta1.ClusterUpdatingStatus, len(curStageClusters))
		for i, cluster := range curStageClusters {
			klog.V(2).InfoS("Adding a cluster to the stage", "cluster", cluster.Name, "clusterStagedUpdateStrategy", updateStrategyName, "stage name", stage.Name, "clusterStagedUpdateRun", updateRunRef)
			curStageUpdatingStatus.Clusters[i].ClusterName = cluster.Name
		}

		// Create the after stage tasks.
		curStageUpdatingStatus.AfterStageTaskStatus = make([]placementv1beta1.AfterStageTaskStatus, len(stage.AfterStageTasks))
		for i, task := range stage.AfterStageTasks {
			curStageUpdatingStatus.AfterStageTaskStatus[i].Type = task.Type
			if task.Type == placementv1beta1.AfterStageTaskTypeApproval {
				curStageUpdatingStatus.AfterStageTaskStatus[i].ApprovalRequestName = fmt.Sprintf(placementv1beta1.ApprovalTaskNameFmt, updateRun.Name, stage.Name)
			}
		}
		stagesStatus = append(stagesStatus, curStageUpdatingStatus)
	}
	updateRun.Status.StagesStatus = stagesStatus

	// Check if the clusters are all placed.
	if len(allPlacedClusters) != len(allSelectedClusters) {
		missingErr := controller.NewUserError(fmt.Errorf("some clusters are not placed in any stage"))
		for cluster := range allSelectedClusters {
			if _, ok := allPlacedClusters[cluster]; !ok {
				klog.ErrorS(missingErr, "Cluster is missing in any stage", "cluster", cluster, "clusterStagedUpdateStrategy", updateStrategyName, "clusterStagedUpdateRun", updateRunRef)
			}
		}
		// no more retries here.
		return fmt.Errorf("%w: %s", errInitializedFailed, missingErr.Error())
	}
	return nil
}