in pkg/controllers/updaterun/initialization.go [276:392]
// computeRunStageStatus distributes the target clusters of the scheduled bindings
// across the stages of the strategy snapshot stored in the update run status and
// writes the outcome to updateRun.Status.StagesStatus.
//
// For every stage, in snapshot order, it:
//   - validates the stage's after stage tasks;
//   - lists the member clusters matching the stage's label selector;
//   - keeps only the clusters targeted by one of scheduledBindings;
//   - orders them by the integer value of the sorting label (when a sorting
//     label key is set) and then by cluster name;
//   - records the ordered cluster names and one status entry per after stage task.
//
// A failure to list clusters is returned as an API server error created with the
// retry flag set to true. All other failures (invalid after stage tasks, invalid
// label selector, a cluster matched by more than one stage, a non-integer sorting
// label, a selected cluster matched by no stage) are wrapped with
// errInitializedFailed. Note that updateRun.Status.StagesStatus is assigned before
// the final "every cluster is placed" check, so it is populated even when that
// check fails.
func (r *Reconciler) computeRunStageStatus(
	ctx context.Context,
	scheduledBindings []*placementv1beta1.ClusterResourceBinding,
	updateRun *placementv1beta1.ClusterStagedUpdateRun,
) error {
	runRef := klog.KObj(updateRun)
	strategyName := updateRun.Spec.StagedUpdateStrategyName

	// selected holds every cluster targeted by a scheduled binding; placed holds
	// the ones already assigned to a stage. Each selected cluster must end up in
	// exactly one stage.
	selected := make(map[string]struct{}, len(scheduledBindings))
	placed := make(map[string]struct{})
	for i := range scheduledBindings {
		selected[scheduledBindings[i].Spec.TargetCluster] = struct{}{}
	}

	stages := updateRun.Status.StagedUpdateStrategySnapshot.Stages
	result := make([]placementv1beta1.StageUpdatingStatus, 0, len(stages))
	for s := range stages {
		stage := &stages[s]
		sortKey := stage.SortingLabelKey

		if taskErr := validateAfterStageTask(stage.AfterStageTasks); taskErr != nil {
			klog.ErrorS(taskErr, "Failed to validate the after stage tasks", "clusterStagedUpdateStrategy", strategyName, "stage name", stage.Name, "clusterStagedUpdateRun", runRef)
			// User error: retrying cannot fix it.
			userErr := controller.NewUserError(fmt.Errorf("the after stage tasks are invalid, clusterStagedUpdateStrategy: %s, stage: %s, err: %s", strategyName, stage.Name, taskErr.Error()))
			return fmt.Errorf("%w: %s", errInitializedFailed, userErr.Error())
		}

		selector, selErr := metav1.LabelSelectorAsSelector(stage.LabelSelector)
		if selErr != nil {
			klog.ErrorS(selErr, "Failed to convert label selector", "clusterStagedUpdateStrategy", strategyName, "stage name", stage.Name, "labelSelector", stage.LabelSelector, "clusterStagedUpdateRun", runRef)
			// User error: retrying cannot fix it.
			userErr := controller.NewUserError(fmt.Errorf("the stage label selector is invalid, clusterStagedUpdateStrategy: %s, stage: %s, err: %s", strategyName, stage.Name, selErr.Error()))
			return fmt.Errorf("%w: %s", errInitializedFailed, userErr.Error())
		}

		// Fetch every member cluster matched by the stage's selector.
		var members clusterv1beta1.MemberClusterList
		listErr := r.Client.List(ctx, &members, &client.ListOptions{LabelSelector: selector})
		if listErr != nil {
			klog.ErrorS(listErr, "Failed to list clusters for the stage", "clusterStagedUpdateStrategy", strategyName, "stage name", stage.Name, "labelSelector", stage.LabelSelector, "clusterStagedUpdateRun", runRef)
			// Listing is the one failure here that is worth retrying.
			return controller.NewAPIServerError(true, listErr)
		}

		// Keep only the matched clusters that were selected by a binding.
		var stageClusters []clusterv1beta1.MemberCluster
		for c := range members.Items {
			member := members.Items[c]
			name := member.Name
			if _, isSelected := selected[name]; !isSelected {
				continue
			}
			if _, alreadyPlaced := placed[name]; alreadyPlaced {
				// A cluster may belong to a single stage only.
				dupErr := controller.NewUserError(fmt.Errorf("cluster `%s` appears in more than one stages", name))
				klog.ErrorS(dupErr, "Failed to compute the stage", "clusterStagedUpdateStrategy", strategyName, "stage name", stage.Name, "clusterStagedUpdateRun", runRef)
				// User error: retrying cannot fix it.
				return fmt.Errorf("%w: %s", errInitializedFailed, dupErr.Error())
			}
			if sortKey != nil {
				// The sorting label value has to parse as an integer.
				rawValue := member.Labels[*sortKey]
				if _, convErr := strconv.Atoi(rawValue); convErr != nil {
					keyErr := controller.NewUserError(fmt.Errorf("the sorting label `%s:%s` on cluster `%s` is not valid: %s", *sortKey, rawValue, name, convErr.Error()))
					klog.ErrorS(keyErr, "Failed to sort clusters in the stage", "clusterStagedUpdateStrategy", strategyName, "stage name", stage.Name, "clusterStagedUpdateRun", runRef)
					// User error: retrying cannot fix it.
					return fmt.Errorf("%w: %s", errInitializedFailed, keyErr.Error())
				}
			}
			stageClusters = append(stageClusters, member)
			placed[name] = struct{}{}
		}

		if len(stageClusters) > 0 {
			// Order by the integer sorting label first (when configured), then by name.
			sort.Slice(stageClusters, func(a, b int) bool {
				left, right := &stageClusters[a], &stageClusters[b]
				if sortKey != nil {
					// Conversion errors are ignored: the values were validated above.
					leftRank, _ := strconv.Atoi(left.Labels[*sortKey])
					rightRank, _ := strconv.Atoi(right.Labels[*sortKey])
					if leftRank != rightRank {
						return leftRank < rightRank
					}
				}
				return left.Name < right.Name
			})
		} else {
			// An empty stage is legal because there may be no selected bindings at all.
			klog.InfoS("No cluster is selected for the stage", "clusterStagedUpdateStrategy", strategyName, "stage name", stage.Name, "clusterStagedUpdateRun", runRef)
		}

		stageStatus := placementv1beta1.StageUpdatingStatus{StageName: stage.Name}

		// Record the ordered clusters of the stage.
		stageStatus.Clusters = make([]placementv1beta1.ClusterUpdatingStatus, 0, len(stageClusters))
		for c := range stageClusters {
			name := stageClusters[c].Name
			klog.V(2).InfoS("Adding a cluster to the stage", "cluster", name, "clusterStagedUpdateStrategy", strategyName, "stage name", stage.Name, "clusterStagedUpdateRun", runRef)
			stageStatus.Clusters = append(stageStatus.Clusters, placementv1beta1.ClusterUpdatingStatus{ClusterName: name})
		}

		// Record one status entry per after stage task; approval tasks also get
		// the name of their approval request.
		stageStatus.AfterStageTaskStatus = make([]placementv1beta1.AfterStageTaskStatus, 0, len(stage.AfterStageTasks))
		for t := range stage.AfterStageTasks {
			taskStatus := placementv1beta1.AfterStageTaskStatus{Type: stage.AfterStageTasks[t].Type}
			if taskStatus.Type == placementv1beta1.AfterStageTaskTypeApproval {
				taskStatus.ApprovalRequestName = fmt.Sprintf(placementv1beta1.ApprovalTaskNameFmt, updateRun.Name, stage.Name)
			}
			stageStatus.AfterStageTaskStatus = append(stageStatus.AfterStageTaskStatus, taskStatus)
		}

		result = append(result, stageStatus)
	}
	updateRun.Status.StagesStatus = result

	// Every selected cluster must have landed in some stage.
	if len(placed) == len(selected) {
		return nil
	}
	missingErr := controller.NewUserError(fmt.Errorf("some clusters are not placed in any stage"))
	for name := range selected {
		if _, ok := placed[name]; ok {
			continue
		}
		klog.ErrorS(missingErr, "Cluster is missing in any stage", "cluster", name, "clusterStagedUpdateStrategy", strategyName, "clusterStagedUpdateRun", runRef)
	}
	// User error: retrying cannot fix it.
	return fmt.Errorf("%w: %s", errInitializedFailed, missingErr.Error())
}