func()

in pkg/controllers/work/apply_controller.go [175:296]


func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
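	// Do not process any work until the member cluster has joined and this controller is marked as started;
	// requeue the request so that it is retried shortly.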
	if !r.joined.Load() {
		klog.V(2).InfoS("Work controller is not started yet, requeue the request", "work", req.NamespacedName)
		return ctrl.Result{RequeueAfter: time.Second * 5}, nil
	}
	startTime := time.Now()
	klog.V(2).InfoS("ApplyWork reconciliation starts", "work", req.NamespacedName)
	defer func() {
		latency := time.Since(startTime).Milliseconds()
		klog.V(2).InfoS("ApplyWork reconciliation ends", "work", req.NamespacedName, "latency", latency)
	}()

	// Fetch the work resource
	work := &fleetv1beta1.Work{}
	err := r.client.Get(ctx, req.NamespacedName, work)
	switch {
	case apierrors.IsNotFound(err):
		klog.V(2).InfoS("The work resource is deleted", "work", req.NamespacedName)
		return ctrl.Result{}, nil
	case err != nil:
		klog.ErrorS(err, "Failed to retrieve the work", "work", req.NamespacedName)
		return ctrl.Result{}, controller.NewAPIServerError(true, err)
	}
	logObjRef := klog.KObj(work)

	// Handle deleting work, garbage collect the resources
	if !work.DeletionTimestamp.IsZero() {
		klog.V(2).InfoS("Resource is in the process of being deleted", work.Kind, logObjRef)
		return r.garbageCollectAppliedWork(ctx, work)
	}

	// Set default values so that the calls below can skip nil checks.
	// TODO: this can be removed once we have the defaulting webhook with a "fail" policy.
	// Make sure these conditions are met before removing it:
	// * the defaulting webhook failure policy is configured as "fail".
	// * users cannot update/delete the webhook.
	defaulter.SetDefaultsWork(work)

	// ensure that the appliedWork and the finalizer exist
	appliedWork, err := r.ensureAppliedWork(ctx, work)
	if err != nil {
		return ctrl.Result{}, err
	}
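	// Build an owner reference pointing at the AppliedWork object; it is passed to applyManifests and
	// deleteStaleManifest below. BlockOwnerDeletion is false so that deleting the AppliedWork is not
	// blocked by the resources it owns.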
	owner := metav1.OwnerReference{
		APIVersion:         fleetv1beta1.GroupVersion.String(),
		Kind:               fleetv1beta1.AppliedWorkKind,
		Name:               appliedWork.GetName(),
		UID:                appliedWork.GetUID(),
		BlockOwnerDeletion: ptr.To(false),
	}

	// apply the manifests to the member cluster
	results := r.applyManifests(ctx, work.Spec.Workload.Manifests, owner, work.Spec.ApplyStrategy)

	// collect the latency from the work update time to now.
	lastUpdateTime, ok := work.GetAnnotations()[utils.LastWorkUpdateTimeAnnotationKey]
	if ok {
		workUpdateTime, parseErr := time.Parse(time.RFC3339, lastUpdateTime)
		if parseErr != nil {
			klog.ErrorS(parseErr, "Failed to parse the last work update time", "work", logObjRef)
		} else {
			latency := time.Since(workUpdateTime)
			metrics.WorkApplyTime.WithLabelValues(work.GetName()).Observe(latency.Seconds())
			klog.V(2).InfoS("Work is applied", "work", work.GetName(), "latency", latency.Milliseconds())
		}
	} else {
		klog.V(2).InfoS("Work has no last update time", "work", work.GetName())
	}

	// generate the work condition based on the manifest apply result
	errs := constructWorkCondition(results, work)

	// update the work status
	if err = r.client.Status().Update(ctx, work, &client.SubResourceUpdateOptions{}); err != nil {
		klog.ErrorS(err, "Failed to update work status", "work", logObjRef)
		return ctrl.Result{}, err
	}
	if len(errs) == 0 {
		klog.InfoS("Successfully applied the work to the cluster", "work", logObjRef)
		r.recorder.Event(work, v1.EventTypeNormal, "ApplyWorkSucceed", "applied the work successfully")
	}

	// Sync the status from the work to the appliedWork regardless of whether the apply succeeded.
	newRes, staleRes, genErr := r.generateDiff(ctx, work, appliedWork)
	if genErr != nil {
		klog.ErrorS(genErr, "Failed to generate the diff between work status and appliedWork status", work.Kind, logObjRef)
		return ctrl.Result{}, genErr
	}
	// delete all the manifests that should not be in the cluster.
	if err = r.deleteStaleManifest(ctx, staleRes, owner); err != nil {
		klog.ErrorS(err, "Resource garbage-collection incomplete; some Work owned resources could not be deleted", work.Kind, logObjRef)
		// we can't proceed to update the appliedWork status until the stale resources are deleted
		return ctrl.Result{}, err
	} else if len(staleRes) > 0 {
		klog.V(2).InfoS("Successfully garbage-collected all stale manifests", work.Kind, logObjRef, "number of GCed res", len(staleRes))
		for _, res := range staleRes {
			klog.V(2).InfoS("Successfully garbage-collected a stale manifest", work.Kind, logObjRef, "res", res)
		}
	}
	// update the appliedWork with the new work after the stales are deleted
	appliedWork.Status.AppliedResources = newRes
	if err = r.spokeClient.Status().Update(ctx, appliedWork, &client.SubResourceUpdateOptions{}); err != nil {
		klog.ErrorS(err, "Failed to update appliedWork status", appliedWork.Kind, appliedWork.GetName())
		return ctrl.Result{}, err
	}

	// TODO: do not retry on errors if the apply action is reportDiff; report the diff every 1 min instead.
	if err = utilerrors.NewAggregate(errs); err != nil {
		klog.ErrorS(err, "Manifest apply incomplete; the work is requeued for reconciliation",
			"work", logObjRef)
		return ctrl.Result{}, err
	}
	// Check if the work is available; if not, requeue the work for reconciliation.
	availableCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
	if !condition.IsConditionStatusTrue(availableCond, work.Generation) {
		klog.V(2).InfoS("Work is not available yet, check again", "work", logObjRef, "availableCond", availableCond)
		return ctrl.Result{RequeueAfter: time.Second * 3}, nil
	}
	// The work is available (possibly because its availability is not trackable), but we still reconcile periodically
	// to make sure the member cluster state stays in sync with the work in case resources on the member cluster are removed or changed.
	return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
}
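
For context, a minimal sketch of how a reconciler like this is typically wired into a controller-runtime manager so that Reconcile is invoked on Work events. This is an illustrative assumption, not the repository's actual setup, which may add predicates, watches, or controller options:

func (r *ApplyWorkReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// Hypothetical wiring sketch; assumes ctrl is "sigs.k8s.io/controller-runtime",
	// as already used in this file for ctrl.Request and ctrl.Result.
	return ctrl.NewControllerManagedBy(mgr).
		For(&fleetv1beta1.Work{}).
		Complete(r)
}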