pkg/networkaware/networkoverhead/networkoverhead.go (474 lines of code) (raw):

/* Copyright 2022 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package networkoverhead import ( "context" "fmt" "math" "sort" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" "sigs.k8s.io/controller-runtime/pkg/client" pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config" networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util" agv1alpha1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1" ntv1alpha1 "github.com/diktyo-io/networktopology-api/pkg/apis/networktopology/v1alpha1" ) var _ framework.PreFilterPlugin = &NetworkOverhead{} var _ framework.FilterPlugin = &NetworkOverhead{} var _ framework.ScorePlugin = &NetworkOverhead{} const ( // Name : name of plugin used in the plugin registry and configurations. Name = "NetworkOverhead" // MaxCost : MaxCost used in the NetworkTopology for costs between origins and destinations MaxCost = 100 // SameHostname : If pods belong to the same host, then consider cost as 0 SameHostname = 0 // SameZone : If pods belong to hosts in the same zone, then consider cost as 1 SameZone = 1 // preFilterStateKey is the key in CycleState to NetworkOverhead pre-computed data. preFilterStateKey = "PreFilter" + Name ) var scheme = runtime.NewScheme() func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(agv1alpha1.AddToScheme(scheme)) utilruntime.Must(ntv1alpha1.AddToScheme(scheme)) } // NetworkOverhead : Filter and Score nodes based on Pod's AppGroup requirements: MaxNetworkCosts requirements among Pods with dependencies type NetworkOverhead struct { client.Client podLister corelisters.PodLister handle framework.Handle namespaces []string weightsName string ntName string } // PreFilterState computed at PreFilter and used at Filter and Score. type PreFilterState struct { // boolean that tells the filter and scoring functions to pass the pod since it does not belong to an AppGroup scoreEqually bool // AppGroup name of the pod agName string // AppGroup CR appGroup *agv1alpha1.AppGroup // NetworkTopology CR networkTopology *ntv1alpha1.NetworkTopology // Dependency List of the given pod dependencyList []agv1alpha1.DependenciesInfo // Pods already scheduled based on the dependency list scheduledList networkawareutil.ScheduledList // node map for cost / destinations. Search for requirements faster... nodeCostMap map[string]map[networkawareutil.CostKey]int64 // node map for satisfied dependencies satisfiedMap map[string]int64 // node map for violated dependencies violatedMap map[string]int64 // node map for costs finalCostMap map[string]int64 } // Clone the preFilter state. func (no *PreFilterState) Clone() framework.StateData { return no } // Name : returns name of the plugin. func (no *NetworkOverhead) Name() string { return Name } func getArgs(obj runtime.Object) (*pluginconfig.NetworkOverheadArgs, error) { NetworkOverheadArgs, ok := obj.(*pluginconfig.NetworkOverheadArgs) if !ok { return nil, fmt.Errorf("want args to be of type NetworkOverhead, got %T", obj) } return NetworkOverheadArgs, nil } // ScoreExtensions : an interface for Score extended functionality func (no *NetworkOverhead) ScoreExtensions() framework.ScoreExtensions { return no } // New : create an instance of a NetworkOverhead plugin func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { klog.V(4).InfoS("Creating new instance of the NetworkOverhead plugin") args, err := getArgs(obj) if err != nil { return nil, err } client, err := client.New(handle.KubeConfig(), client.Options{ Scheme: scheme, }) if err != nil { return nil, err } no := &NetworkOverhead{ Client: client, podLister: handle.SharedInformerFactory().Core().V1().Pods().Lister(), handle: handle, namespaces: args.Namespaces, weightsName: args.WeightsName, ntName: args.NetworkTopologyName, } return no, nil } // PreFilter performs the following operations: // 1. Get appGroup name and respective appGroup CR. // 2. Get networkTopology CR. // 3. Get dependency and scheduled list for the given pod // 4. Update cost map of all nodes // 5. Get number of satisfied and violated dependencies // 6. Get final cost of the given node to be used in the score plugin func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) (*framework.PreFilterResult, *framework.Status) { // Init PreFilter State preFilterState := &PreFilterState{ scoreEqually: true, } // Write initial status state.Write(preFilterStateKey, preFilterState) // Check if Pod belongs to an AppGroup agName := networkawareutil.GetPodAppGroupLabel(pod) if len(agName) == 0 { // Return return nil, framework.NewStatus(framework.Success, "Pod does not belong to an AppGroup, return") } // Get AppGroup CR appGroup := no.findAppGroupNetworkOverhead(agName) // Get NetworkTopology CR networkTopology := no.findNetworkTopologyNetworkOverhead() // Sort Costs if manual weights were selected no.sortNetworkTopologyCosts(networkTopology) // Get Dependencies of the given pod dependencyList := networkawareutil.GetDependencyList(pod, appGroup) // If the pod has no dependencies, return if dependencyList == nil { return nil, framework.NewStatus(framework.Success, "Pod has no dependencies, return") } // Get pods from lister selector := labels.Set(map[string]string{agv1alpha1.AppGroupLabel: agName}).AsSelector() pods, err := no.podLister.List(selector) if err != nil { return nil, framework.NewStatus(framework.Success, "Error while returning pods from appGroup, return") } // Return if pods are not yet allocated for the AppGroup... if len(pods) == 0 { return nil, framework.NewStatus(framework.Success, "No pods yet allocated, return") } // Pods already scheduled: Get Scheduled List (Deployment name, replicaID, hostname) scheduledList := networkawareutil.GetScheduledList(pods) // Check if scheduledList is empty... if len(scheduledList) == 0 { klog.ErrorS(nil, "Scheduled list is empty, return") return nil, framework.NewStatus(framework.Success, "Scheduled list is empty, return") } // Get all nodes nodeList, err := no.handle.SnapshotSharedLister().NodeInfos().List() if err != nil { return nil, framework.NewStatus(framework.Error, fmt.Sprintf("Error getting the nodelist: %v", err)) } // Create variables to fill PreFilterState nodeCostMap := make(map[string]map[networkawareutil.CostKey]int64) satisfiedMap := make(map[string]int64) violatedMap := make(map[string]int64) finalCostMap := make(map[string]int64) // For each node: // 1 - Get region and zone labels // 2 - Calculate satisfied and violated number of dependencies // 3 - Calculate the final cost of the node to be used by the scoring plugin for _, nodeInfo := range nodeList { // retrieve region and zone labels region := networkawareutil.GetNodeRegion(nodeInfo.Node()) zone := networkawareutil.GetNodeZone(nodeInfo.Node()) klog.V(6).InfoS("Node info", "name", nodeInfo.Node().Name, "region", region, "zone", zone) // Create map for cost / destinations. Search for requirements faster... costMap := make(map[networkawareutil.CostKey]int64) // Populate cost map for the given node no.populateCostMap(costMap, networkTopology, region, zone) klog.V(6).InfoS("Map", "costMap", costMap) // Update nodeCostMap nodeCostMap[nodeInfo.Node().Name] = costMap // Get Satisfied and Violated number of dependencies satisfied, violated, ok := checkMaxNetworkCostRequirements(scheduledList, dependencyList, nodeInfo, region, zone, costMap, no) if ok != nil { return nil, framework.NewStatus(framework.Error, fmt.Sprintf("pod hostname not found: %v", ok)) } // Update Satisfied and Violated maps satisfiedMap[nodeInfo.Node().Name] = satisfied violatedMap[nodeInfo.Node().Name] = violated klog.V(6).InfoS("Number of dependencies", "satisfied", satisfied, "violated", violated) // Get accumulated cost based on pod dependencies cost, ok := no.getAccumulatedCost(scheduledList, dependencyList, nodeInfo.Node().Name, region, zone, costMap) if ok != nil { return nil, framework.NewStatus(framework.Error, fmt.Sprintf("getting pod hostname from Snapshot: %v", ok)) } klog.V(6).InfoS("Node final cost", "cost", cost) finalCostMap[nodeInfo.Node().Name] = cost } // Update PreFilter State preFilterState = &PreFilterState{ scoreEqually: false, agName: agName, appGroup: appGroup, networkTopology: networkTopology, dependencyList: dependencyList, scheduledList: scheduledList, nodeCostMap: nodeCostMap, satisfiedMap: satisfiedMap, violatedMap: violatedMap, finalCostMap: finalCostMap, } state.Write(preFilterStateKey, preFilterState) return nil, framework.NewStatus(framework.Success, "PreFilter State updated") } // PreFilterExtensions returns prefilter extensions, pod add and remove. func (no *NetworkOverhead) PreFilterExtensions() framework.PreFilterExtensions { return no } // AddPod from pre-computed data in cycleState. // no current need for the NetworkOverhead plugin func (no *NetworkOverhead) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { return framework.NewStatus(framework.Success, "") } // RemovePod from pre-computed data in cycleState. // no current need for the NetworkOverhead plugin func (no *NetworkOverhead) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { return framework.NewStatus(framework.Success, "") } // Filter : evaluate if node can respect maxNetworkCost requirements func (no *NetworkOverhead) Filter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { if nodeInfo.Node() == nil { return framework.NewStatus(framework.Error, "node not found") } // Get PreFilterState preFilterState, err := getPreFilterState(cycleState) if err != nil { klog.ErrorS(err, "Failed to read preFilterState from cycleState", "preFilterStateKey", preFilterStateKey) return framework.NewStatus(framework.Error, "not eligible due to failed to read from cycleState") } // If scoreEqually, return nil if preFilterState.scoreEqually { klog.V(6).InfoS("Score all nodes equally, return") return nil } // Get satisfied and violated number of dependencies satisfied := preFilterState.satisfiedMap[nodeInfo.Node().Name] violated := preFilterState.violatedMap[nodeInfo.Node().Name] klog.V(6).InfoS("Number of dependencies:", "satisfied", satisfied, "violated", violated) // The pod is filtered out if the number of violated dependencies is higher than the satisfied ones if violated > satisfied { return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("Node %v does not meet several network requirements from Workload dependencies: Satisfied: %v Violated: %v", nodeInfo.Node().Name, satisfied, violated)) } return nil } // Score : evaluate score for a node func (no *NetworkOverhead) Score(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) (int64, *framework.Status) { score := framework.MinNodeScore // Get PreFilterState preFilterState, err := getPreFilterState(cycleState) if err != nil { klog.ErrorS(err, "Failed to read preFilterState from cycleState", "preFilterStateKey", preFilterStateKey) return score, framework.NewStatus(framework.Error, "not eligible due to failed to read from cycleState, return min score") } // If scoreEqually, return minScore if preFilterState.scoreEqually { return score, framework.NewStatus(framework.Success, "scoreEqually enabled: minimum score") } // Return Accumulated Cost as score score = preFilterState.finalCostMap[nodeName] klog.V(4).InfoS("Score:", "pod", pod.GetName(), "node", nodeName, "finalScore", score) return score, framework.NewStatus(framework.Success, "Accumulated cost added as score, normalization ensures lower costs are favored") } // NormalizeScore : normalize scores since lower scores correspond to lower latency func (no *NetworkOverhead) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, scores framework.NodeScoreList) *framework.Status { klog.V(4).InfoS("before normalization: ", "scores", scores) // Get Min and Max Scores to normalize between framework.MaxNodeScore and framework.MinNodeScore minCost, maxCost := getMinMaxScores(scores) // If all nodes were given the minimum score, return if minCost == 0 && maxCost == 0 { return nil } var normCost float64 for i := range scores { if maxCost != minCost { // If max != min // node_normalized_cost = MAX_SCORE * ( ( nodeScore - minCost) / (maxCost - minCost) // nodeScore = MAX_SCORE - node_normalized_cost normCost = float64(framework.MaxNodeScore) * float64(scores[i].Score-minCost) / float64(maxCost-minCost) scores[i].Score = framework.MaxNodeScore - int64(normCost) } else { // If maxCost = minCost, avoid division by 0 normCost = float64(scores[i].Score - minCost) scores[i].Score = framework.MaxNodeScore - int64(normCost) } } klog.V(4).InfoS("after normalization: ", "scores", scores) return nil } // MinMax : get min and max scores from NodeScoreList func getMinMaxScores(scores framework.NodeScoreList) (int64, int64) { var max int64 = math.MinInt64 // Set to min value var min int64 = math.MaxInt64 // Set to max value for _, nodeScore := range scores { if nodeScore.Score > max { max = nodeScore.Score } if nodeScore.Score < min { min = nodeScore.Score } } // return min and max scores return min, max } // sortNetworkTopologyCosts : sort costs if manual weights were selected func (no *NetworkOverhead) sortNetworkTopologyCosts(networkTopology *ntv1alpha1.NetworkTopology) { if no.weightsName != ntv1alpha1.NetworkTopologyNetperfCosts { // Manual weights were selected for _, w := range networkTopology.Spec.Weights { // Sort Costs by TopologyKey, might not be sorted since were manually defined sort.Sort(networkawareutil.ByTopologyKey(w.TopologyList)) } } } // populateCostMap : Populates costMap based on the node being filtered/scored func (no *NetworkOverhead) populateCostMap( costMap map[networkawareutil.CostKey]int64, networkTopology *ntv1alpha1.NetworkTopology, region string, zone string) { for _, w := range networkTopology.Spec.Weights { // Check the weights List if w.Name != no.weightsName { // If it is not the Preferred algorithm, continue continue } if region != "" { // Add Region Costs // Binary search through CostList: find the Topology Key for region topologyList := networkawareutil.FindTopologyKey(w.TopologyList, ntv1alpha1.NetworkTopologyRegion) if no.weightsName != ntv1alpha1.NetworkTopologyNetperfCosts { // Sort Costs by origin, might not be sorted since were manually defined sort.Sort(networkawareutil.ByOrigin(topologyList)) } // Binary search through TopologyList: find the costs for the given Region costs := networkawareutil.FindOriginCosts(topologyList, region) // Add Region Costs for _, c := range costs { costMap[networkawareutil.CostKey{ // Add the cost to the map Origin: region, Destination: c.Destination}] = c.NetworkCost } } if zone != "" { // Add Zone Costs // Binary search through CostList: find the Topology Key for zone topologyList := networkawareutil.FindTopologyKey(w.TopologyList, ntv1alpha1.NetworkTopologyZone) if no.weightsName != ntv1alpha1.NetworkTopologyNetperfCosts { // Sort Costs by origin, might not be sorted since were manually defined sort.Sort(networkawareutil.ByOrigin(topologyList)) } // Binary search through TopologyList: find the costs for the given Region costs := networkawareutil.FindOriginCosts(topologyList, zone) // Add Zone Costs for _, c := range costs { costMap[networkawareutil.CostKey{ // Add the cost to the map Origin: zone, Destination: c.Destination}] = c.NetworkCost } } } } // checkMaxNetworkCostRequirements : verifies the number of met and unmet dependencies based on the pod being filtered func checkMaxNetworkCostRequirements( scheduledList networkawareutil.ScheduledList, dependencyList []agv1alpha1.DependenciesInfo, nodeInfo *framework.NodeInfo, region string, zone string, costMap map[networkawareutil.CostKey]int64, no *NetworkOverhead) (int64, int64, error) { var satisfied int64 = 0 var violated int64 = 0 // check if maxNetworkCost fits for _, podAllocated := range scheduledList { // For each pod already allocated if podAllocated.Hostname != "" { // if hostname not empty... for _, d := range dependencyList { // For each pod dependency // If the pod allocated is not an established dependency, continue. if podAllocated.Selector != d.Workload.Selector { continue } // If the Pod hostname is the node being filtered, requirements are checked via extended resources if podAllocated.Hostname == nodeInfo.Node().Name { satisfied += 1 continue } // If Nodes are not the same, get NodeInfo from pod Hostname podNodeInfo, err := no.handle.SnapshotSharedLister().NodeInfos().Get(podAllocated.Hostname) if err != nil { klog.ErrorS(nil, "getting pod nodeInfo %q from Snapshot: %v", podNodeInfo, err) return satisfied, violated, err } // Get zone and region from Pod Hostname regionPodNodeInfo := networkawareutil.GetNodeRegion(podNodeInfo.Node()) zonePodNodeInfo := networkawareutil.GetNodeZone(podNodeInfo.Node()) if regionPodNodeInfo == "" && zonePodNodeInfo == "" { // Node has no zone and region defined violated += 1 } else if region == regionPodNodeInfo { // If Nodes belong to the same region if zone == zonePodNodeInfo { // If Nodes belong to the same zone satisfied += 1 } else { // belong to a different zone, check maxNetworkCost cost, costOK := costMap[networkawareutil.CostKey{ // Retrieve the cost from the map (origin: zone, destination: pod zoneHostname) Origin: zone, // Time Complexity: O(1) Destination: zonePodNodeInfo, }] if costOK { if cost <= d.MaxNetworkCost { satisfied += 1 } else { violated += 1 } } } } else { // belong to a different region cost, costOK := costMap[networkawareutil.CostKey{ // Retrieve the cost from the map (origin: zone, destination: pod zoneHostname) Origin: region, // Time Complexity: O(1) Destination: regionPodNodeInfo, }] if costOK { if cost <= d.MaxNetworkCost { satisfied += 1 } else { violated += 1 } } } } } } return satisfied, violated, nil } // getAccumulatedCost : calculate the accumulated cost based on the Pod's dependencies func (no *NetworkOverhead) getAccumulatedCost( scheduledList networkawareutil.ScheduledList, dependencyList []agv1alpha1.DependenciesInfo, nodeName string, region string, zone string, costMap map[networkawareutil.CostKey]int64) (int64, error) { // keep track of the accumulated cost var cost int64 = 0 // calculate accumulated shortest path for _, podAllocated := range scheduledList { // For each pod already allocated for _, d := range dependencyList { // For each pod dependency // If the pod allocated is not an established dependency, continue. if podAllocated.Selector != d.Workload.Selector { continue } if podAllocated.Hostname == nodeName { // If the Pod hostname is the node being scored cost += SameHostname } else { // If Nodes are not the same // Get NodeInfo from pod Hostname podNodeInfo, err := no.handle.SnapshotSharedLister().NodeInfos().Get(podAllocated.Hostname) if err != nil { klog.ErrorS(nil, "getting pod hostname %q from Snapshot: %v", podNodeInfo, err) return cost, err } // Get zone and region from Pod Hostname regionPodNodeInfo := networkawareutil.GetNodeRegion(podNodeInfo.Node()) zonePodNodeInfo := networkawareutil.GetNodeZone(podNodeInfo.Node()) if regionPodNodeInfo == "" && zonePodNodeInfo == "" { // Node has no zone and region defined cost += MaxCost } else if region == regionPodNodeInfo { // If Nodes belong to the same region if zone == zonePodNodeInfo { // If Nodes belong to the same zone cost += SameZone } else { // belong to a different zone value, ok := costMap[networkawareutil.CostKey{ // Retrieve the cost from the map (origin: zone, destination: pod zoneHostname) Origin: zone, // Time Complexity: O(1) Destination: zonePodNodeInfo, }] if ok { cost += value // Add the cost to the sum } else { cost += MaxCost } } } else { // belong to a different region value, ok := costMap[networkawareutil.CostKey{ // Retrieve the cost from the map (origin: region, destination: pod regionHostname) Origin: region, // Time Complexity: O(1) Destination: regionPodNodeInfo, }] if ok { cost += value // Add the cost to the sum } else { cost += MaxCost } } } } } return cost, nil } func getPreFilterState(cycleState *framework.CycleState) (*PreFilterState, error) { no, err := cycleState.Read(preFilterStateKey) if err != nil { // preFilterState doesn't exist, likely PreFilter wasn't invoked. return nil, fmt.Errorf("error reading %q from cycleState: %w", preFilterStateKey, err) } state, ok := no.(*PreFilterState) if !ok { return nil, fmt.Errorf("%+v convert to NetworkOverhead.preFilterState error", no) } return state, nil } func (no *NetworkOverhead) findAppGroupNetworkOverhead(agName string) *agv1alpha1.AppGroup { klog.V(6).InfoS("namespaces: %s", no.namespaces) for _, namespace := range no.namespaces { klog.V(6).InfoS("appGroup CR", "namespace", namespace, "name", agName) // AppGroup could not be placed in several namespaces simultaneously appGroup := &agv1alpha1.AppGroup{} err := no.Get(context.TODO(), client.ObjectKey{ Namespace: namespace, Name: agName, }, appGroup) if err != nil { klog.V(4).ErrorS(err, "Cannot get AppGroup from AppGroupNamespaceLister:") continue } if appGroup != nil && appGroup.GetUID() != "" { return appGroup } } return nil } func (no *NetworkOverhead) findNetworkTopologyNetworkOverhead() *ntv1alpha1.NetworkTopology { klog.V(6).InfoS("namespaces: %s", no.namespaces) for _, namespace := range no.namespaces { klog.V(6).InfoS("networkTopology CR:", "namespace", namespace, "name", no.ntName) // NetworkTopology could not be placed in several namespaces simultaneously networkTopology := &ntv1alpha1.NetworkTopology{} err := no.Get(context.TODO(), client.ObjectKey{ Namespace: namespace, Name: no.ntName, }, networkTopology) if err != nil { klog.V(4).ErrorS(err, "Cannot get networkTopology from networkTopologyNamespaceLister:") continue } if networkTopology != nil && networkTopology.GetUID() != "" { return networkTopology } } return nil }