in pkg/controllers/updaterun/controller.go [67:172]
// Reconcile moves the ClusterStagedUpdateRun identified by req one step forward.
//
// The flow is:
//  1. Fetch the object; a failure is logged and the error is passed through
//     client.IgnoreNotFound before being returned.
//  2. If the object carries a deletion timestamp, hand a deep copy to
//     handleDelete and, unless it reports the deletion as done, requeue after
//     the wait time it returned.
//  3. Ensure the finalizer is present.
//  4. Branch on the Initialized condition for the current generation:
//     true  -> stop if the Succeeded condition is already set (true or false),
//     otherwise validate the run to find the stage to resume from;
//     false -> stop, initialization has already failed;
//     unset -> initialize the run and start from stage 0.
//  5. Execute the run and record the outcome: failed when execution aborts,
//     succeeded when it finishes, otherwise the current status followed by
//     either the execution error or a requeue after the returned wait time.
//
// Start and end log lines (the latter with the latency in milliseconds) are
// emitted for every call. The update run status metric is emitted on every
// return path that is reached after the finalizer has been ensured.
func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtime.Result, error) {
	begin := time.Now()
	klog.V(2).InfoS("ClusterStagedUpdateRun reconciliation starts", "clusterStagedUpdateRun", req.NamespacedName)
	defer func() {
		elapsedMs := time.Since(begin).Milliseconds()
		klog.V(2).InfoS("ClusterStagedUpdateRun reconciliation ends", "clusterStagedUpdateRun", req.NamespacedName, "latency", elapsedMs)
	}()

	var run placementv1beta1.ClusterStagedUpdateRun
	if getErr := r.Client.Get(ctx, req.NamespacedName, &run); getErr != nil {
		klog.ErrorS(getErr, "Failed to get clusterStagedUpdateRun object", "clusterStagedUpdateRun", req.Name)
		return runtime.Result{}, client.IgnoreNotFound(getErr)
	}
	runRef := klog.KObj(&run)

	// Deletion path: nothing below applies once a deletion timestamp is set.
	if !run.DeletionTimestamp.IsZero() {
		klog.V(2).InfoS("The clusterStagedUpdateRun is being deleted", "clusterStagedUpdateRun", runRef)
		gone, retryIn, delErr := r.handleDelete(ctx, run.DeepCopy())
		switch {
		case delErr != nil:
			return runtime.Result{}, delErr
		case gone:
			return runtime.Result{}, nil
		default:
			return runtime.Result{RequeueAfter: retryIn}, nil
		}
	}

	// Make sure the finalizer is in place before doing any work on the run.
	if finErr := r.ensureFinalizer(ctx, &run); finErr != nil {
		klog.ErrorS(finErr, "Failed to add the finalizer to the clusterStagedUpdateRun", "clusterStagedUpdateRun", runRef)
		return runtime.Result{}, finErr
	}

	// From here on, every return path emits the status metric derived from the
	// conditions held in run at the time the function returns.
	defer emitUpdateRunStatusMetric(&run)

	var (
		stageIdx           int
		toUpdate, toDelete []*placementv1beta1.ClusterResourceBinding
		err                error
	)

	initCond := meta.FindStatusCondition(run.Status.Conditions, string(placementv1beta1.StagedUpdateRunConditionInitialized))
	switch {
	case condition.IsConditionStatusTrue(initCond, run.Generation):
		klog.V(2).InfoS("The clusterStagedUpdateRun is initialized", "clusterStagedUpdateRun", runRef)

		// A Succeeded condition that is set either way means the run is over.
		finishedCond := meta.FindStatusCondition(run.Status.Conditions, string(placementv1beta1.StagedUpdateRunConditionSucceeded))
		if condition.IsConditionStatusTrue(finishedCond, run.Generation) || condition.IsConditionStatusFalse(finishedCond, run.Generation) {
			klog.V(2).InfoS("The clusterStagedUpdateRun is finished", "finishedSuccessfully", finishedCond.Status, "clusterStagedUpdateRun", runRef)
			return runtime.Result{}, nil
		}

		// Validate the recorded status so the update can resume, and learn
		// which stage is currently updating along with the bindings involved.
		stageIdx, toUpdate, toDelete, err = r.validate(ctx, &run)
		if err != nil {
			// errStagedUpdatedAborted is terminal; mark the run failed instead of retrying.
			if errors.Is(err, errStagedUpdatedAborted) {
				return runtime.Result{}, r.recordUpdateRunFailed(ctx, &run, err.Error())
			}
			return runtime.Result{}, err
		}
		klog.V(2).InfoS("The clusterStagedUpdateRun is validated", "clusterStagedUpdateRun", runRef)

	case condition.IsConditionStatusFalse(initCond, run.Generation):
		klog.V(2).InfoS("The clusterStagedUpdateRun has failed to initialize", "errorMsg", initCond.Message, "clusterStagedUpdateRun", runRef)
		return runtime.Result{}, nil

	default:
		// The Initialized condition is not set for this generation yet.
		toUpdate, toDelete, err = r.initialize(ctx, &run)
		if err != nil {
			klog.ErrorS(err, "Failed to initialize the clusterStagedUpdateRun", "clusterStagedUpdateRun", runRef)
			// errInitializedFailed is terminal; record it instead of retrying.
			if errors.Is(err, errInitializedFailed) {
				return runtime.Result{}, r.recordInitializationFailed(ctx, &run, err.Error())
			}
			return runtime.Result{}, err
		}
		stageIdx = 0 // a freshly initialized run begins at the first stage.
		klog.V(2).InfoS("Initialized the clusterStagedUpdateRun", "clusterStagedUpdateRun", runRef)
	}

	// A stage index of -1 means an earlier pass completed the run but the
	// status update did not go through; record the success now.
	if stageIdx == -1 {
		klog.V(2).InfoS("The clusterStagedUpdateRun is completed", "clusterStagedUpdateRun", runRef)
		return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, &run)
	}

	klog.V(2).InfoS("Continue to execute the clusterStagedUpdateRun", "updatingStageIndex", stageIdx, "clusterStagedUpdateRun", runRef)
	done, retryIn, execErr := r.execute(ctx, &run, stageIdx, toUpdate, toDelete)
	if errors.Is(execErr, errStagedUpdatedAborted) {
		// An aborted run is terminal; mark it failed instead of retrying.
		return runtime.Result{}, r.recordUpdateRunFailed(ctx, &run, execErr.Error())
	}
	if done {
		klog.V(2).InfoS("The clusterStagedUpdateRun is completed", "clusterStagedUpdateRun", runRef)
		return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, &run)
	}

	// Either execution is still in progress or it hit a retriable error.
	// Persist the status first, then surface the error or schedule a requeue.
	if statusErr := r.recordUpdateRunStatus(ctx, &run); statusErr != nil {
		return runtime.Result{}, statusErr
	}
	klog.V(2).InfoS("The clusterStagedUpdateRun is not finished yet", "requeueWaitTime", retryIn, "execErr", execErr, "clusterStagedUpdateRun", runRef)
	if execErr != nil {
		return runtime.Result{}, execErr
	}
	return runtime.Result{Requeue: true, RequeueAfter: retryIn}, nil
}