pkg/noderesourcetopology/plugin.go (115 lines of code) (raw):

/* Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package noderesourcetopology import ( "context" "fmt" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" "sigs.k8s.io/scheduler-plugins/apis/config/validation" nrtcache "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache" "github.com/go-logr/logr" topologyapi "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" ) const ( // Name is the name of the plugin used in the plugin registry and configurations. Name = "NodeResourceTopologyMatch" ) var scheme = runtime.NewScheme() func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(topologyv1alpha2.AddToScheme(scheme)) } type NUMANode struct { NUMAID int Resources v1.ResourceList Costs map[int]int } func (n *NUMANode) WithCosts(costs map[int]int) *NUMANode { n.Costs = costs return n } type NUMANodeList []NUMANode func subtractFromNUMAs(resources v1.ResourceList, numaNodes NUMANodeList, nodes ...int) { for resName, quantity := range resources { for _, node := range nodes { // quantity is zero no need to iterate through another NUMA node, go to another resource if quantity.IsZero() { break } nRes := numaNodes[node].Resources if available, ok := nRes[resName]; ok { switch quantity.Cmp(available) { case 0: // the same // basically zero container resources quantity.Sub(available) // zero NUMA quantity nRes[resName] = resource.Quantity{} case 1: // container wants more resources than available in this NUMA zone // substract NUMA resources from container request, to calculate how much is missing quantity.Sub(available) // zero NUMA quantity nRes[resName] = resource.Quantity{} case -1: // there are more resources available in this NUMA zone than container requests // substract container resources from resources available in this NUMA node available.Sub(quantity) // zero container quantity quantity = resource.Quantity{} nRes[resName] = available } } } } } type filterFn func(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList, nodeInfo *framework.NodeInfo) *framework.Status type scoringFn func(logr.Logger, *v1.Pod, topologyv1alpha2.ZoneList) (int64, *framework.Status) // TopologyMatch plugin which run simplified version of TopologyManager's admit handler type TopologyMatch struct { resourceToWeightMap resourceToWeightMap nrtCache nrtcache.Interface scoreStrategyFunc scoreStrategyFn scoreStrategyType apiconfig.ScoringStrategyType } var _ framework.FilterPlugin = &TopologyMatch{} var _ framework.ReservePlugin = &TopologyMatch{} var _ framework.ScorePlugin = &TopologyMatch{} var _ framework.EnqueueExtensions = &TopologyMatch{} var _ framework.PostBindPlugin = &TopologyMatch{} // Name returns name of the plugin. It is used in logs, etc. func (tm *TopologyMatch) Name() string { return Name } // New initializes a new plugin and returns it. func New(ctx context.Context, args runtime.Object, handle framework.Handle) (framework.Plugin, error) { lh := klog.FromContext(ctx) lh.V(5).Info("creating new noderesourcetopology plugin") tcfg, ok := args.(*apiconfig.NodeResourceTopologyMatchArgs) if !ok { return nil, fmt.Errorf("want args to be of type NodeResourceTopologyMatchArgs, got %T", args) } if err := validation.ValidateNodeResourceTopologyMatchArgs(nil, tcfg); err != nil { return nil, err } nrtCache, err := initNodeTopologyInformer(ctx, lh, tcfg, handle) if err != nil { lh.Error(err, "cannot create clientset for NodeTopologyResource", "kubeConfig", handle.KubeConfig()) return nil, err } resToWeightMap := make(resourceToWeightMap) for _, resource := range tcfg.ScoringStrategy.Resources { resToWeightMap[v1.ResourceName(resource.Name)] = resource.Weight } // This is not strictly needed, but we do it here and we carry `scoreStrategyFunc` around // to be able to do as much parameter validation as possible here in this function. // We perform only the NRT-object-specific validation in `Filter()` and `Score()` // because we can't help it, being the earliest point in time on which we have access // to NRT instances. strategy, err := getScoringStrategyFunction(tcfg.ScoringStrategy.Type) if err != nil { return nil, err } topologyMatch := &TopologyMatch{ resourceToWeightMap: resToWeightMap, nrtCache: nrtCache, scoreStrategyFunc: strategy, scoreStrategyType: tcfg.ScoringStrategy.Type, } return topologyMatch, nil } // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. // NOTE: if in-place-update (KEP 1287) gets implemented, then PodUpdate event // should be registered for this plugin since a Pod update may free up resources // that make other Pods schedulable. func (tm *TopologyMatch) EventsToRegister() []framework.ClusterEventWithHint { // To register a custom event, follow the naming convention at: // https://git.k8s.io/kubernetes/pkg/scheduler/eventhandlers.go#L403-L410 nrtGVK := fmt.Sprintf("noderesourcetopologies.v1alpha2.%v", topologyapi.GroupName) return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeAllocatable}}, {Event: framework.ClusterEvent{Resource: framework.GVK(nrtGVK), ActionType: framework.Add | framework.Update}}, } }