in pkg/controllers/updaterun/execution.go [87:222]
// executeUpdatingStage advances the stage at updatingStageIndex of the update run by at most one cluster.
//
// The clusters of the stage are walked in order:
//   - a cluster whose Succeeded condition is True for the current generation is counted as finished;
//   - a cluster whose Succeeded condition is False aborts the run;
//   - the first cluster that has not started gets its binding synced with the cluster status
//     (state Bound, resource snapshot, override snapshots, apply strategy), its binding's
//     RolloutStarted condition refreshed, and is then marked as started (the stage is marked as
//     started too when no cluster in it has finished yet);
//   - the first cluster that has started is verified against its binding and checked for completion;
//     if it is still running past updateRunStuckThreshold the run is marked as stuck.
//
// Only one cluster is updated at a time, so the function returns as soon as it reaches a cluster that
// has not finished. Once every cluster in the stage has finished, the stage is marked as waiting and the
// after-stage tasks are checked; the stage is marked as succeeded only when they are approved.
//
// It returns how long to wait before the next reconciliation and any error encountered. An error wrapping
// errStagedUpdatedAborted is returned when a cluster has failed, when a started cluster's binding no longer
// matches its cluster status, or when a cluster in the stage has no binding to update.
func (r *Reconciler) executeUpdatingStage(
	ctx context.Context,
	updateRun *placementv1beta1.ClusterStagedUpdateRun,
	updatingStageIndex int,
	toBeUpdatedBindings []*placementv1beta1.ClusterResourceBinding,
) (time.Duration, error) {
	updatingStageStatus := &updateRun.Status.StagesStatus[updatingStageIndex]
	// The parse error is ignored because the initialization should have caught it.
	resourceIndex, _ := strconv.Atoi(updateRun.Spec.ResourceSnapshotIndex)
	resourceSnapshotName := fmt.Sprintf(placementv1beta1.ResourceSnapshotNameFmt, updateRun.Spec.PlacementName, resourceIndex)
	updateRunRef := klog.KObj(updateRun)
	// Create the map of the toBeUpdatedBindings.
	toBeUpdatedBindingsMap := make(map[string]*placementv1beta1.ClusterResourceBinding, len(toBeUpdatedBindings))
	for _, binding := range toBeUpdatedBindings {
		toBeUpdatedBindingsMap[binding.Spec.TargetCluster] = binding
	}
	finishedClusterCount := 0
	// Go through each cluster in the stage and check if it's updated.
	for i := range updatingStageStatus.Clusters {
		clusterStatus := &updatingStageStatus.Clusters[i]
		clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
		clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded))
		if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.Generation) {
			// The cluster is marked as failed to update.
			failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName)
			klog.ErrorS(failedErr, "The cluster has failed to be updated", "clusterStagedUpdateRun", updateRunRef)
			return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error())
		}
		if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.Generation) {
			// The cluster has been updated successfully.
			finishedClusterCount++
			continue
		}
		// The cluster is either updating or not started yet.
		binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName]
		if binding == nil {
			// The stage lists a cluster for which no binding was passed in (presumably the binding was
			// removed after the run was initialized). Every path below dereferences the binding, so
			// without this guard the reconciler would panic; treat it like a binding that changed
			// during updating and abort the run.
			missingErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the cluster `%s` in the stage %s has no binding to be updated",
				clusterStatus.ClusterName, updatingStageStatus.StageName))
			klog.ErrorS(missingErr, "Failed to find the binding of the cluster to be updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
			markClusterUpdatingFailed(clusterStatus, updateRun.Generation, missingErr.Error())
			return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, missingErr.Error())
		}
		if !condition.IsConditionStatusTrue(clusterStartedCond, updateRun.Generation) {
			// The cluster has not started updating yet.
			if !isBindingSyncedWithClusterStatus(resourceSnapshotName, updateRun, binding, clusterStatus) {
				klog.V(2).InfoS("Found the first cluster that needs to be updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
				// The binding is not up-to-date with the cluster status.
				binding.Spec.State = placementv1beta1.BindingStateBound
				binding.Spec.ResourceSnapshotName = resourceSnapshotName
				binding.Spec.ResourceOverrideSnapshots = clusterStatus.ResourceOverrideSnapshots
				binding.Spec.ClusterResourceOverrideSnapshots = clusterStatus.ClusterResourceOverrideSnapshots
				binding.Spec.ApplyStrategy = updateRun.Status.ApplyStrategy
				if err := r.Client.Update(ctx, binding); err != nil {
					klog.ErrorS(err, "Failed to update binding to be bound with the matching spec of the updateRun", "binding", klog.KObj(binding), "clusterStagedUpdateRun", updateRunRef)
					return 0, controller.NewUpdateIgnoreConflictError(err)
				}
				klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
				if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
					return 0, err
				}
			} else {
				klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
				if binding.Spec.State != placementv1beta1.BindingStateBound {
					binding.Spec.State = placementv1beta1.BindingStateBound
					if err := r.Client.Update(ctx, binding); err != nil {
						klog.ErrorS(err, "Failed to update a binding to be bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
						return 0, controller.NewUpdateIgnoreConflictError(err)
					}
					klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
					if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
						return 0, err
					}
				} else if !condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.Generation) {
					klog.V(2).InfoS("The binding is bound and up-to-date but the generation is updated by the scheduler, update rolloutStarted status again", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
					if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
						return 0, err
					}
				} else {
					// The binding is already bound and rolling out; only surface an error from the result check.
					if _, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil {
						return clusterUpdatingWaitTime, updateErr
					}
				}
			}
			markClusterUpdatingStarted(clusterStatus, updateRun.Generation)
			if finishedClusterCount == 0 {
				markStageUpdatingStarted(updatingStageStatus, updateRun.Generation)
			}
			// No need to continue as we only support one cluster updating at a time for now.
			return clusterUpdatingWaitTime, nil
		}

		// Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
		if !isBindingSyncedWithClusterStatus(resourceSnapshotName, updateRun, binding, clusterStatus) || binding.Spec.State != placementv1beta1.BindingStateBound ||
			!condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.Generation) {
			unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the updating cluster `%s` in the stage %s does not match the cluster status: %+v, binding: %+v, condition: %+v",
				clusterStatus.ClusterName, updatingStageStatus.StageName, clusterStatus, binding.Spec, binding.GetCondition(string(placementv1beta1.ResourceBindingRolloutStarted))))
			klog.ErrorS(unexpectedErr, "The binding has been changed during updating, please check if there's concurrent clusterStagedUpdateRun", "clusterStagedUpdateRun", updateRunRef)
			markClusterUpdatingFailed(clusterStatus, updateRun.Generation, unexpectedErr.Error())
			return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
		}

		finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun)
		if finished {
			finishedClusterCount++
			markUpdateRunProgressing(updateRun)
			continue
		}
		// If cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck.
		// clusterStartedCond is non-nil here: the started check above returned early otherwise.
		timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time)
		if timeElapsed > updateRunStuckThreshold {
			klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
			markUpdateRunStuck(updateRun, updatingStageStatus.StageName, clusterStatus.ClusterName)
		}
		// No need to continue as we only support one cluster updating at a time for now.
		return clusterUpdatingWaitTime, updateErr
	}

	if finishedClusterCount == len(updatingStageStatus.Clusters) {
		// All the clusters in the stage have been updated.
		markUpdateRunWaiting(updateRun, updatingStageStatus.StageName)
		markStageUpdatingWaiting(updatingStageStatus, updateRun.Generation)
		klog.V(2).InfoS("The stage has finished all cluster updating", "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
		// Check if the after stage tasks are ready.
		approved, waitTime, err := r.checkAfterStageTasksStatus(ctx, updatingStageIndex, updateRun)
		if err != nil {
			return 0, err
		}
		if approved {
			markUpdateRunProgressing(updateRun)
			markStageUpdatingSucceeded(updatingStageStatus, updateRun.Generation)
			// No need to wait to get to the next stage.
			return 0, nil
		}
		// The after stage tasks are not ready yet; fall back to the default stage wait when no wait time was given.
		if waitTime < 0 {
			waitTime = stageUpdatingWaitTime
		}
		return waitTime, nil
	}
	return clusterUpdatingWaitTime, nil
}