func()

in pkg/controllers/workv1alpha1/apply_controller.go [124:227]


func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	if !r.joined.Load() {
		klog.V(2).InfoS("work controller is not started yet, requeue the request", "work", req.NamespacedName)
		return ctrl.Result{RequeueAfter: time.Second * 5}, nil
	}
	klog.InfoS("work apply controller reconcile loop triggered.", "work", req.NamespacedName)

	// Fetch the work resource
	work := &workv1alpha1.Work{}
	err := r.client.Get(ctx, req.NamespacedName, work)
	switch {
	case apierrors.IsNotFound(err):
		klog.V(2).InfoS("the work resource is deleted", "work", req.NamespacedName)
		return ctrl.Result{}, nil
	case err != nil:
		klog.ErrorS(err, "failed to retrieve the work", "work", req.NamespacedName)
		return ctrl.Result{}, err
	}
	logObjRef := klog.KObj(work)

	// Handle deleting work, garbage collect the resources
	if !work.DeletionTimestamp.IsZero() {
		klog.V(2).InfoS("resource is in the process of being deleted", work.Kind, logObjRef)
		return r.garbageCollectAppliedWork(ctx, work)
	}

	// ensure that the appliedWork and the finalizer exist
	appliedWork, err := r.ensureAppliedWork(ctx, work)
	if err != nil {
		return ctrl.Result{}, err
	}
	owner := metav1.OwnerReference{
		APIVersion:         workv1alpha1.GroupVersion.String(),
		Kind:               workv1alpha1.AppliedWorkKind,
		Name:               appliedWork.GetName(),
		UID:                appliedWork.GetUID(),
		BlockOwnerDeletion: ptr.To(false),
	}

	// apply the manifests to the member cluster
	results := r.applyManifests(ctx, work.Spec.Workload.Manifests, owner)

	// collect the latency from the work update time to now.
	lastUpdateTime, ok := work.GetAnnotations()[utils.LastWorkUpdateTimeAnnotationKey]
	if ok {
		workUpdateTime, parseErr := time.Parse(time.RFC3339, lastUpdateTime)
		if parseErr != nil {
			klog.ErrorS(parseErr, "failed to parse the last work update time", "work", logObjRef)
		} else {
			latency := time.Since(workUpdateTime)
			metrics.WorkApplyTime.WithLabelValues(work.GetName()).Observe(latency.Seconds())
			klog.V(2).InfoS("work is applied", "work", work.GetName(), "latency", latency.Milliseconds())
		}
	} else {
		klog.V(2).InfoS("work has no last update time", "work", work.GetName())
	}

	// generate the work condition based on the manifest apply result
	errs := r.generateWorkCondition(results, work)

	// update the work status
	if err = r.client.Status().Update(ctx, work, &client.SubResourceUpdateOptions{}); err != nil {
		klog.ErrorS(err, "failed to update work status", "work", logObjRef)
		return ctrl.Result{}, err
	}
	if len(errs) == 0 {
		klog.InfoS("successfully applied the work to the cluster", "work", logObjRef)
		r.recorder.Event(work, v1.EventTypeNormal, "ApplyWorkSucceed", "apply the work successfully")
	}

	// now we sync the status from work to appliedWork no matter if apply succeeds or not
	newRes, staleRes, genErr := r.generateDiff(ctx, work, appliedWork)
	if genErr != nil {
		klog.ErrorS(err, "failed to generate the diff between work status and appliedWork status", work.Kind, logObjRef)
		return ctrl.Result{}, err
	}
	// delete all the manifests that should not be in the cluster.
	if err = r.deleteStaleManifest(ctx, staleRes, owner); err != nil {
		klog.ErrorS(err, "resource garbage-collection incomplete; some Work owned resources could not be deleted", work.Kind, logObjRef)
		// we can't proceed to update the applied
		return ctrl.Result{}, err
	} else if len(staleRes) > 0 {
		klog.V(2).InfoS("successfully garbage-collected all stale manifests", work.Kind, logObjRef, "number of GCed res", len(staleRes))
		for _, res := range staleRes {
			klog.V(2).InfoS("successfully garbage-collected a stale manifest", work.Kind, logObjRef, "res", res)
		}
	}

	// update the appliedWork with the new work after the stales are deleted
	appliedWork.Status.AppliedResources = newRes
	if err = r.spokeClient.Status().Update(ctx, appliedWork, &client.SubResourceUpdateOptions{}); err != nil {
		klog.ErrorS(err, "failed to update appliedWork status", appliedWork.Kind, appliedWork.GetName())
		return ctrl.Result{}, err
	}
	err = utilerrors.NewAggregate(errs)
	if err != nil {
		klog.ErrorS(err, "manifest apply incomplete; the message is queued again for reconciliation",
			"work", logObjRef)
	}

	// we periodically reconcile the work to make sure the member cluster state is in sync with the work
	// even if the reconciling succeeds in case the resources on the member cluster is removed/changed.
	return ctrl.Result{RequeueAfter: time.Minute * 5}, err
}