pkg/controllers/clusterresourceplacement/cluster_selector.go (122 lines of code) (raw):

/* Copyright 2025 The KubeFleet Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package clusterresourceplacement import ( "fmt" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" "go.goms.io/fleet/pkg/utils" ) // selectClusters selected the resources according to the placement resourceSelectors and // update the results in its status func (r *Reconciler) selectClusters(placement *fleetv1alpha1.ClusterResourcePlacement) (clusterNames []string, err error) { defer func() { if err == nil { // Set the status placement.Status.TargetClusters = clusterNames } }() // no policy set if placement.Spec.Policy == nil { clusterNames, err = r.listClusters(labels.Everything()) if err != nil { return nil, err } klog.V(2).InfoS("we select all the available clusters in the fleet without a policy", "placement", placement.Name, "clusters", clusterNames) return clusterNames, nil } // a fix list of clusters set if len(placement.Spec.Policy.ClusterNames) != 0 { klog.V(2).InfoS("use the cluster names provided as the list of cluster we select", "placement", placement.Name, "clusters", placement.Spec.Policy.ClusterNames) clusterNames, err = r.getClusters(placement.Spec.Policy.ClusterNames) if err != nil { return nil, err } return clusterNames, nil } // no Affinity or ClusterAffinity set if placement.Spec.Policy.Affinity == nil || placement.Spec.Policy.Affinity.ClusterAffinity == nil { clusterNames, err = r.listClusters(labels.Everything()) if err != nil { return nil, err } klog.V(2).InfoS("we select all the available clusters in the fleet without a cluster affinity", "placement", placement.Name, "clusters", clusterNames) return clusterNames, nil } selectedClusters := make(map[string]bool) for _, clusterSelector := range placement.Spec.Policy.Affinity.ClusterAffinity.ClusterSelectorTerms { selector, err := metav1.LabelSelectorAsSelector(&clusterSelector.LabelSelector) if err != nil { return nil, fmt.Errorf("cannot convert the label clusterSelector to a clusterSelector: %w", err) } matchClusters, err := r.listClusters(selector) if err != nil { return nil, fmt.Errorf("selector = %v: %w", clusterSelector.LabelSelector, err) } klog.V(2).InfoS("selector matches some cluster", "clusterNum", len(matchClusters), "placement", placement.Name, "selector", clusterSelector.LabelSelector) for _, clusterName := range matchClusters { selectedClusters[clusterName] = true } } for cluster := range selectedClusters { klog.V(2).InfoS("matched a cluster", "cluster", cluster, "placement", placement.Name) clusterNames = append(clusterNames, cluster) } return clusterNames, nil } // listClusters retrieves the clusters according to its label selector, this will hit the informer cache. func (r *Reconciler) listClusters(labelSelector labels.Selector) ([]string, error) { objs, err := r.InformerManager.Lister(utils.MCV1Alpha1GVR).List(labelSelector) if err != nil { return nil, fmt.Errorf("failed to list the clusters according to obj label selector: %w", err) } clusterNames := make([]string, 0) for _, obj := range objs { clusterObj, err := convertObjToMemberCluster(obj) if err != nil { return nil, err } // only schedule the resource to an eligible cluster if isClusterEligible(clusterObj) { clusterNames = append(clusterNames, clusterObj.GetName()) } } return clusterNames, nil } // getClusters retrieves the given clusters from the informer cache, and selects the ones found and eligible. func (r *Reconciler) getClusters(clusterNames []string) ([]string, error) { selectedClusters := make([]string, 0) for _, clusterName := range clusterNames { obj, err := r.InformerManager.Lister(utils.MCV1Alpha1GVR).Get(clusterName) if err != nil { klog.ErrorS(err, "cannot get the cluster", "clusterName", clusterName) if !apierrors.IsNotFound(err) { return nil, err } continue } clusterObj, err := convertObjToMemberCluster(obj) if err != nil { return nil, err } // only schedule the resource to an eligible cluster if isClusterEligible(clusterObj) { selectedClusters = append(selectedClusters, clusterObj.GetName()) } } return selectedClusters, nil } func convertObjToMemberCluster(obj runtime.Object) (*fleetv1alpha1.MemberCluster, error) { uObj := obj.DeepCopyObject().(*unstructured.Unstructured) var clusterObj fleetv1alpha1.MemberCluster if err := runtime.DefaultUnstructuredConverter.FromUnstructured(uObj.Object, &clusterObj); err != nil { return nil, fmt.Errorf("cannot decode the member cluster object: %w", err) } return &clusterObj, nil } // isClusterEligible checks whether a member cluster is eligible to be selected in CRP. // a cluster is eligible if its workAgent has a joined condition that is not marked explicitly as Left. // This is the most rudimentary filter for a cluster based on its status. // We will add mechanisms to use a chain of filters later. func isClusterEligible(mc *fleetv1alpha1.MemberCluster) bool { // we only care about the work agent's status for workload placement for _, agentStatus := range mc.Status.AgentStatus { if agentStatus.Type == fleetv1alpha1.MemberAgent { joinCond := meta.FindStatusCondition(agentStatus.Conditions, string(fleetv1alpha1.ConditionTypeMemberClusterJoined)) // we do not care if the observed generation of the condition since it will be out of date as soon // as the member cluster spec changes. return joinCond != nil && joinCond.Status != metav1.ConditionFalse } } return false }