in pkg/controllers/work/apply_controller.go [175:296]
func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
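	// Skip reconciliation until the controller has been marked as joined; requeue the
	// request shortly so that it is retried once the join completes.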
	if !r.joined.Load() {
		klog.V(2).InfoS("Work controller is not started yet, requeue the request", "work", req.NamespacedName)
		return ctrl.Result{RequeueAfter: time.Second * 5}, nil
	}
	startTime := time.Now()
	klog.V(2).InfoS("ApplyWork reconciliation starts", "work", req.NamespacedName)
	defer func() {
		latency := time.Since(startTime).Milliseconds()
		klog.V(2).InfoS("ApplyWork reconciliation ends", "work", req.NamespacedName, "latency", latency)
	}()
	// Fetch the work resource
	work := &fleetv1beta1.Work{}
	err := r.client.Get(ctx, req.NamespacedName, work)
	switch {
	case apierrors.IsNotFound(err):
		klog.V(2).InfoS("The work resource is deleted", "work", req.NamespacedName)
		return ctrl.Result{}, nil
	case err != nil:
		klog.ErrorS(err, "Failed to retrieve the work", "work", req.NamespacedName)
		return ctrl.Result{}, controller.NewAPIServerError(true, err)
	}
	logObjRef := klog.KObj(work)
	// Handle deleting work, garbage collect the resources
	if !work.DeletionTimestamp.IsZero() {
		klog.V(2).InfoS("Resource is in the process of being deleted", work.Kind, logObjRef)
		return r.garbageCollectAppliedWork(ctx, work)
	}
	// Set default values so that the following calls can skip nil checks.
	// TODO: this can be removed once the defaulting webhook with a fail policy is in place.
	// Make sure these conditions are met before removing this call:
	// * the defaulting webhook failure policy is configured as "fail".
	// * users cannot update/delete the webhook.
	defaulter.SetDefaultsWork(work)
	// ensure that the appliedWork and the finalizer exist
	appliedWork, err := r.ensureAppliedWork(ctx, work)
	if err != nil {
		return ctrl.Result{}, err
	}
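	// All manifests applied below carry this owner reference so that they are tied to the
	// AppliedWork object; BlockOwnerDeletion is false so they do not block its deletion.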
	owner := metav1.OwnerReference{
		APIVersion:         fleetv1beta1.GroupVersion.String(),
		Kind:               fleetv1beta1.AppliedWorkKind,
		Name:               appliedWork.GetName(),
		UID:                appliedWork.GetUID(),
		BlockOwnerDeletion: ptr.To(false),
	}
	// apply the manifests to the member cluster
	results := r.applyManifests(ctx, work.Spec.Workload.Manifests, owner, work.Spec.ApplyStrategy)
	// collect the latency from the work update time to now.
	lastUpdateTime, ok := work.GetAnnotations()[utils.LastWorkUpdateTimeAnnotationKey]
	if ok {
		workUpdateTime, parseErr := time.Parse(time.RFC3339, lastUpdateTime)
		if parseErr != nil {
			klog.ErrorS(parseErr, "Failed to parse the last work update time", "work", logObjRef)
		} else {
			latency := time.Since(workUpdateTime)
			metrics.WorkApplyTime.WithLabelValues(work.GetName()).Observe(latency.Seconds())
			klog.V(2).InfoS("Work is applied", "work", work.GetName(), "latency", latency.Milliseconds())
		}
	} else {
		klog.V(2).InfoS("Work has no last update time", "work", work.GetName())
	}
	// generate the work condition based on the manifest apply result
	errs := constructWorkCondition(results, work)
	// update the work status
	if err = r.client.Status().Update(ctx, work, &client.SubResourceUpdateOptions{}); err != nil {
		klog.ErrorS(err, "Failed to update work status", "work", logObjRef)
		return ctrl.Result{}, err
	}
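	// Record a success event only when every manifest in the work was applied without error.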
	if len(errs) == 0 {
		klog.InfoS("Successfully applied the work to the cluster", "work", logObjRef)
		r.recorder.Event(work, v1.EventTypeNormal, "ApplyWorkSucceed", "apply the work successfully")
	}
	// sync the status from the work to the appliedWork, whether or not the apply succeeded
	newRes, staleRes, genErr := r.generateDiff(ctx, work, appliedWork)
	if genErr != nil {
		klog.ErrorS(genErr, "Failed to generate the diff between work status and appliedWork status", work.Kind, logObjRef)
		return ctrl.Result{}, genErr
	}
	// delete all the manifests that should not be in the cluster.
	if err = r.deleteStaleManifest(ctx, staleRes, owner); err != nil {
		klog.ErrorS(err, "Resource garbage-collection incomplete; some Work owned resources could not be deleted", work.Kind, logObjRef)
		// we can't proceed to update the appliedWork status until the stale manifests are deleted
		return ctrl.Result{}, err
	} else if len(staleRes) > 0 {
		klog.V(2).InfoS("Successfully garbage-collected all stale manifests", work.Kind, logObjRef, "number of GCed res", len(staleRes))
		for _, res := range staleRes {
			klog.V(2).InfoS("Successfully garbage-collected a stale manifest", work.Kind, logObjRef, "res", res)
		}
	}
	// update the appliedWork status with the newly applied resources now that the stale ones are deleted
	appliedWork.Status.AppliedResources = newRes
	if err = r.spokeClient.Status().Update(ctx, appliedWork, &client.SubResourceUpdateOptions{}); err != nil {
		klog.ErrorS(err, "Failed to update appliedWork status", appliedWork.Kind, appliedWork.GetName())
		return ctrl.Result{}, err
	}
	// TODO: do not retry on errors if the apply action is reportDiff, report the diff every 1 min instead
	if err = utilerrors.NewAggregate(errs); err != nil {
		klog.ErrorS(err, "Manifest apply incomplete; the message is queued again for reconciliation",
			"work", logObjRef)
		return ctrl.Result{}, err
	}
	// check if the work is available, if not, we will requeue the work for reconciliation
	availableCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
	if !condition.IsConditionStatusTrue(availableCond, work.Generation) {
		klog.V(2).InfoS("Work is not available yet, check again", "work", logObjRef, "availableCond", availableCond)
		return ctrl.Result{RequeueAfter: time.Second * 3}, nil
	}
	// The work is available (possibly because its resources are not trackable), but we still
	// reconcile periodically to make sure the member cluster state stays in sync with the work
	// in case the resources on the member cluster are removed or changed.
	return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
}
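
// The snippet below is an illustrative sketch, not code from this file: it shows one common
// way a reconciler with the behavior above is wired into controller-runtime so that changes
// to Work objects trigger Reconcile. The function name and options are assumed and may differ
// from the repository's actual setup.
func (r *ApplyWorkReconciler) setupWithManagerSketch(mgr ctrl.Manager) error {
	// Watch Work objects; every create/update/delete enqueues a reconcile request, and the
	// RequeueAfter results returned above drive the periodic re-syncs.
	return ctrl.NewControllerManagedBy(mgr).
		For(&fleetv1beta1.Work{}).
		Complete(r)
}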