in pkg/controllers/workv1alpha1/apply_controller.go [124:227]
// Reconcile processes a single Work object: it applies the Work's manifests to
// the member cluster, records apply latency metrics, updates the Work status,
// garbage-collects stale manifests, and syncs the applied-resource list into the
// corresponding AppliedWork. It requeues periodically (every 5 minutes) so that
// drift on the member cluster is eventually corrected even after a successful
// reconcile.
func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// The controller only runs after the member cluster has joined; until then,
	// back off briefly and retry instead of failing.
	if !r.joined.Load() {
		klog.V(2).InfoS("work controller is not started yet, requeue the request", "work", req.NamespacedName)
		return ctrl.Result{RequeueAfter: time.Second * 5}, nil
	}
	klog.InfoS("work apply controller reconcile loop triggered.", "work", req.NamespacedName)
	// Fetch the work resource
	work := &workv1alpha1.Work{}
	err := r.client.Get(ctx, req.NamespacedName, work)
	switch {
	case apierrors.IsNotFound(err):
		// Nothing to do; deletion cleanup is driven by the finalizer path below.
		klog.V(2).InfoS("the work resource is deleted", "work", req.NamespacedName)
		return ctrl.Result{}, nil
	case err != nil:
		klog.ErrorS(err, "failed to retrieve the work", "work", req.NamespacedName)
		return ctrl.Result{}, err
	}
	logObjRef := klog.KObj(work)
	// Handle deleting work, garbage collect the resources
	if !work.DeletionTimestamp.IsZero() {
		klog.V(2).InfoS("resource is in the process of being deleted", work.Kind, logObjRef)
		return r.garbageCollectAppliedWork(ctx, work)
	}
	// ensure that the appliedWork and the finalizer exist
	appliedWork, err := r.ensureAppliedWork(ctx, work)
	if err != nil {
		return ctrl.Result{}, err
	}
	// All applied objects are owned by the AppliedWork so that deleting it
	// cascades to the manifests; BlockOwnerDeletion is false so the owner can be
	// removed without waiting on the dependents.
	owner := metav1.OwnerReference{
		APIVersion:         workv1alpha1.GroupVersion.String(),
		Kind:               workv1alpha1.AppliedWorkKind,
		Name:               appliedWork.GetName(),
		UID:                appliedWork.GetUID(),
		BlockOwnerDeletion: ptr.To(false),
	}
	// apply the manifests to the member cluster
	results := r.applyManifests(ctx, work.Spec.Workload.Manifests, owner)
	// collect the latency from the work update time to now.
	lastUpdateTime, ok := work.GetAnnotations()[utils.LastWorkUpdateTimeAnnotationKey]
	if ok {
		workUpdateTime, parseErr := time.Parse(time.RFC3339, lastUpdateTime)
		if parseErr != nil {
			// A malformed annotation only costs us the metric; do not fail the reconcile.
			klog.ErrorS(parseErr, "failed to parse the last work update time", "work", logObjRef)
		} else {
			latency := time.Since(workUpdateTime)
			metrics.WorkApplyTime.WithLabelValues(work.GetName()).Observe(latency.Seconds())
			klog.V(2).InfoS("work is applied", "work", work.GetName(), "latency", latency.Milliseconds())
		}
	} else {
		klog.V(2).InfoS("work has no last update time", "work", work.GetName())
	}
	// generate the work condition based on the manifest apply result
	errs := r.generateWorkCondition(results, work)
	// update the work status
	if err = r.client.Status().Update(ctx, work, &client.SubResourceUpdateOptions{}); err != nil {
		klog.ErrorS(err, "failed to update work status", "work", logObjRef)
		return ctrl.Result{}, err
	}
	if len(errs) == 0 {
		klog.InfoS("successfully applied the work to the cluster", "work", logObjRef)
		r.recorder.Event(work, v1.EventTypeNormal, "ApplyWorkSucceed", "apply the work successfully")
	}
	// now we sync the status from work to appliedWork no matter if apply succeeds or not
	newRes, staleRes, genErr := r.generateDiff(ctx, work, appliedWork)
	if genErr != nil {
		// BUGFIX: previously logged and returned `err` (nil at this point),
		// which swallowed the diff failure and ended reconciliation without a
		// requeue. Propagate genErr so the reconcile is retried.
		klog.ErrorS(genErr, "failed to generate the diff between work status and appliedWork status", work.Kind, logObjRef)
		return ctrl.Result{}, genErr
	}
	// delete all the manifests that should not be in the cluster.
	if err = r.deleteStaleManifest(ctx, staleRes, owner); err != nil {
		klog.ErrorS(err, "resource garbage-collection incomplete; some Work owned resources could not be deleted", work.Kind, logObjRef)
		// we can't proceed to update the applied
		return ctrl.Result{}, err
	} else if len(staleRes) > 0 {
		klog.V(2).InfoS("successfully garbage-collected all stale manifests", work.Kind, logObjRef, "number of GCed res", len(staleRes))
		for _, res := range staleRes {
			klog.V(2).InfoS("successfully garbage-collected a stale manifest", work.Kind, logObjRef, "res", res)
		}
	}
	// update the appliedWork with the new work after the stales are deleted
	appliedWork.Status.AppliedResources = newRes
	if err = r.spokeClient.Status().Update(ctx, appliedWork, &client.SubResourceUpdateOptions{}); err != nil {
		klog.ErrorS(err, "failed to update appliedWork status", appliedWork.Kind, appliedWork.GetName())
		return ctrl.Result{}, err
	}
	// Surface any per-manifest apply errors as one aggregate so the reconcile
	// is retried while still having completed the status/GC bookkeeping above.
	err = utilerrors.NewAggregate(errs)
	if err != nil {
		klog.ErrorS(err, "manifest apply incomplete; the message is queued again for reconciliation",
			"work", logObjRef)
	}
	// we periodically reconcile the work to make sure the member cluster state is in sync with the work
	// even if the reconciling succeeds in case the resources on the member cluster is removed/changed.
	return ctrl.Result{RequeueAfter: time.Minute * 5}, err
}