pkg/controllers/clusterresourceplacement/work_propagation.go (208 lines of code) (raw):

/* Copyright 2025 The KubeFleet Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package clusterresourceplacement import ( "context" "fmt" "reflect" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" workv1alpha1controller "go.goms.io/fleet/pkg/controllers/workv1alpha1" "go.goms.io/fleet/pkg/utils" "go.goms.io/fleet/pkg/utils/resource" ) const ( specHashAnnotationKey = "work.fleet.azure.com/spec-hash-value" ) // scheduleWork creates or updates the work object to reflect the new placement decision. func (r *Reconciler) scheduleWork(ctx context.Context, placement *fleetv1alpha1.ClusterResourcePlacement, manifests []workv1alpha1.Manifest) error { var allErr []error memberClusterNames := placement.Status.TargetClusters workName := placement.Name workerOwnerRef := metav1.OwnerReference{ APIVersion: placement.GroupVersionKind().GroupVersion().String(), Kind: placement.GroupVersionKind().Kind, Name: placement.GetName(), UID: placement.GetUID(), BlockOwnerDeletion: ptr.To(true), Controller: ptr.To(true), } workerSpec := workv1alpha1.WorkSpec{ Workload: workv1alpha1.WorkloadTemplate{ Manifests: manifests, }, } specHash, err := resource.HashOf(workerSpec.Workload) if err != nil { return fmt.Errorf("failed to calculate the spec hash of the newly generated work resource: %w", err) } workLabels := map[string]string{ utils.LabelWorkPlacementName: placement.GetName(), utils.LabelFleetObj: utils.LabelFleetObjValue, } workAnnotation := map[string]string{ utils.LastWorkUpdateTimeAnnotationKey: time.Now().Format(time.RFC3339), specHashAnnotationKey: specHash, } changed := false for _, memberClusterName := range memberClusterNames { memberClusterNsName := fmt.Sprintf(utils.NamespaceNameFormat, memberClusterName) curWork, err := r.getResourceBinding(memberClusterNsName, workName) if err != nil { if !apierrors.IsNotFound(err) { allErr = append(allErr, fmt.Errorf("failed to get the work obj %s in namespace %s: %w", workName, memberClusterName, err)) continue } // create the work CR since it doesn't exist workCR := &workv1alpha1.Work{ ObjectMeta: metav1.ObjectMeta{ Namespace: memberClusterNsName, Name: workName, OwnerReferences: []metav1.OwnerReference{ workerOwnerRef, }, Labels: workLabels, Annotations: workAnnotation, }, Spec: workerSpec, } if createErr := r.Client.Create(ctx, workCR, client.FieldOwner(utils.PlacementFieldManagerName)); createErr != nil { klog.ErrorS(createErr, "failed to create the work", "work", workName, "namespace", memberClusterNsName) allErr = append(allErr, fmt.Errorf("failed to create the work obj %s in namespace %s: %w", workName, memberClusterNsName, createErr)) continue } klog.V(2).InfoS("created work spec with manifests", "member cluster namespace", memberClusterNsName, "work name", workName, "number of manifests", len(manifests)) changed = true continue } existingHash := curWork.GetAnnotations()[specHashAnnotationKey] if existingHash == specHash || reflect.DeepEqual(curWork.Spec.Workload.Manifests, workerSpec.Workload.Manifests) { klog.V(2).InfoS("skip updating work spec as its identical", "member cluster namespace", memberClusterNsName, "work name", workName, "number of manifests", len(manifests)) continue } changed = true curWork.Spec = workerSpec curWork.SetLabels(workLabels) curWork.SetOwnerReferences([]metav1.OwnerReference{workerOwnerRef}) curWork.SetAnnotations(workAnnotation) if updateErr := r.Client.Update(ctx, curWork, client.FieldOwner(utils.PlacementFieldManagerName)); updateErr != nil { allErr = append(allErr, fmt.Errorf("failed to update the work obj %s in namespace %s: %w", workName, memberClusterNsName, updateErr)) continue } klog.V(2).InfoS("updated work spec with manifests", "member cluster namespace", memberClusterNsName, "work name", workName, "number of manifests", len(manifests)) } if changed { klog.V(2).InfoS("Applied all work to the selected cluster namespaces", "placement", klog.KObj(placement), "number of clusters", len(memberClusterNames)) } else { klog.V(2).InfoS("Nothing new to apply for the cluster resource placement", "placement", klog.KObj(placement), "number of clusters", len(memberClusterNames)) } return errors.NewAggregate(allErr) } // removeStaleWorks removes all the work objects from the clusters that are no longer selected. func (r *Reconciler) removeStaleWorks(ctx context.Context, placementName string, existingClusters, newClusters []string) (int, error) { var allErr []error workName := placementName clusterMap := make(map[string]bool) for _, cluster := range newClusters { clusterMap[cluster] = true } removed := 0 for _, oldCluster := range existingClusters { if !clusterMap[oldCluster] { memberClusterNsName := fmt.Sprintf(utils.NamespaceNameFormat, oldCluster) workCR := &workv1alpha1.Work{ ObjectMeta: metav1.ObjectMeta{ Namespace: memberClusterNsName, Name: workName, }, } if deleteErr := r.Client.Delete(ctx, workCR); deleteErr != nil && !apierrors.IsNotFound(deleteErr) { allErr = append(allErr, fmt.Errorf("failed to delete the work obj %s from namespace %s: %w", workName, memberClusterNsName, deleteErr)) continue } removed++ klog.V(2).InfoS("deleted a work resource from clusters no longer selected", "member cluster namespace", memberClusterNsName, "work name", workName, "place", placementName) } } return removed, errors.NewAggregate(allErr) } // collectAllManifestsStatus goes through all the manifest this placement handles and return if there is either // still pending manifests or error func (r *Reconciler) collectAllManifestsStatus(placement *fleetv1alpha1.ClusterResourcePlacement) (bool, error) { hasPending := false placement.Status.FailedResourcePlacements = make([]fleetv1alpha1.FailedResourcePlacement, 0) workName := placement.GetName() for _, cluster := range placement.Status.TargetClusters { memberClusterNsName := fmt.Sprintf(utils.NamespaceNameFormat, cluster) work, err := r.getResourceBinding(memberClusterNsName, workName) if err != nil { if apierrors.IsNotFound(err) { klog.V(2).InfoS("the work change has not shown up in the cache yet", "work", klog.KRef(memberClusterNsName, workName), "cluster", cluster) hasPending = true continue } return false, fmt.Errorf("failed to get the work obj %s from namespace %s: %w", workName, memberClusterNsName, err) } // check the overall condition appliedCond := meta.FindStatusCondition(work.Status.Conditions, workv1alpha1controller.ConditionTypeApplied) if appliedCond == nil { hasPending = true klog.V(2).InfoS("the work is never picked up by the member cluster", "work", klog.KObj(work), "cluster", cluster) continue } if appliedCond.ObservedGeneration < work.GetGeneration() { hasPending = true klog.V(2).InfoS("the update of the work is not picked up by the member cluster yet", "work", klog.KObj(work), "cluster", cluster, "work generation", work.GetGeneration(), "applied generation", appliedCond.ObservedGeneration) continue } if appliedCond.Status == metav1.ConditionTrue { klog.V(2).InfoS("the work is applied successfully by the member cluster", "work", klog.KObj(work), "cluster", cluster) continue } for _, manifestCondition := range work.Status.ManifestConditions { resourceIdentifier := fleetv1alpha1.ResourceIdentifier{ Group: manifestCondition.Identifier.Group, Version: manifestCondition.Identifier.Version, Kind: manifestCondition.Identifier.Kind, Name: manifestCondition.Identifier.Name, Namespace: manifestCondition.Identifier.Namespace, } appliedCond = meta.FindStatusCondition(manifestCondition.Conditions, workv1alpha1controller.ConditionTypeApplied) // collect if there is an explicit fail if appliedCond != nil && appliedCond.Status != metav1.ConditionTrue { klog.V(2).InfoS("find a failed to apply manifest", "member cluster namespace", memberClusterNsName, "manifest name", manifestCondition.Identifier.Name, "group", manifestCondition.Identifier.Group, "version", manifestCondition.Identifier.Version, "kind", manifestCondition.Identifier.Kind) placement.Status.FailedResourcePlacements = append(placement.Status.FailedResourcePlacements, fleetv1alpha1.FailedResourcePlacement{ ResourceIdentifier: resourceIdentifier, Condition: *appliedCond, ClusterName: cluster, }) } } } return hasPending, nil } // getResourceBinding retrieves a work object by its name and namespace, this will hit the informer cache. func (r *Reconciler) getResourceBinding(namespace, name string) (*workv1alpha1.Work, error) { obj, err := r.InformerManager.Lister(utils.WorkV1Alpha1GVR).ByNamespace(namespace).Get(name) if err != nil { return nil, err } var workObj workv1alpha1.Work err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj.DeepCopyObject().(*unstructured.Unstructured).Object, &workObj) if err != nil { return nil, err } return &workObj, nil }