pkg/controllers/memberclusterplacement/membercluster_controller.go (106 lines of code) (raw):

/* Copyright 2025 The KubeFleet Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package memberclusterplacement import ( "context" "fmt" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" "go.goms.io/fleet/pkg/utils" "go.goms.io/fleet/pkg/utils/controller" "go.goms.io/fleet/pkg/utils/informer" ) // Reconciler reconciles a MemberCluster object type Reconciler struct { // the informer contains the cache for all the resources we need InformerManager informer.Manager // PlacementController maintains a rate limited queue which used to store // the name of the clusterResourcePlacement and a reconcile function to consume the items in queue. PlacementController controller.Controller } func (r *Reconciler) Reconcile(_ context.Context, key controller.QueueKey) (ctrl.Result, error) { startTime := time.Now() memberClusterName, ok := key.(string) if !ok { err := fmt.Errorf("got a resource key %+v not of type namespaced key", key) klog.ErrorS(err, "we have encountered a fatal error that can't be retried") return ctrl.Result{}, err } // add latency log defer func() { klog.V(2).InfoS("MemberClusterPlacement reconciliation loop ends", "memberCluster", memberClusterName, "latency", time.Since(startTime).Milliseconds()) }() klog.V(2).InfoS("Start to reconcile a member cluster to enqueue placement events", "memberCluster", memberClusterName) mObj, err := r.InformerManager.Lister(utils.MCV1Alpha1GVR).Get(memberClusterName) if err != nil { klog.ErrorS(err, "failed to get the member cluster", "memberCluster", memberClusterName) if !apierrors.IsNotFound(err) { return ctrl.Result{}, err } mObj = nil //guard against unexpected informer lib behavior } crpList, err := r.InformerManager.Lister(utils.ClusterResourcePlacementV1Alpha1GVR).List(labels.Everything()) if err != nil { klog.ErrorS(err, "failed to list all the cluster resource placement", "memberCluster", memberClusterName) return ctrl.Result{}, err } for i, crp := range crpList { uObj := crp.(*unstructured.Unstructured).DeepCopy() var placement fleetv1alpha1.ClusterResourcePlacement err = runtime.DefaultUnstructuredConverter.FromUnstructured(uObj.Object, &placement) if err != nil { klog.ErrorS(err, "failed to convert a cluster resource placement", "memberCluster", memberClusterName, "crp", uObj.GetName()) return ctrl.Result{}, err } if mObj == nil { // This is a corner case that the member cluster is deleted before we handle its status change. We can't use match since we don't have its label. klog.V(2).InfoS("enqueue a placement to reconcile for a deleted member cluster", "memberCluster", memberClusterName, "placement", klog.KObj(&placement)) r.PlacementController.Enqueue(crpList[i]) } else if matchPlacement(&placement, mObj.(*unstructured.Unstructured).DeepCopy()) { klog.V(2).InfoS("enqueue a placement to reconcile", "memberCluster", memberClusterName, "placement", klog.KObj(&placement)) r.PlacementController.Enqueue(crpList[i]) } } return ctrl.Result{}, nil } // matchPlacement check if a crp will or already selected a memberCluster func matchPlacement(placement *fleetv1alpha1.ClusterResourcePlacement, memberCluster *unstructured.Unstructured) bool { placementObj := klog.KObj(placement) // check if the placement already selected the member cluster for _, selectedCluster := range placement.Status.TargetClusters { if selectedCluster == memberCluster.GetName() { return true } } // no policy set if placement.Spec.Policy == nil { klog.V(2).InfoS("find a matching placement with no policy", "memberCluster", memberCluster.GetName(), "placement", placementObj) return true } // a fix list of clusters set, this takes precedence over the affinity if len(placement.Spec.Policy.ClusterNames) != 0 { for _, clusterName := range placement.Spec.Policy.ClusterNames { if clusterName == memberCluster.GetName() { klog.V(2).InfoS("find a matching placement with a list of cluster names", "memberCluster", memberCluster.GetName(), "placement", placementObj) return true } } return false } // no cluster affinity set if placement.Spec.Policy.Affinity == nil || placement.Spec.Policy.Affinity.ClusterAffinity == nil || len(placement.Spec.Policy.Affinity.ClusterAffinity.ClusterSelectorTerms) == 0 { klog.V(2).InfoS("find a matching placement with no cluster affinity", "memberCluster", memberCluster.GetName(), "placement", placementObj) return true } // check if member cluster match any placement's cluster selectors for _, clusterSelector := range placement.Spec.Policy.Affinity.ClusterAffinity.ClusterSelectorTerms { s, err := metav1.LabelSelectorAsSelector(&clusterSelector.LabelSelector) if err != nil { // should not happen after we have webhooks klog.ErrorS(err, "found a mal-formatted placement", "placement", placementObj, "selector", clusterSelector.LabelSelector) continue } if s.Matches(labels.Set(memberCluster.GetLabels())) { klog.V(2).InfoS("find a matching placement with label selector", "memberCluster", memberCluster.GetName(), "placement", placementObj, "selector", clusterSelector.LabelSelector) return true } } return false }