pkg/noderesourcetopology/filter.go (164 lines of code) (raw):

/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package noderesourcetopology

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/klog/v2"
	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
	bm "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
	"k8s.io/kubernetes/pkg/scheduler/framework"

	"github.com/go-logr/logr"
	topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"

	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/logging"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify"
	"sigs.k8s.io/scheduler-plugins/pkg/util"
)

// The maximum number of NUMA nodes that Topology Manager allows is 8
// https://kubernetes.io/docs/tasks/administer-cluster/topology-manager/#known-limitations
const highestNUMAID = 8

type PolicyHandler func(pod *v1.Pod, zoneMap topologyv1alpha2.ZoneList) *framework.Status

func singleNUMAContainerLevelHandler(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList, nodeInfo *framework.NodeInfo) *framework.Status {
	lh.V(5).Info("container level single NUMA node handler")

	// prepare the NUMA node list from the zone map
	nodes := createNUMANodeList(lh, zones)
	qos := v1qos.GetPodQOS(pod)

	// Node() != nil already verified in Filter(), which is the only public entry point
	logNumaNodes(lh, "container handler NUMA resources", nodeInfo.Node().Name, nodes)

	// init containers run SERIALLY and BEFORE the app containers:
	// https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#understanding-init-containers
	// therefore, we don't need to accumulate their resources together
	for _, initContainer := range pod.Spec.InitContainers {
		lh.V(6).Info("init container desired resources", stringify.ResourceListToLoggable(initContainer.Resources.Requests)...)

		_, match := resourcesAvailableInAnyNUMANodes(lh, nodes, initContainer.Resources.Requests, qos, nodeInfo)
		if !match {
			// we can't align an init container, so we definitely can't align the pod
			lh.V(2).Info("cannot align container", "name", initContainer.Name, "kind", "init")
			return framework.NewStatus(framework.Unschedulable, "cannot align init container")
		}
	}

	for _, container := range pod.Spec.Containers {
		// TODO: add containerName
		lh.V(6).Info("app container resources", stringify.ResourceListToLoggable(container.Resources.Requests)...)

		numaID, match := resourcesAvailableInAnyNUMANodes(lh, nodes, container.Resources.Requests, qos, nodeInfo)
		if !match {
			// we can't align this container, so we definitely can't align the pod
			lh.V(2).Info("cannot align container", "name", container.Name, "kind", "app")
			return framework.NewStatus(framework.Unschedulable, "cannot align container")
		}

		// subtract the resources requested by the container from the chosen NUMA node.
		// this is necessary so we won't allocate the same resources to the upcoming containers.
		subtractFromNUMA(lh, nodes, numaID, container)
	}
	lh.V(2).Info("can align all containers")
	return nil
}
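// The sketch below is illustrative only (not part of the original file; the function name is new,
// and it assumes NUMANodeList is the slice type used throughout this package). It shows why the
// handler above subtracts each aligned container's requests in place: the next container must be
// checked against the reduced per-NUMA capacity, not the original one.
func exampleContainerAccounting() resource.Quantity {
	nodes := NUMANodeList{
		{NUMAID: 0, Resources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("3")}},
	}
	container := v1.Container{
		Name: "cnt-1",
		Resources: v1.ResourceRequirements{
			Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")},
		},
	}
	subtractFromNUMA(logr.Discard(), nodes, 0, container)
	// only 1 CPU is left on NUMA cell 0, so another container requesting 2 CPUs cannot be aligned there
	return nodes[0].Resources[v1.ResourceCPU]
}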
// resourcesAvailableInAnyNUMANodes checks whether the requested resources fit and returns the NUMA ID
// that would be selected by kubelet.
// This function requires a NUMANodeList with properly populated NUMANodes; NUMA IDs should be in the range 0-63.
func resourcesAvailableInAnyNUMANodes(lh logr.Logger, numaNodes NUMANodeList, resources v1.ResourceList, qos v1.PodQOSClass, nodeInfo *framework.NodeInfo) (int, bool) {
	numaID := highestNUMAID
	bitmask := bm.NewEmptyBitMask()
	// set all bits; each bit is a NUMA node. If resources can't be aligned
	// on a NUMA node, its bit gets unset.
	bitmask.Fill()

	nodeResources := util.ResourceList(nodeInfo.Allocatable)

	for resource, quantity := range resources {
		if quantity.IsZero() {
			// why bother? everything's fine from the perspective of this resource
			lh.V(4).Info("ignoring zero-qty resource request", "resource", resource)
			continue
		}

		if _, ok := nodeResources[resource]; !ok {
			// some resources may not expose NUMA affinity (device plugins, extended resources), but all resources
			// must be reported at node level; thus, if they are not present at node level, we can safely assume
			// we don't have the resource at all.
			lh.V(2).Info("early verdict: cannot meet request", "resource", resource, "suitable", "false")
			return numaID, false
		}

		// for each requested resource, calculate which NUMA nodes are good fits, and then AND the result
		// with the aggregated bitmask; in other words, unset the bit if we can't align the resource on
		// that NUMA node, and keep it set otherwise.
		// Obviously, bits outside the NUMA ID range stay unset.
		hasNUMAAffinity := false
		resourceBitmask := bm.NewEmptyBitMask()
		for _, numaNode := range numaNodes {
			numaQuantity, ok := numaNode.Resources[resource]
			if !ok {
				continue
			}

			hasNUMAAffinity = true
			if !isResourceSetSuitable(qos, resource, quantity, numaQuantity) {
				continue
			}

			resourceBitmask.Add(numaNode.NUMAID)
			lh.V(6).Info("feasible", "numaCell", numaNode.NUMAID, "resource", resource)
		}

		// non-native resources or ephemeral-storage may not expose NUMA affinity,
		// but since they are available at node level, this is fine
		if !hasNUMAAffinity && isHostLevelResource(resource) {
			lh.V(6).Info("resource available at host level (no NUMA affinity)", "resource", resource)
			continue
		}

		bitmask.And(resourceBitmask)
		if bitmask.IsEmpty() {
			lh.V(2).Info("early verdict", "resource", resource, "suitable", "false")
			return numaID, false
		}
	}

	// according to the Topology Manager, the preferred NUMA affinity is the narrowest one:
	// https://github.com/kubernetes/kubernetes/blob/v1.24.0-rc.1/pkg/kubelet/cm/topologymanager/policy.go#L155
	// with the single-numa-node policy, all resources should be allocated from a single NUMA node,
	// which means the lowest NUMA ID (with available resources) is the one kubelet would select.
	numaID = bitmask.GetBits()[0]

	// at least one NUMA node is available
	ret := !bitmask.IsEmpty()
	lh.V(2).Info("final verdict", "suitable", ret)
	return numaID, ret
}

func isResourceSetSuitable(qos v1.PodQOSClass, resource v1.ResourceName, quantity, numaQuantity resource.Quantity) bool {
	// NUMA-affine resources are exclusively allocated only for guaranteed pods,
	// so for other QoS classes they don't constrain NUMA placement.
	if qos != v1.PodQOSGuaranteed && isNUMAAffineResource(resource) {
		return true
	}
	return numaQuantity.Cmp(quantity) >= 0
}
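// The sketch below is illustrative only (not part of the original file; the function name and the
// hard-coded fit sets are made up). It shows how the per-resource AND-narrowing in
// resourcesAvailableInAnyNUMANodes converges on a single NUMA cell: starting from a full mask,
// each resource keeps only the cells that can satisfy it, and the lowest surviving bit is the cell
// kubelet would pick.
func exampleBitmaskNarrowing() int {
	aggregated := bm.NewEmptyBitMask()
	aggregated.Fill() // every NUMA cell starts as a candidate

	// hypothetical per-resource fit sets: CPU fits on cells 0 and 1, memory only on cell 0
	cpuFit := bm.NewEmptyBitMask()
	cpuFit.Add(0)
	cpuFit.Add(1)
	aggregated.And(cpuFit)

	memFit := bm.NewEmptyBitMask()
	memFit.Add(0)
	aggregated.And(memFit)

	if aggregated.IsEmpty() {
		return highestNUMAID // no cell can satisfy every requested resource
	}
	return aggregated.GetBits()[0] // 0: the narrowest (lowest) surviving cell
}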
func singleNUMAPodLevelHandler(lh logr.Logger, pod *v1.Pod, zones topologyv1alpha2.ZoneList, nodeInfo *framework.NodeInfo) *framework.Status {
	lh.V(5).Info("pod level single NUMA node handler")

	resources := util.GetPodEffectiveRequest(pod)
	nodes := createNUMANodeList(lh, zones)

	// Node() != nil already verified in Filter(), which is the only public entry point
	logNumaNodes(lh, "pod handler NUMA resources", nodeInfo.Node().Name, nodes)
	lh.V(6).Info("pod desired resources", stringify.ResourceListToLoggable(resources)...)

	if _, match := resourcesAvailableInAnyNUMANodes(lh, nodes, resources, v1qos.GetPodQOS(pod), nodeInfo); !match {
		lh.V(2).Info("cannot align pod", "name", pod.Name)
		return framework.NewStatus(framework.Unschedulable, "cannot align pod")
	}
	lh.V(2).Info("can align pod")
	return nil
}

// Filter checks whether the pod's resource requests can be NUMA-aligned on the candidate node.
// Currently only the single-numa-node Topology Manager policy is supported.
func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	if nodeInfo.Node() == nil {
		return framework.NewStatus(framework.Error, "node not found")
	}

	if v1qos.GetPodQOS(pod) == v1.PodQOSBestEffort && !resourcerequests.IncludeNonNative(pod) {
		return nil
	}

	nodeName := nodeInfo.Node().Name

	lh := klog.FromContext(ctx).WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)
	lh.V(4).Info(logging.FlowBegin)
	defer lh.V(4).Info(logging.FlowEnd)

	nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
	if !ok {
		lh.V(2).Info("invalid topology data")
		return framework.NewStatus(framework.Unschedulable, "invalid node topology data")
	}
	if nodeTopology == nil {
		return nil
	}

	lh.V(4).Info("found nrt data", "object", stringify.NodeResourceTopologyResources(nodeTopology))

	handler := filterHandlerFromTopologyManagerConfig(topologyManagerConfigFromNodeResourceTopology(lh, nodeTopology))
	if handler == nil {
		return nil
	}
	status := handler(lh, pod, nodeTopology.Zones, nodeInfo)
	if status != nil {
		tm.nrtCache.NodeMaybeOverReserved(nodeName, pod)
	}
	return status
}
// subtractFromNUMA finds the NUMA node with the given ID and subtracts the container's requests from its resources in `nodes`.
func subtractFromNUMA(lh logr.Logger, nodes NUMANodeList, numaID int, container v1.Container) {
	for i := 0; i < len(nodes); i++ {
		if nodes[i].NUMAID != numaID {
			continue
		}

		nRes := nodes[i].Resources
		for resName, quan := range container.Resources.Requests {
			nodeResQuan := nRes[resName]
			nodeResQuan.Sub(quan)
			// we don't expect a negative value here, since this function is called only after
			// resourcesAvailableInAnyNUMANodes has passed for the same requests,
			// but let's log it if this unlikely case ever occurs
			if nodeResQuan.Sign() == -1 {
				lh.V(4).Info("resource quantity should not be a negative value", "resource", resName, "quantity", nodeResQuan.String())
			}
			nRes[resName] = nodeResQuan
		}
	}
}

// filterHandlerFromTopologyManagerConfig maps the node's Topology Manager configuration to the
// matching filter handler; it returns nil when no NUMA alignment check is needed.
func filterHandlerFromTopologyManagerConfig(conf TopologyManagerConfig) filterFn {
	if conf.Policy != kubeletconfig.SingleNumaNodeTopologyManagerPolicy {
		return nil
	}
	if conf.Scope == kubeletconfig.PodTopologyManagerScope {
		return singleNUMAPodLevelHandler
	}
	if conf.Scope == kubeletconfig.ContainerTopologyManagerScope {
		return singleNUMAContainerLevelHandler
	}
	return nil // cannot happen
}
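// The sketch below is illustrative only (not part of the original file; the function name is new,
// and it assumes the Policy and Scope fields of TopologyManagerConfig accept the kubeletconfig
// constants they are compared against above). It shows the mapping from the node's Topology Manager
// configuration to a handler: single-numa-node with pod scope selects the pod-level handler, while
// any other policy yields nil and leaves Filter as a no-op.
func exampleHandlerSelection() bool {
	conf := TopologyManagerConfig{
		Policy: kubeletconfig.SingleNumaNodeTopologyManagerPolicy,
		Scope:  kubeletconfig.PodTopologyManagerScope,
	}
	// a non-nil handler means the NUMA alignment check will actually run for this node
	return filterHandlerFromTopologyManagerConfig(conf) != nil
}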