pkg/controllers/work/apply_controller.go (735 lines of code) (raw):

/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Copyright 2025 The KubeFleet Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package work

import (
	"context"
	"fmt"
	"time"

	"go.uber.org/atomic"
	appv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
	"k8s.io/utils/ptr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	ctrloption "sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
	"go.goms.io/fleet/pkg/metrics"
	"go.goms.io/fleet/pkg/utils"
	"go.goms.io/fleet/pkg/utils/condition"
	"go.goms.io/fleet/pkg/utils/controller"
	"go.goms.io/fleet/pkg/utils/defaulter"
	"go.goms.io/fleet/pkg/utils/resource"
)

const (
	workFieldManagerName = "work-api-agent"
)

// WorkCondition condition reasons
const (
	workAppliedFailedReason       = "WorkAppliedFailed"
	workAppliedCompletedReason    = "WorkAppliedCompleted"
	workNotAvailableYetReason     = "WorkNotAvailableYet"
	workAvailabilityUnknownReason = "WorkAvailabilityUnknown"
	// WorkAvailableReason is the reason string of condition when the manifest is available.
	WorkAvailableReason = "WorkAvailable"
	// WorkNotTrackableReason is the reason string of condition when the manifest is already up to date but we don't have
	// a way to track its availability.
	WorkNotTrackableReason = "WorkNotTrackable"
	// ManifestApplyFailedReason is the reason string of condition when the manifest failed to apply.
	ManifestApplyFailedReason = "ManifestApplyFailed"
	// ApplyConflictBetweenPlacementsReason is the reason string of condition when the manifest is owned by multiple placements,
	// and they have conflicts.
	ApplyConflictBetweenPlacementsReason = "ApplyConflictBetweenPlacements"
	// ManifestsAlreadyOwnedByOthersReason is the reason string of condition when the manifest is already owned by other
	// non-fleet appliers.
	ManifestsAlreadyOwnedByOthersReason = "ManifestsAlreadyOwnedByOthers"
	// ManifestAlreadyUpToDateReason is the reason string of condition when the manifest is already up to date.
	ManifestAlreadyUpToDateReason  = "ManifestAlreadyUpToDate"
	manifestAlreadyUpToDateMessage = "Manifest is already up to date"
	// ManifestNeedsUpdateReason is the reason string of condition when the manifest needs to be updated.
	ManifestNeedsUpdateReason  = "ManifestNeedsUpdate"
	manifestNeedsUpdateMessage = "Manifest has just been updated and we are still checking its availability"
)

// ApplyWorkReconciler reconciles a Work object.
type ApplyWorkReconciler struct {
	client             client.Client
	spokeDynamicClient dynamic.Interface
	spokeClient        client.Client
	restMapper         meta.RESTMapper
	recorder           record.EventRecorder
	concurrency        int
	workNameSpace      string
	joined             *atomic.Bool
	appliers           map[fleetv1beta1.ApplyStrategyType]Applier
}

// NewApplyWorkReconciler returns an ApplyWorkReconciler that applies the manifests carried by Work objects
// in the given hub namespace to the member cluster.
func NewApplyWorkReconciler(hubClient client.Client, spokeDynamicClient dynamic.Interface, spokeClient client.Client,
	restMapper meta.RESTMapper, recorder record.EventRecorder, concurrency int, workNameSpace string) *ApplyWorkReconciler {
	return &ApplyWorkReconciler{
		client:             hubClient,
		spokeDynamicClient: spokeDynamicClient,
		spokeClient:        spokeClient,
		restMapper:         restMapper,
		recorder:           recorder,
		concurrency:        concurrency,
		workNameSpace:      workNameSpace,
		joined:             atomic.NewBool(false),
	}
}

// ApplyAction represents the action we take to apply the manifest.
// It is used only internally to track the result of the apply function.
// +enum
type ApplyAction string

const (
	// manifestCreatedAction indicates that we created the manifest for the first time.
	manifestCreatedAction ApplyAction = "ManifestCreated"

	// manifestThreeWayMergePatchAction indicates that we updated the manifest using a three-way merge patch.
	manifestThreeWayMergePatchAction ApplyAction = "ManifestThreeWayMergePatched"

	// manifestServerSideAppliedAction indicates that we updated the manifest using server-side apply.
	manifestServerSideAppliedAction ApplyAction = "ManifestServerSideApplied"

	// errorApplyAction indicates that there was an error during the apply action.
	errorApplyAction ApplyAction = "ErrorApply"

	// applyConflictBetweenPlacements indicates that we failed to apply the manifest because it is owned by
	// multiple placements with conflicting apply strategies.
	applyConflictBetweenPlacements ApplyAction = "ApplyConflictBetweenPlacements"

	// manifestAlreadyOwnedByOthers indicates that the manifest is already owned by other non-fleet appliers.
	manifestAlreadyOwnedByOthers ApplyAction = "ManifestAlreadyOwnedByOthers"

	// manifestNotAvailableYetAction indicates that we still need to wait for the manifest to be available.
	manifestNotAvailableYetAction ApplyAction = "ManifestNotAvailableYet"

	// manifestNotTrackableAction indicates that the manifest is already up to date but we don't have a way to track its availability.
	manifestNotTrackableAction ApplyAction = "ManifestNotTrackable"

	// manifestAvailableAction indicates that the manifest is available.
	manifestAvailableAction ApplyAction = "ManifestAvailable"
)

// applyResult contains the result of a manifest being applied.
type applyResult struct {
	identifier fleetv1beta1.WorkResourceIdentifier
	generation int64
	action     ApplyAction
	applyErr   error
}
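// The sketch below illustrates how a caller could wire this reconciler up; it is not taken from this file,
// and hubMgr, spokeMgr, spokeDynamicClient, and the namespace/concurrency values are placeholders rather
// than identifiers defined in this repository:
//
//	reconciler := NewApplyWorkReconciler(
//		hubMgr.GetClient(),       // client for the hub cluster, where the Work objects live
//		spokeDynamicClient,       // dynamic client for the member cluster, used to apply manifests
//		spokeMgr.GetClient(),     // typed client for the member cluster, used for AppliedWork
//		spokeMgr.GetRESTMapper(), // resolves each manifest's GVK to a GVR
//		hubMgr.GetEventRecorderFor("work-controller"),
//		5,                        // max concurrent reconciles
//		"fleet-member-cluster-a", // hub namespace holding this member cluster's Work objects
//	)
//	if err := reconciler.SetupWithManager(hubMgr); err != nil {
//		// handle the setup error
//	}
//	_ = reconciler.Join(ctx) // the reconciler requeues every request until Join is called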
// Reconcile implements the control loop logic for the Work object.
func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	if !r.joined.Load() {
		klog.V(2).InfoS("Work controller is not started yet, requeue the request", "work", req.NamespacedName)
		return ctrl.Result{RequeueAfter: time.Second * 5}, nil
	}
	startTime := time.Now()
	klog.V(2).InfoS("ApplyWork reconciliation starts", "work", req.NamespacedName)
	defer func() {
		latency := time.Since(startTime).Milliseconds()
		klog.V(2).InfoS("ApplyWork reconciliation ends", "work", req.NamespacedName, "latency", latency)
	}()

	// Fetch the work resource
	work := &fleetv1beta1.Work{}
	err := r.client.Get(ctx, req.NamespacedName, work)
	switch {
	case apierrors.IsNotFound(err):
		klog.V(2).InfoS("The work resource is deleted", "work", req.NamespacedName)
		return ctrl.Result{}, nil
	case err != nil:
		klog.ErrorS(err, "Failed to retrieve the work", "work", req.NamespacedName)
		return ctrl.Result{}, controller.NewAPIServerError(true, err)
	}
	logObjRef := klog.KObj(work)

	// Handle a deleting work: garbage collect the resources
	if !work.DeletionTimestamp.IsZero() {
		klog.V(2).InfoS("Resource is in the process of being deleted", work.Kind, logObjRef)
		return r.garbageCollectAppliedWork(ctx, work)
	}

	// Set default values so that the following calls can skip checking for nil.
	// TODO: this could be removed once we have the defaulting webhook with a fail policy.
	// Make sure these conditions are met before removing it:
	// * the defaulting webhook failure policy is configured as "fail".
	// * users cannot update/delete the webhook.
	defaulter.SetDefaultsWork(work)

	// ensure that the appliedWork and the finalizer exist
	appliedWork, err := r.ensureAppliedWork(ctx, work)
	if err != nil {
		return ctrl.Result{}, err
	}
	owner := metav1.OwnerReference{
		APIVersion:         fleetv1beta1.GroupVersion.String(),
		Kind:               fleetv1beta1.AppliedWorkKind,
		Name:               appliedWork.GetName(),
		UID:                appliedWork.GetUID(),
		BlockOwnerDeletion: ptr.To(false),
	}

	// apply the manifests to the member cluster
	results := r.applyManifests(ctx, work.Spec.Workload.Manifests, owner, work.Spec.ApplyStrategy)

	// collect the latency from the work update time to now.
	lastUpdateTime, ok := work.GetAnnotations()[utils.LastWorkUpdateTimeAnnotationKey]
	if ok {
		workUpdateTime, parseErr := time.Parse(time.RFC3339, lastUpdateTime)
		if parseErr != nil {
			klog.ErrorS(parseErr, "Failed to parse the last work update time", "work", logObjRef)
		} else {
			latency := time.Since(workUpdateTime)
			metrics.WorkApplyTime.WithLabelValues(work.GetName()).Observe(latency.Seconds())
			klog.V(2).InfoS("Work is applied", "work", work.GetName(), "latency", latency.Milliseconds())
		}
	} else {
		klog.V(2).InfoS("Work has no last update time", "work", work.GetName())
	}

	// generate the work condition based on the manifest apply result
	errs := constructWorkCondition(results, work)

	// update the work status
	if err = r.client.Status().Update(ctx, work, &client.SubResourceUpdateOptions{}); err != nil {
		klog.ErrorS(err, "Failed to update work status", "work", logObjRef)
		return ctrl.Result{}, err
	}
	if len(errs) == 0 {
		klog.InfoS("Successfully applied the work to the cluster", "work", logObjRef)
		r.recorder.Event(work, v1.EventTypeNormal, "ApplyWorkSucceed", "apply the work successfully")
	}

	// now we sync the status from work to appliedWork no matter whether the apply succeeds or not
	newRes, staleRes, genErr := r.generateDiff(ctx, work, appliedWork)
	if genErr != nil {
		klog.ErrorS(genErr, "Failed to generate the diff between work status and appliedWork status", work.Kind, logObjRef)
		return ctrl.Result{}, genErr
	}
	// delete all the manifests that should not be in the cluster.
	if err = r.deleteStaleManifest(ctx, staleRes, owner); err != nil {
		klog.ErrorS(err, "Resource garbage-collection incomplete; some Work-owned resources could not be deleted", work.Kind, logObjRef)
		// we can't proceed to update the appliedWork status if the stale manifests are not deleted
		return ctrl.Result{}, err
	} else if len(staleRes) > 0 {
		klog.V(2).InfoS("Successfully garbage-collected all stale manifests", work.Kind, logObjRef, "number of GCed res", len(staleRes))
		for _, res := range staleRes {
			klog.V(2).InfoS("Successfully garbage-collected a stale manifest", work.Kind, logObjRef, "res", res)
		}
	}

	// update the appliedWork with the new work after the stales are deleted
	appliedWork.Status.AppliedResources = newRes
	if err = r.spokeClient.Status().Update(ctx, appliedWork, &client.SubResourceUpdateOptions{}); err != nil {
		klog.ErrorS(err, "Failed to update appliedWork status", appliedWork.Kind, appliedWork.GetName())
		return ctrl.Result{}, err
	}

	// TODO: do not retry on errors if the apply action is reportDiff; report the diff every 1 min instead
	if err = utilerrors.NewAggregate(errs); err != nil {
		klog.ErrorS(err, "Manifest apply incomplete; the message is queued again for reconciliation", "work", logObjRef)
		return ctrl.Result{}, err
	}

	// check if the work is available; if not, we will requeue the work for reconciliation
	availableCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
	if !condition.IsConditionStatusTrue(availableCond, work.Generation) {
		klog.V(2).InfoS("Work is not available yet, check again", "work", logObjRef, "availableCond", availableCond)
		return ctrl.Result{RequeueAfter: time.Second * 3}, nil
	}
	// the work is available (possibly because it is not trackable), but we still periodically reconcile to make sure the
	// member cluster state is in sync with the work in case the resources on the member cluster are removed/changed.
	return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
}
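// Requeue cadence used by Reconcile above: 5 seconds while the reconciler has not joined yet, 3 seconds while
// the work is applied but not yet available, and 5 minutes once it is available, so that drift on the member
// cluster is eventually re-detected and re-applied on a later pass.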
// garbageCollectAppliedWork deletes the appliedWork and all the manifests associated with it from the cluster.
func (r *ApplyWorkReconciler) garbageCollectAppliedWork(ctx context.Context, work *fleetv1beta1.Work) (ctrl.Result, error) {
	deletePolicy := metav1.DeletePropagationBackground
	if !controllerutil.ContainsFinalizer(work, fleetv1beta1.WorkFinalizer) {
		return ctrl.Result{}, nil
	}
	// delete the appliedWork, which will remove all the manifests associated with it
	// TODO: allow orphaned manifests
	appliedWork := fleetv1beta1.AppliedWork{
		ObjectMeta: metav1.ObjectMeta{Name: work.Name},
	}
	err := r.spokeClient.Delete(ctx, &appliedWork, &client.DeleteOptions{PropagationPolicy: &deletePolicy})
	switch {
	case apierrors.IsNotFound(err):
		klog.V(2).InfoS("The appliedWork is already deleted", "appliedWork", work.Name)
	case err != nil:
		klog.ErrorS(err, "Failed to delete the appliedWork", "appliedWork", work.Name)
		return ctrl.Result{}, err
	default:
		klog.InfoS("Successfully deleted the appliedWork", "appliedWork", work.Name)
	}
	controllerutil.RemoveFinalizer(work, fleetv1beta1.WorkFinalizer)
	return ctrl.Result{}, r.client.Update(ctx, work, &client.UpdateOptions{})
}

// ensureAppliedWork makes sure that an associated appliedWork and a finalizer on the work resource exist on the cluster.
func (r *ApplyWorkReconciler) ensureAppliedWork(ctx context.Context, work *fleetv1beta1.Work) (*fleetv1beta1.AppliedWork, error) {
	workRef := klog.KObj(work)
	appliedWork := &fleetv1beta1.AppliedWork{}
	hasFinalizer := false
	if controllerutil.ContainsFinalizer(work, fleetv1beta1.WorkFinalizer) {
		hasFinalizer = true
		err := r.spokeClient.Get(ctx, types.NamespacedName{Name: work.Name}, appliedWork)
		switch {
		case apierrors.IsNotFound(err):
			klog.ErrorS(err, "AppliedWork resource does not exist even though the finalizer is set; it will be recreated", "appliedWork", workRef.Name)
		case err != nil:
			klog.ErrorS(err, "Failed to retrieve the appliedWork", "appliedWork", workRef.Name)
			return nil, controller.NewAPIServerError(true, err)
		default:
			return appliedWork, nil
		}
	}

	// we create the appliedWork before setting the finalizer, so it should always exist unless it's deleted behind our back
	appliedWork = &fleetv1beta1.AppliedWork{
		ObjectMeta: metav1.ObjectMeta{
			Name: work.Name,
		},
		Spec: fleetv1beta1.AppliedWorkSpec{
			WorkName:      work.Name,
			WorkNamespace: work.Namespace,
		},
	}
	if err := r.spokeClient.Create(ctx, appliedWork); err != nil && !apierrors.IsAlreadyExists(err) {
		klog.ErrorS(err, "AppliedWork create failed", "appliedWork", workRef.Name)
		return nil, err
	}
	if !hasFinalizer {
		klog.InfoS("Add the finalizer to the work", "work", workRef)
		work.Finalizers = append(work.Finalizers, fleetv1beta1.WorkFinalizer)
		return appliedWork, r.client.Update(ctx, work, &client.UpdateOptions{})
	}
	klog.InfoS("Recreated the appliedWork resource", "appliedWork", workRef.Name)
	return appliedWork, nil
}

// applyManifests processes a given set of manifests by setting ownership, validating each manifest, and passing it on for application to the cluster.
func (r *ApplyWorkReconciler) applyManifests(ctx context.Context, manifests []fleetv1beta1.Manifest, owner metav1.OwnerReference, applyStrategy *fleetv1beta1.ApplyStrategy) []applyResult {
	var appliedObj *unstructured.Unstructured

	results := make([]applyResult, len(manifests))
	for index, manifest := range manifests {
		var result applyResult
		gvr, rawObj, err := r.decodeManifest(manifest)
		switch {
		case err != nil:
			result.applyErr = err
			result.identifier = fleetv1beta1.WorkResourceIdentifier{
				Ordinal: index,
			}
			if rawObj != nil {
				result.identifier.Group = rawObj.GroupVersionKind().Group
				result.identifier.Version = rawObj.GroupVersionKind().Version
				result.identifier.Kind = rawObj.GroupVersionKind().Kind
				result.identifier.Namespace = rawObj.GetNamespace()
				result.identifier.Name = rawObj.GetName()
			}
		default:
			addOwnerRef(owner, rawObj)
			appliedObj, result.action, result.applyErr = r.applyUnstructuredAndTrackAvailability(ctx, gvr, rawObj, applyStrategy)
			result.identifier = buildResourceIdentifier(index, rawObj, gvr)
			logObjRef := klog.ObjectRef{
				Name:      result.identifier.Name,
				Namespace: result.identifier.Namespace,
			}
			if result.applyErr == nil {
				result.generation = appliedObj.GetGeneration()
				klog.V(2).InfoS("Apply manifest succeeded", "gvr", gvr, "manifest", logObjRef,
					"action", result.action, "applyStrategy", applyStrategy, "new ObservedGeneration", result.generation)
			} else {
				klog.ErrorS(result.applyErr, "Manifest upsert failed", "gvr", gvr, "manifest", logObjRef)
			}
		}
		results[index] = result
	}
	return results
}

// decodeManifest decodes the raw manifest into usable structs.
func (r *ApplyWorkReconciler) decodeManifest(manifest fleetv1beta1.Manifest) (schema.GroupVersionResource, *unstructured.Unstructured, error) {
	unstructuredObj := &unstructured.Unstructured{}
	err := unstructuredObj.UnmarshalJSON(manifest.Raw)
	if err != nil {
		return schema.GroupVersionResource{}, nil, fmt.Errorf("failed to decode object: %w", err)
	}
	mapping, err := r.restMapper.RESTMapping(unstructuredObj.GroupVersionKind().GroupKind(), unstructuredObj.GroupVersionKind().Version)
	if err != nil {
		return schema.GroupVersionResource{}, unstructuredObj, fmt.Errorf("failed to find group/version/resource from restmapping: %w", err)
	}
	return mapping.Resource, unstructuredObj, nil
}
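// For illustration, assuming fleetv1beta1.Manifest embeds runtime.RawExtension (which the manifest.Raw access
// above relies on), decoding a plain apps/v1 Deployment manifest resolves, via the RESTMapper, to the GVR
// {Group: "apps", Version: "v1", Resource: "deployments"}; deploymentJSON below is a placeholder []byte:
//
//	gvr, obj, err := r.decodeManifest(fleetv1beta1.Manifest{
//		RawExtension: runtime.RawExtension{Raw: deploymentJSON},
//	})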
// applyUnstructuredAndTrackAvailability determines if an unstructured manifest object can and should be applied.
// It first validates the size of the last modified annotation of the manifest and removes the annotation if the
// size crosses the annotation size threshold, then creates/updates the resource on the cluster using server-side
// apply instead of a three-way merge patch.
func (r *ApplyWorkReconciler) applyUnstructuredAndTrackAvailability(ctx context.Context, gvr schema.GroupVersionResource,
	manifestObj *unstructured.Unstructured, applyStrategy *fleetv1beta1.ApplyStrategy) (*unstructured.Unstructured, ApplyAction, error) {
	// TODO: determine the action based on the conflict resolution action
	objManifest := klog.KObj(manifestObj)
	applier := r.appliers[applyStrategy.Type]
	if applier == nil {
		err := fmt.Errorf("unknown apply strategy type %s", applyStrategy.Type)
		klog.ErrorS(err, "Apply strategy type is unsupported", "gvr", gvr, "manifest", objManifest, "applyStrategyType", applyStrategy.Type)
		return nil, errorApplyAction, controller.NewUserError(err)
	}
	curObj, applyActionRes, err := applier.ApplyUnstructured(ctx, applyStrategy, gvr, manifestObj)
	if err != nil {
		klog.ErrorS(err, "Failed to apply the manifest", "gvr", gvr, "manifest", objManifest, "applyStrategyType", applyStrategy.Type)
		return nil, applyActionRes, err // do not overwrite the applyActionRes
	}
	klog.V(2).InfoS("Applied the manifest", "gvr", gvr, "manifest", objManifest, "applyStrategyType", applyStrategy.Type)

	// the manifest is already up to date, we just need to track its availability
	applyActionRes, err = trackResourceAvailability(gvr, curObj)
	return curObj, applyActionRes, err
}

func trackResourceAvailability(gvr schema.GroupVersionResource, curObj *unstructured.Unstructured) (ApplyAction, error) {
	switch gvr {
	case utils.DeploymentGVR:
		return trackDeploymentAvailability(curObj)
	case utils.StatefulSetGVR:
		return trackStatefulSetAvailability(curObj)
	case utils.DaemonSetGVR:
		return trackDaemonSetAvailability(curObj)
	case utils.ServiceGVR:
		return trackServiceAvailability(curObj)
	case utils.CustomResourceDefinitionGVR:
		return trackCRDAvailability(curObj)
	default:
		if isDataResource(gvr) {
			klog.V(2).InfoS("Data resources are available immediately", "gvr", gvr, "resource", klog.KObj(curObj))
			return manifestAvailableAction, nil
		}
		klog.V(2).InfoS("We don't know how to track the availability of the resource", "gvr", gvr, "resource", klog.KObj(curObj))
		return manifestNotTrackableAction, nil
	}
}

func trackCRDAvailability(curObj *unstructured.Unstructured) (ApplyAction, error) {
	var crd apiextensionsv1.CustomResourceDefinition
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(curObj.Object, &crd); err != nil {
		return errorApplyAction, controller.NewUnexpectedBehaviorError(err)
	}
	// If both conditions are True, the CRD is available
	if apiextensionshelpers.IsCRDConditionTrue(&crd, apiextensionsv1.Established) &&
		apiextensionshelpers.IsCRDConditionTrue(&crd, apiextensionsv1.NamesAccepted) {
		klog.V(2).InfoS("CustomResourceDefinition is available", "customResourceDefinition", klog.KObj(curObj))
		return manifestAvailableAction, nil
	}
	klog.V(2).InfoS("Still need to wait for CustomResourceDefinition to be available", "customResourceDefinition", klog.KObj(curObj))
	return manifestNotAvailableYetAction, nil
}

func trackDeploymentAvailability(curObj *unstructured.Unstructured) (ApplyAction, error) {
	var deployment appv1.Deployment
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(curObj.Object, &deployment); err != nil {
		return errorApplyAction, controller.NewUnexpectedBehaviorError(err)
	}
	requiredReplicas := int32(1)
	if deployment.Spec.Replicas != nil {
		requiredReplicas = *deployment.Spec.Replicas
	}
	if deployment.Status.ObservedGeneration == deployment.Generation &&
		requiredReplicas == deployment.Status.AvailableReplicas &&
		requiredReplicas == deployment.Status.UpdatedReplicas {
klog.V(2).InfoS("Deployment is available", "deployment", klog.KObj(curObj)) return manifestAvailableAction, nil } klog.V(2).InfoS("Still need to wait for deployment to be available", "deployment", klog.KObj(curObj)) return manifestNotAvailableYetAction, nil } func trackStatefulSetAvailability(curObj *unstructured.Unstructured) (ApplyAction, error) { var statefulSet appv1.StatefulSet if err := runtime.DefaultUnstructuredConverter.FromUnstructured(curObj.Object, &statefulSet); err != nil { return errorApplyAction, controller.NewUnexpectedBehaviorError(err) } // a statefulSet is available if all the replicas are available and the currentReplicas is equal to the updatedReplicas // which means there is no more update in progress. requiredReplicas := int32(1) if statefulSet.Spec.Replicas != nil { requiredReplicas = *statefulSet.Spec.Replicas } if statefulSet.Status.ObservedGeneration == statefulSet.Generation && statefulSet.Status.AvailableReplicas == requiredReplicas && statefulSet.Status.CurrentReplicas == statefulSet.Status.UpdatedReplicas && statefulSet.Status.CurrentRevision == statefulSet.Status.UpdateRevision { klog.V(2).InfoS("StatefulSet is available", "statefulSet", klog.KObj(curObj)) return manifestAvailableAction, nil } klog.V(2).InfoS("Still need to wait for statefulSet to be available", "statefulSet", klog.KObj(curObj)) return manifestNotAvailableYetAction, nil } func trackDaemonSetAvailability(curObj *unstructured.Unstructured) (ApplyAction, error) { var daemonSet appv1.DaemonSet if err := runtime.DefaultUnstructuredConverter.FromUnstructured(curObj.Object, &daemonSet); err != nil { return errorApplyAction, controller.NewUnexpectedBehaviorError(err) } // a daemonSet is available if all the desired replicas (equal to all node suit for this Daemonset) // are available and the currentReplicas is equal to the updatedReplicas which means there is no more update in progress. 
	if daemonSet.Status.ObservedGeneration == daemonSet.Generation &&
		daemonSet.Status.NumberAvailable == daemonSet.Status.DesiredNumberScheduled &&
		daemonSet.Status.CurrentNumberScheduled == daemonSet.Status.UpdatedNumberScheduled {
		klog.V(2).InfoS("DaemonSet is available", "daemonSet", klog.KObj(curObj))
		return manifestAvailableAction, nil
	}
	klog.V(2).InfoS("Still need to wait for daemonSet to be available", "daemonSet", klog.KObj(curObj))
	return manifestNotAvailableYetAction, nil
}

func trackServiceAvailability(curObj *unstructured.Unstructured) (ApplyAction, error) {
	var service v1.Service
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(curObj.Object, &service); err != nil {
		return errorApplyAction, controller.NewUnexpectedBehaviorError(err)
	}
	switch service.Spec.Type {
	case "":
		fallthrough // the default service type is ClusterIP
	case v1.ServiceTypeClusterIP:
		fallthrough
	case v1.ServiceTypeNodePort:
		// we regard a ClusterIP or NodePort service as available if it has at least one IP
		if len(service.Spec.ClusterIPs) > 0 && len(service.Spec.ClusterIPs[0]) > 0 {
			klog.V(2).InfoS("Service is available", "service", klog.KObj(curObj), "serviceType", service.Spec.Type)
			return manifestAvailableAction, nil
		}
		klog.V(2).InfoS("Still need to wait for a service to be available", "service", klog.KObj(curObj), "serviceType", service.Spec.Type)
		return manifestNotAvailableYetAction, nil
	case v1.ServiceTypeLoadBalancer:
		// we regard a loadBalancer service as available if it has at least one IP or hostname
		if len(service.Status.LoadBalancer.Ingress) > 0 &&
			(len(service.Status.LoadBalancer.Ingress[0].IP) > 0 || len(service.Status.LoadBalancer.Ingress[0].Hostname) > 0) {
			klog.V(2).InfoS("LoadBalancer service is available", "service", klog.KObj(curObj))
			return manifestAvailableAction, nil
		}
		klog.V(2).InfoS("Still need to wait for loadBalancer service to be available", "service", klog.KObj(curObj))
		return manifestNotAvailableYetAction, nil
	}
	// we don't know how to track the availability of a service when its type is ExternalName
	klog.V(2).InfoS("Checking the availability of the service is not supported", "service", klog.KObj(curObj), "type", service.Spec.Type)
	return manifestNotTrackableAction, nil
}

// isDataResource checks if the resource is a data resource, which means it is available immediately after creation.
func isDataResource(gvr schema.GroupVersionResource) bool {
	switch gvr {
	case utils.NamespaceGVR:
		return true
	case utils.SecretGVR:
		return true
	case utils.ConfigMapGVR:
		return true
	case utils.RoleGVR:
		return true
	case utils.ClusterRoleGVR:
		return true
	case utils.RoleBindingGVR:
		return true
	case utils.ClusterRoleBindingGVR:
		return true
	}
	return false
}

// constructWorkCondition constructs the work condition based on the apply results.
// TODO: special handle no results
// TODO: special handle the apply error of type "report" drift.
func constructWorkCondition(results []applyResult, work *fleetv1beta1.Work) []error {
	var errs []error
	// Update manifestCondition based on the results.
	manifestConditions := make([]fleetv1beta1.ManifestCondition, len(results))
	for index, result := range results {
		if result.applyErr != nil {
			errs = append(errs, result.applyErr)
		}
		newConditions := buildManifestCondition(result.applyErr, result.action, result.generation)
		manifestCondition := fleetv1beta1.ManifestCondition{
			Identifier: result.identifier,
		}
		existingManifestCondition := findManifestConditionByIdentifier(result.identifier, work.Status.ManifestConditions)
		if existingManifestCondition != nil {
			manifestCondition.Conditions = existingManifestCondition.Conditions
		}
		// merge the status of the manifest condition
		for _, condition := range newConditions {
			meta.SetStatusCondition(&manifestCondition.Conditions, condition)
		}
		manifestConditions[index] = manifestCondition
	}

	work.Status.ManifestConditions = manifestConditions
	// merge the status of the work condition
	newWorkConditions := buildWorkCondition(manifestConditions, work.Generation)
	for _, condition := range newWorkConditions {
		meta.SetStatusCondition(&work.Status.Conditions, condition)
	}
	return errs
}

// Join marks the reconciler as joined so that it starts to reconcile Work objects.
func (r *ApplyWorkReconciler) Join(_ context.Context) error {
	if !r.joined.Load() {
		klog.InfoS("Mark the apply work reconciler joined")
	}
	r.joined.Store(true)
	return nil
}

// Leave marks the reconciler as left and removes the work finalizers in the cluster namespace so that
// reconciliation stops.
func (r *ApplyWorkReconciler) Leave(ctx context.Context) error {
	var works fleetv1beta1.WorkList
	if r.joined.Load() {
		klog.InfoS("Mark the apply work reconciler left")
	}
	r.joined.Store(false)
	// list all the work objects we created in the member cluster namespace
	listOpts := []client.ListOption{
		client.InNamespace(r.workNameSpace),
	}
	if err := r.client.List(ctx, &works, listOpts...); err != nil {
		klog.ErrorS(err, "Failed to list all the work objects", "clusterNS", r.workNameSpace)
		return client.IgnoreNotFound(err)
	}
	// we leave the resources on the member cluster for now
	for _, work := range works.Items {
		staleWork := work.DeepCopy()
		if controllerutil.ContainsFinalizer(staleWork, fleetv1beta1.WorkFinalizer) {
			controllerutil.RemoveFinalizer(staleWork, fleetv1beta1.WorkFinalizer)
			if updateErr := r.client.Update(ctx, staleWork, &client.UpdateOptions{}); updateErr != nil {
				klog.ErrorS(updateErr, "Failed to remove the work finalizer from the work", "clusterNS", r.workNameSpace, "work", klog.KObj(staleWork))
				return updateErr
			}
		}
	}
	klog.V(2).InfoS("Successfully removed all the work finalizers in the cluster namespace",
		"clusterNS", r.workNameSpace, "number of work", len(works.Items))
	return nil
}

// SetupWithManager wires up the controller.
func (r *ApplyWorkReconciler) SetupWithManager(mgr ctrl.Manager) error {
	r.appliers = map[fleetv1beta1.ApplyStrategyType]Applier{
		fleetv1beta1.ApplyStrategyTypeServerSideApply: &ServerSideApplier{
			HubClient:          r.client,
			WorkNamespace:      r.workNameSpace,
			SpokeDynamicClient: r.spokeDynamicClient,
		},
		fleetv1beta1.ApplyStrategyTypeClientSideApply: &ClientSideApplier{
			HubClient:          r.client,
			WorkNamespace:      r.workNameSpace,
			SpokeDynamicClient: r.spokeDynamicClient,
		},
	}
	return ctrl.NewControllerManagedBy(mgr).Named("work-controller").
		WithOptions(ctrloption.Options{
			MaxConcurrentReconciles: r.concurrency,
		}).
		For(&fleetv1beta1.Work{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Complete(r)
}

// computeManifestHash generates a hash of the spec of an unstructured object after we remove all the fields
// we have modified.
func computeManifestHash(obj *unstructured.Unstructured) (string, error) {
	manifest := obj.DeepCopy()
	// remove the last applied annotation to avoid unlimited recursion
	annotation := manifest.GetAnnotations()
	if annotation != nil {
		delete(annotation, fleetv1beta1.ManifestHashAnnotation)
		delete(annotation, fleetv1beta1.LastAppliedConfigAnnotation)
		if len(annotation) == 0 {
			manifest.SetAnnotations(nil)
		} else {
			manifest.SetAnnotations(annotation)
		}
	}
	// strip the live-object related fields just in case
	manifest.SetResourceVersion("")
	manifest.SetGeneration(0)
	manifest.SetUID("")
	manifest.SetSelfLink("")
	manifest.SetDeletionTimestamp(nil)
	manifest.SetManagedFields(nil)
	unstructured.RemoveNestedField(manifest.Object, "metadata", "creationTimestamp")
	unstructured.RemoveNestedField(manifest.Object, "status")
	// compute the sha256 hash of the remaining data
	return resource.HashOf(manifest.Object)
}

// isManifestManagedByWork determines if an object is managed by the work controller.
func isManifestManagedByWork(ownerRefs []metav1.OwnerReference) bool {
	if len(ownerRefs) == 0 {
		return false
	}
	// an object is NOT managed by the work if any of its owner references is not of type appliedWork.
	// We'll fail the operation if the resource is owned by another applier (non-fleet agent) and the placement does not allow
	// co-ownership.
	for _, ownerRef := range ownerRefs {
		if ownerRef.APIVersion != fleetv1beta1.GroupVersion.String() || ownerRef.Kind != fleetv1beta1.AppliedWorkKind {
			return false
		}
	}
	return true
}

// findManifestConditionByIdentifier returns a ManifestCondition by identifier:
// 1. Find the manifest condition with the whole identifier.
// 2. If the identifier only has the ordinal, and a match cannot be found, return nil.
// 3. Try to find properties, other than the ordinal, within the identifier.
func findManifestConditionByIdentifier(identifier fleetv1beta1.WorkResourceIdentifier, manifestConditions []fleetv1beta1.ManifestCondition) *fleetv1beta1.ManifestCondition {
	for _, manifestCondition := range manifestConditions {
		if identifier == manifestCondition.Identifier {
			return &manifestCondition
		}
	}
	if identifier == (fleetv1beta1.WorkResourceIdentifier{Ordinal: identifier.Ordinal}) {
		return nil
	}
	identifierCopy := identifier.DeepCopy()
	for _, manifestCondition := range manifestConditions {
		identifierCopy.Ordinal = manifestCondition.Identifier.Ordinal
		if *identifierCopy == manifestCondition.Identifier {
			return &manifestCondition
		}
	}
	return nil
}

// setManifestHashAnnotation computes the hash of the provided manifest and sets an annotation of the
// hash on the provided unstructured object.
func setManifestHashAnnotation(manifestObj *unstructured.Unstructured) error {
	manifestHash, err := computeManifestHash(manifestObj)
	if err != nil {
		return err
	}
	annotation := manifestObj.GetAnnotations()
	if annotation == nil {
		annotation = map[string]string{}
	}
	annotation[fleetv1beta1.ManifestHashAnnotation] = manifestHash
	manifestObj.SetAnnotations(annotation)
	return nil
}
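// For illustration, after setManifestHashAnnotation runs, the manifest object carries the computed sha256
// digest under the fleetv1beta1.ManifestHashAnnotation key, shaped roughly like this (the digest value is
// made up and shortened):
//
//	metadata:
//	  annotations:
//	    <fleetv1beta1.ManifestHashAnnotation>: "9f86d081884c7d65..."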
// buildResourceIdentifier builds a resource identifier for a given unstructured.Unstructured object.
func buildResourceIdentifier(index int, object *unstructured.Unstructured, gvr schema.GroupVersionResource) fleetv1beta1.WorkResourceIdentifier {
	return fleetv1beta1.WorkResourceIdentifier{
		Ordinal:   index,
		Group:     object.GroupVersionKind().Group,
		Version:   object.GroupVersionKind().Version,
		Kind:      object.GroupVersionKind().Kind,
		Namespace: object.GetNamespace(),
		Name:      object.GetName(),
		Resource:  gvr.Resource,
	}
}

func buildManifestCondition(err error, action ApplyAction, observedGeneration int64) []metav1.Condition {
	applyCondition := metav1.Condition{
		Type:               fleetv1beta1.WorkConditionTypeApplied,
		LastTransitionTime: metav1.Now(),
		ObservedGeneration: observedGeneration,
	}
	availableCondition := metav1.Condition{
		Type:               fleetv1beta1.WorkConditionTypeAvailable,
		LastTransitionTime: metav1.Now(),
		ObservedGeneration: observedGeneration,
	}

	if err != nil {
		applyCondition.Status = metav1.ConditionFalse
		switch action {
		case applyConflictBetweenPlacements:
			applyCondition.Reason = ApplyConflictBetweenPlacementsReason
		case manifestAlreadyOwnedByOthers:
			applyCondition.Reason = ManifestsAlreadyOwnedByOthersReason
		default:
			applyCondition.Reason = ManifestApplyFailedReason
		}
		// TODO: handle the max length (32768) of the message field
		applyCondition.Message = fmt.Sprintf("Failed to apply manifest: %v", err)
		availableCondition.Status = metav1.ConditionUnknown
		availableCondition.Reason = ManifestApplyFailedReason
		availableCondition.Message = "Manifest is not applied yet"
	} else {
		applyCondition.Status = metav1.ConditionTrue
		// the first three action types mean we did write to the cluster, thus the availability is unknown;
		// the last three action types mean we have started to track the resources
		switch action {
		case manifestCreatedAction:
			applyCondition.Reason = string(manifestCreatedAction)
			applyCondition.Message = "Manifest is created successfully"
			availableCondition.Status = metav1.ConditionUnknown
			availableCondition.Reason = ManifestNeedsUpdateReason
			availableCondition.Message = manifestNeedsUpdateMessage
		case manifestThreeWayMergePatchAction:
			applyCondition.Reason = string(manifestThreeWayMergePatchAction)
			applyCondition.Message = "Manifest is patched successfully"
			availableCondition.Status = metav1.ConditionUnknown
			availableCondition.Reason = ManifestNeedsUpdateReason
			availableCondition.Message = manifestNeedsUpdateMessage
		case manifestServerSideAppliedAction:
			applyCondition.Reason = string(manifestServerSideAppliedAction)
			applyCondition.Message = "Manifest is patched successfully"
			availableCondition.Status = metav1.ConditionUnknown
			availableCondition.Reason = ManifestNeedsUpdateReason
			availableCondition.Message = manifestNeedsUpdateMessage
		case manifestAvailableAction:
			applyCondition.Reason = ManifestAlreadyUpToDateReason
			applyCondition.Message = manifestAlreadyUpToDateMessage
			availableCondition.Status = metav1.ConditionTrue
			availableCondition.Reason = string(manifestAvailableAction)
			availableCondition.Message = "Manifest is trackable and available now"
		case manifestNotAvailableYetAction:
			applyCondition.Reason = ManifestAlreadyUpToDateReason
			applyCondition.Message = manifestAlreadyUpToDateMessage
			availableCondition.Status = metav1.ConditionFalse
			availableCondition.Reason = string(manifestNotAvailableYetAction)
			availableCondition.Message = "Manifest is trackable but not available yet"
		// we cannot stay stuck at unknown, so we have to mark it as true
		case manifestNotTrackableAction:
			applyCondition.Reason = ManifestAlreadyUpToDateReason
			applyCondition.Message = manifestAlreadyUpToDateMessage
			availableCondition.Status = metav1.ConditionTrue
			availableCondition.Reason = string(manifestNotTrackableAction)
			availableCondition.Message = "Manifest is not trackable"
		default:
			klog.ErrorS(controller.ErrUnexpectedBehavior, "Unknown apply action result", "applyResult", action)
		}
	}
	return []metav1.Condition{applyCondition, availableCondition}
}

// buildWorkCondition generates the overall applied and available status conditions for the work.
// If one of the manifests failed to apply on the member cluster, the applied status condition of the work is false.
// If one of the manifests is not available yet on the member cluster, the available status condition of the work is false.
// If all the manifests are available, the available status condition of the work is true.
// Otherwise, the available status condition of the work is unknown.
func buildWorkCondition(manifestConditions []fleetv1beta1.ManifestCondition, observedGeneration int64) []metav1.Condition {
	applyCondition := metav1.Condition{
		Type:               fleetv1beta1.WorkConditionTypeApplied,
		LastTransitionTime: metav1.Now(),
		ObservedGeneration: observedGeneration,
	}
	availableCondition := metav1.Condition{
		Type:               fleetv1beta1.WorkConditionTypeAvailable,
		LastTransitionTime: metav1.Now(),
		ObservedGeneration: observedGeneration,
	}
	// the manifest condition should not be an empty list
	for _, manifestCond := range manifestConditions {
		if meta.IsStatusConditionFalse(manifestCond.Conditions, fleetv1beta1.WorkConditionTypeApplied) {
			// we mark the entire work applied condition as false if one of the manifests failed to apply
			applyCondition.Status = metav1.ConditionFalse
			applyCondition.Reason = workAppliedFailedReason
			applyCondition.Message = fmt.Sprintf("Apply manifest %+v failed", manifestCond.Identifier)
			availableCondition.Status = metav1.ConditionUnknown
			availableCondition.Reason = workAppliedFailedReason
			return []metav1.Condition{applyCondition, availableCondition}
		}
	}
	applyCondition.Status = metav1.ConditionTrue
	applyCondition.Reason = workAppliedCompletedReason
	applyCondition.Message = "Work is applied successfully"

	// we mark the entire work available condition as unknown if the availability of one of the manifests is not known yet
	for _, manifestCond := range manifestConditions {
		cond := meta.FindStatusCondition(manifestCond.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
		if cond.Status == metav1.ConditionUnknown {
			availableCondition.Status = metav1.ConditionUnknown
			availableCondition.Reason = workAvailabilityUnknownReason
			availableCondition.Message = fmt.Sprintf("Manifest %+v availability is not known yet", manifestCond.Identifier)
			return []metav1.Condition{applyCondition, availableCondition}
		}
	}
	// now that there is no unknown, we mark the entire work available condition as false if one of the manifests is not available yet
	for _, manifestCond := range manifestConditions {
		cond := meta.FindStatusCondition(manifestCond.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
		if cond.Status == metav1.ConditionFalse {
			availableCondition.Status = metav1.ConditionFalse
			availableCondition.Reason = workNotAvailableYetReason
			availableCondition.Message = fmt.Sprintf("Manifest %+v is not available yet", manifestCond.Identifier)
			return []metav1.Condition{applyCondition, availableCondition}
		}
	}
	// now that all the conditions are true, we mark the entire work available condition reason as not trackable if one of the manifests is not trackable
	trackable := true
	for _, manifestCond := range manifestConditions {
		cond := meta.FindStatusCondition(manifestCond.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
		if cond.Reason == string(manifestNotTrackableAction) {
			trackable = false
			break
		}
	}
	availableCondition.Status = metav1.ConditionTrue
	if trackable {
		availableCondition.Reason = WorkAvailableReason
		availableCondition.Message = "Work is available now"
	} else {
		availableCondition.Reason = WorkNotTrackableReason
		availableCondition.Message = "Work's availability is not trackable"
	}
	return []metav1.Condition{applyCondition, availableCondition}
}
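// For illustration, a Work whose manifests have all been applied and verified available would end up with
// status conditions shaped roughly as follows (a sketch only; the observedGeneration value is made up):
//
//	conditions:
//	- type: Applied
//	  status: "True"
//	  reason: WorkAppliedCompleted
//	  observedGeneration: 2
//	- type: Available
//	  status: "True"
//	  reason: WorkAvailable   # WorkNotTrackable instead if any manifest's availability cannot be tracked
//	  observedGeneration: 2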