func()

in pkg/controllers/updaterun/controller.go [67:172]


func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtime.Result, error) {
	startTime := time.Now()
	klog.V(2).InfoS("ClusterStagedUpdateRun reconciliation starts", "clusterStagedUpdateRun", req.NamespacedName)
	defer func() {
		latency := time.Since(startTime).Milliseconds()
		klog.V(2).InfoS("ClusterStagedUpdateRun reconciliation ends", "clusterStagedUpdateRun", req.NamespacedName, "latency", latency)
	}()

	var updateRun placementv1beta1.ClusterStagedUpdateRun
	if err := r.Client.Get(ctx, req.NamespacedName, &updateRun); err != nil {
		klog.ErrorS(err, "Failed to get clusterStagedUpdateRun object", "clusterStagedUpdateRun", req.Name)
		return runtime.Result{}, client.IgnoreNotFound(err)
	}
	runObjRef := klog.KObj(&updateRun)

	// Handle the deletion of the clusterStagedUpdateRun.
	if !updateRun.DeletionTimestamp.IsZero() {
		klog.V(2).InfoS("The clusterStagedUpdateRun is being deleted", "clusterStagedUpdateRun", runObjRef)
		deleted, waitTime, err := r.handleDelete(ctx, updateRun.DeepCopy())
		if err != nil {
			return runtime.Result{}, err
		}
		if deleted {
			return runtime.Result{}, nil
		}
		return runtime.Result{RequeueAfter: waitTime}, nil
	}

	// Add the finalizer to the clusterStagedUpdateRun.
	if err := r.ensureFinalizer(ctx, &updateRun); err != nil {
		klog.ErrorS(err, "Failed to add the finalizer to the clusterStagedUpdateRun", "clusterStagedUpdateRun", runObjRef)
		return runtime.Result{}, err
	}

	// Emit the update run status metric based on status conditions in the updateRun.
	defer emitUpdateRunStatusMetric(&updateRun)

	var updatingStageIndex int
	var toBeUpdatedBindings, toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding
	var err error
	initCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1beta1.StagedUpdateRunConditionInitialized))
	if !condition.IsConditionStatusTrue(initCond, updateRun.Generation) {
		if condition.IsConditionStatusFalse(initCond, updateRun.Generation) {
			klog.V(2).InfoS("The clusterStagedUpdateRun has failed to initialize", "errorMsg", initCond.Message, "clusterStagedUpdateRun", runObjRef)
			return runtime.Result{}, nil
		}
		if toBeUpdatedBindings, toBeDeletedBindings, err = r.initialize(ctx, &updateRun); err != nil {
			klog.ErrorS(err, "Failed to initialize the clusterStagedUpdateRun", "clusterStagedUpdateRun", runObjRef)
			// errInitializedFailed cannot be retried.
			if errors.Is(err, errInitializedFailed) {
				return runtime.Result{}, r.recordInitializationFailed(ctx, &updateRun, err.Error())
			}
			return runtime.Result{}, err
		}
		updatingStageIndex = 0 // start from the first stage.
		klog.V(2).InfoS("Initialized the clusterStagedUpdateRun", "clusterStagedUpdateRun", runObjRef)
	} else {
		klog.V(2).InfoS("The clusterStagedUpdateRun is initialized", "clusterStagedUpdateRun", runObjRef)
		// Check if the clusterStagedUpdateRun is finished.
		finishedCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1beta1.StagedUpdateRunConditionSucceeded))
		if condition.IsConditionStatusTrue(finishedCond, updateRun.Generation) || condition.IsConditionStatusFalse(finishedCond, updateRun.Generation) {
			klog.V(2).InfoS("The clusterStagedUpdateRun is finished", "finishedSuccessfully", finishedCond.Status, "clusterStagedUpdateRun", runObjRef)
			return runtime.Result{}, nil
		}

		// Validate the clusterStagedUpdateRun status to ensure the update can be continued and get the updating stage index and cluster indices.
		if updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings, err = r.validate(ctx, &updateRun); err != nil {
			// errStagedUpdatedAborted cannot be retried.
			if errors.Is(err, errStagedUpdatedAborted) {
				return runtime.Result{}, r.recordUpdateRunFailed(ctx, &updateRun, err.Error())
			}
			return runtime.Result{}, err
		}
		klog.V(2).InfoS("The clusterStagedUpdateRun is validated", "clusterStagedUpdateRun", runObjRef)
	}

	// The previous run is completed but the update to the status failed.
	if updatingStageIndex == -1 {
		klog.V(2).InfoS("The clusterStagedUpdateRun is completed", "clusterStagedUpdateRun", runObjRef)
		return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, &updateRun)
	}

	// Execute the updateRun.
	klog.V(2).InfoS("Continue to execute the clusterStagedUpdateRun", "updatingStageIndex", updatingStageIndex, "clusterStagedUpdateRun", runObjRef)
	finished, waitTime, execErr := r.execute(ctx, &updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
	if errors.Is(execErr, errStagedUpdatedAborted) {
		// errStagedUpdatedAborted cannot be retried.
		return runtime.Result{}, r.recordUpdateRunFailed(ctx, &updateRun, execErr.Error())
	}

	if finished {
		klog.V(2).InfoS("The clusterStagedUpdateRun is completed", "clusterStagedUpdateRun", runObjRef)
		return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, &updateRun)
	}

	// The execution is not finished yet or it encounters a retriable error.
	// We need to record the status and requeue.
	if updateErr := r.recordUpdateRunStatus(ctx, &updateRun); updateErr != nil {
		return runtime.Result{}, updateErr
	}
	klog.V(2).InfoS("The clusterStagedUpdateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "clusterStagedUpdateRun", runObjRef)
	if execErr != nil {
		return runtime.Result{}, execErr
	}
	return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
}