func()

in pkg/controllers/updaterun/execution.go [87:222]


// executeUpdatingStage drives the stage at updatingStageIndex of updateRun forward by at most one cluster.
//
// It walks the clusters of the stage in order:
//   - A cluster whose Succeeded condition is False (for the current updateRun generation) aborts the run.
//   - A cluster whose Succeeded condition is True is counted as finished and skipped.
//   - The first cluster that has not started gets its binding bound to the updateRun's resource snapshot,
//     override snapshots and apply strategy; the binding's rollout-started status is updated and the cluster
//     (and the stage, if no cluster finished before it) is marked as started.
//   - The first cluster that is updating has its binding re-validated against the cluster status. A mismatch
//     marks the cluster failed and aborts. Otherwise its update result is checked; a finished cluster is
//     counted, and an unfinished one may mark the updateRun as stuck once updateRunStuckThreshold elapses.
//
// Only one cluster is updated at a time. When every cluster in the stage has finished, the after-stage
// tasks are checked and the stage is marked succeeded once they are approved.
//
// Parameters:
//   - updateRun: the run being executed; its status is mutated in place.
//   - updatingStageIndex: index into updateRun.Status.StagesStatus of the stage to execute.
//   - toBeUpdatedBindings: bindings to update, matched to clusters by Spec.TargetCluster.
//
// It returns how long to wait before the next reconcile and an error. Errors wrapping
// errStagedUpdatedAborted indicate that the run cannot proceed.
func (r *Reconciler) executeUpdatingStage(
	ctx context.Context,
	updateRun *placementv1beta1.ClusterStagedUpdateRun,
	updatingStageIndex int,
	toBeUpdatedBindings []*placementv1beta1.ClusterResourceBinding,
) (time.Duration, error) {
	updatingStageStatus := &updateRun.Status.StagesStatus[updatingStageIndex]
	// The parse error is ignored because the initialization should have caught it.
	resourceIndex, _ := strconv.Atoi(updateRun.Spec.ResourceSnapshotIndex)
	resourceSnapshotName := fmt.Sprintf(placementv1beta1.ResourceSnapshotNameFmt, updateRun.Spec.PlacementName, resourceIndex)
	updateRunRef := klog.KObj(updateRun)
	// Index the toBeUpdatedBindings by their target cluster so each cluster in the stage can find its binding.
	toBeUpdatedBindingsMap := make(map[string]*placementv1beta1.ClusterResourceBinding, len(toBeUpdatedBindings))
	for _, binding := range toBeUpdatedBindings {
		toBeUpdatedBindingsMap[binding.Spec.TargetCluster] = binding
	}
	finishedClusterCount := 0

	// Go through each cluster in the stage and check if it's updated.
	for i := range updatingStageStatus.Clusters {
		clusterStatus := &updatingStageStatus.Clusters[i]
		clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
		clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded))
		if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.Generation) {
			// The cluster is marked as failed to update.
			failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName)
			klog.ErrorS(failedErr, "The cluster has failed to be updated", "clusterStagedUpdateRun", updateRunRef)
			return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error())
		}
		if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.Generation) {
			// The cluster has been updated successfully.
			finishedClusterCount++
			continue
		}

		// The cluster is either updating or not started yet.
		binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName]
		if binding == nil {
			// There is no binding targeting this cluster among toBeUpdatedBindings. Every branch below
			// dereferences the binding, so treat this as unexpected and abort instead of panicking.
			unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the cluster `%s` in the stage %s does not have a binding to update",
				clusterStatus.ClusterName, updatingStageStatus.StageName))
			klog.ErrorS(unexpectedErr, "The cluster to be updated has no matching binding", "clusterStagedUpdateRun", updateRunRef)
			markClusterUpdatingFailed(clusterStatus, updateRun.Generation, unexpectedErr.Error())
			return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
		}

		if !condition.IsConditionStatusTrue(clusterStartedCond, updateRun.Generation) {
			// The cluster has not started updating yet.
			if !isBindingSyncedWithClusterStatus(resourceSnapshotName, updateRun, binding, clusterStatus) {
				klog.V(2).InfoS("Found the first cluster that needs to be updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
				// The binding is not up-to-date with the cluster status.
				binding.Spec.State = placementv1beta1.BindingStateBound
				binding.Spec.ResourceSnapshotName = resourceSnapshotName
				binding.Spec.ResourceOverrideSnapshots = clusterStatus.ResourceOverrideSnapshots
				binding.Spec.ClusterResourceOverrideSnapshots = clusterStatus.ClusterResourceOverrideSnapshots
				binding.Spec.ApplyStrategy = updateRun.Status.ApplyStrategy
				if err := r.Client.Update(ctx, binding); err != nil {
					klog.ErrorS(err, "Failed to update binding to be bound with the matching spec of the updateRun", "binding", klog.KObj(binding), "clusterStagedUpdateRun", updateRunRef)
					return 0, controller.NewUpdateIgnoreConflictError(err)
				}
				klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
				if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
					return 0, err
				}
			} else {
				klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
				// The binding spec already matches; finish whatever part of the "start" step is missing.
				switch {
				case binding.Spec.State != placementv1beta1.BindingStateBound:
					binding.Spec.State = placementv1beta1.BindingStateBound
					if err := r.Client.Update(ctx, binding); err != nil {
						klog.ErrorS(err, "Failed to update a binding to be bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
						return 0, controller.NewUpdateIgnoreConflictError(err)
					}
					klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
					if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
						return 0, err
					}
				case !condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.Generation):
					// The RolloutStarted condition is stale relative to the binding's current generation.
					klog.V(2).InfoS("The binding is bound and up-to-date but the generation is updated by the scheduler, update rolloutStarted status again", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
					if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
						return 0, err
					}
				default:
					if _, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil {
						return clusterUpdatingWaitTime, updateErr
					}
				}
			}
			markClusterUpdatingStarted(clusterStatus, updateRun.Generation)
			if finishedClusterCount == 0 {
				// This is the first cluster of the stage to start, so the stage itself starts now.
				markStageUpdatingStarted(updatingStageStatus, updateRun.Generation)
			}
			// No need to continue as we only support one cluster updating at a time for now.
			return clusterUpdatingWaitTime, nil
		}

		// Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
		if !isBindingSyncedWithClusterStatus(resourceSnapshotName, updateRun, binding, clusterStatus) || binding.Spec.State != placementv1beta1.BindingStateBound ||
			!condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.Generation) {
			unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the updating cluster `%s` in the stage %s does not match the cluster status: %+v, binding: %+v, condition: %+v",
				clusterStatus.ClusterName, updatingStageStatus.StageName, clusterStatus, binding.Spec, binding.GetCondition(string(placementv1beta1.ResourceBindingRolloutStarted))))
			klog.ErrorS(unexpectedErr, "The binding has been changed during updating, please check if there's concurrent clusterStagedUpdateRun", "clusterStagedUpdateRun", updateRunRef)
			markClusterUpdatingFailed(clusterStatus, updateRun.Generation, unexpectedErr.Error())
			return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
		}

		finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun)
		if finished {
			finishedClusterCount++
			markUpdateRunProgressing(updateRun)
			continue
		}
		// If cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck.
		// clusterStartedCond is non-nil here: the Started condition was confirmed True above.
		timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time)
		if timeElapsed > updateRunStuckThreshold {
			klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
			markUpdateRunStuck(updateRun, updatingStageStatus.StageName, clusterStatus.ClusterName)
		}
		// No need to continue as we only support one cluster updating at a time for now.
		return clusterUpdatingWaitTime, updateErr
	}

	if finishedClusterCount == len(updatingStageStatus.Clusters) {
		// All the clusters in the stage have been updated.
		markUpdateRunWaiting(updateRun, updatingStageStatus.StageName)
		markStageUpdatingWaiting(updatingStageStatus, updateRun.Generation)
		klog.V(2).InfoS("The stage has finished all cluster updating", "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
		// Check if the after stage tasks are ready.
		approved, waitTime, err := r.checkAfterStageTasksStatus(ctx, updatingStageIndex, updateRun)
		if err != nil {
			return 0, err
		}
		if approved {
			markUpdateRunProgressing(updateRun)
			markStageUpdatingSucceeded(updatingStageStatus, updateRun.Generation)
			// No need to wait to get to the next stage.
			return 0, nil
		}
		// The after stage tasks are not ready yet; fall back to the default stage wait when none was given.
		if waitTime < 0 {
			waitTime = stageUpdatingWaitTime
		}
		return waitTime, nil
	}
	// Every path through the loop either returns or counts the cluster as finished, so this is
	// not expected to be reached; it is kept because Go requires a terminating statement.
	return clusterUpdatingWaitTime, nil
}