/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package lowriskovercommitment plugin scores nodes by their risk of resource overcommitment.
// For each resource, the risk combines two components: the potential for the total limits on a
// node to exceed its capacity, and the probability, estimated from measured load statistics,
// that actual usage exceeds the total requested allocation.
package lowriskovercommitment

import (
"context"
"fmt"
"math"
"github.com/paypal/load-watcher/pkg/watcher"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
pluginConfig "sigs.k8s.io/scheduler-plugins/apis/config"
pluginv1 "sigs.k8s.io/scheduler-plugins/apis/config/v1"
"sigs.k8s.io/scheduler-plugins/pkg/trimaran"
)

const (
// Name : name of plugin
Name = "LowRiskOverCommitment"
	// MaxVarianceAllowance : fraction of the maximum variance allowed when fitting the load
	// distribution; kept below 1 to avoid zero divisions at the variance limit
	MaxVarianceAllowance = 0.99
	// PodResourcesKey : key of the pod resources state data in the CycleState
	PodResourcesKey = Name + ".PodResources"
)

// LowRiskOverCommitment : scheduler plugin
type LowRiskOverCommitment struct {
handle framework.Handle
collector *trimaran.Collector
args *pluginConfig.LowRiskOverCommitmentArgs
riskLimitWeightsMap map[v1.ResourceName]float64
}
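
// A minimal illustration (values are arbitrary) of the typed args consumed by New below;
// the embedded TrimaranSpec fields that locate the load-watcher are omitted for brevity:
//
//	args := &pluginConfig.LowRiskOverCommitmentArgs{
//		SmoothingWindowSize: 5,
//		RiskLimitWeights: map[v1.ResourceName]float64{
//			v1.ResourceCPU:    0.5,
//			v1.ResourceMemory: 0.5,
//		},
//	}
//	p, err := New(ctx, args, handle) // handle is the framework.Handle supplied by the scheduler
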
// New : create an instance of a LowRiskOverCommitment plugin
func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
klog.V(4).InfoS("Creating new instance of the LowRiskOverCommitment plugin")
// cast object into plugin arguments object
args, ok := obj.(*pluginConfig.LowRiskOverCommitmentArgs)
if !ok {
return nil, fmt.Errorf("want args to be of type LowRiskOverCommitmentArgs, got %T", obj)
}
collector, err := trimaran.NewCollector(&args.TrimaranSpec)
if err != nil {
return nil, err
}
// create map of resource risk limit weights
m := make(map[v1.ResourceName]float64)
m[v1.ResourceCPU] = pluginv1.DefaultRiskLimitWeight
m[v1.ResourceMemory] = pluginv1.DefaultRiskLimitWeight
for r, w := range args.RiskLimitWeights {
m[r] = w
}
klog.V(4).InfoS("Using LowRiskOverCommitmentArgs", "smoothingWindowSize", args.SmoothingWindowSize,
"riskLimitWeights", m)
pl := &LowRiskOverCommitment{
handle: handle,
collector: collector,
args: args,
riskLimitWeightsMap: m,
}
return pl, nil
}

// PreScore : calculate pod requests and limits and store as plugin state data to be used during scoring
func (pl *LowRiskOverCommitment) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
klog.V(6).InfoS("PreScore: Calculating pod resource requests and limits", "pod", klog.KObj(pod))
podResourcesStateData := CreatePodResourcesStateData(pod)
cycleState.Write(PodResourcesKey, podResourcesStateData)
return nil
}

// Score : evaluate score for a node
func (pl *LowRiskOverCommitment) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
klog.V(6).InfoS("Score: Calculating score", "pod", klog.KObj(pod), "nodeName", nodeName)
score := framework.MinNodeScore
defer func() {
klog.V(6).InfoS("Calculating totalScore", "pod", klog.KObj(pod), "nodeName", nodeName, "totalScore", score)
}()
// get pod requests and limits
podResources, err := getPreScoreState(cycleState)
if err != nil {
// calculate pod requests and limits, if missing
klog.V(6).InfoS(err.Error()+"; recalculating", "pod", klog.KObj(pod))
podResources = CreatePodResourcesStateData(pod)
}
	// best-effort pods have neither requests nor limits, hence no overcommitment risk; skip scoring and use the minimum score
podRequests := &podResources.podRequests
podLimits := &podResources.podLimits
if podRequests.MilliCPU == 0 && podRequests.Memory == 0 &&
podLimits.MilliCPU == 0 && podLimits.Memory == 0 {
klog.V(6).InfoS("Skipping scoring best effort pod; using minimum score", "nodeName", nodeName, "pod", klog.KObj(pod))
return score, nil
}
// get node info
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return score, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
// get node metrics
metrics, _ := pl.collector.GetNodeMetrics(nodeName)
if metrics == nil {
klog.InfoS("Failed to get metrics for node; using minimum score", "nodeName", nodeName)
return score, nil
}
// calculate score
totalScore := pl.computeRank(metrics, nodeInfo, pod, podRequests, podLimits) * float64(framework.MaxNodeScore)
score = int64(math.Round(totalScore))
return score, framework.NewStatus(framework.Success, "")
}

// Name : name of plugin
func (pl *LowRiskOverCommitment) Name() string {
return Name
}

// ScoreExtensions : an interface for Score extended functionality
func (pl *LowRiskOverCommitment) ScoreExtensions() framework.ScoreExtensions {
return pl
}

// NormalizeScore : normalize scores (a no-op here; Score already produces values within the node score range)
func (pl *LowRiskOverCommitment) NormalizeScore(context.Context, *framework.CycleState, *v1.Pod, framework.NodeScoreList) *framework.Status {
return nil
}

// computeRank : compute the rank of a node in [0, 1] as the complement of its highest per-resource risk
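// For example (illustrative numbers): riskCPU=0.2 and riskMemory=0.6 give
// rank = 1 - max(0.2, 0.6) = 0.4, which Score scales to round(0.4*100) = 40.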
func (pl *LowRiskOverCommitment) computeRank(metrics []watcher.Metric, nodeInfo *framework.NodeInfo, pod *v1.Pod,
podRequests *framework.Resource, podLimits *framework.Resource) float64 {
node := nodeInfo.Node()
// calculate risk based on requests and limits
nodeRequestsAndLimits := trimaran.GetNodeRequestsAndLimits(nodeInfo.Pods, node, pod, podRequests, podLimits)
riskCPU := pl.computeRisk(metrics, v1.ResourceCPU, watcher.CPU, node, nodeRequestsAndLimits)
riskMemory := pl.computeRisk(metrics, v1.ResourceMemory, watcher.Memory, node, nodeRequestsAndLimits)
rank := 1 - math.Max(riskCPU, riskMemory)
klog.V(6).InfoS("Node rank", "nodeName", node.GetName(), "riskCPU", riskCPU, "riskMemory", riskMemory, "rank", rank)
return rank
}

// computeRisk : calculate the risk of scheduling on a node for a given resource
func (pl *LowRiskOverCommitment) computeRisk(metrics []watcher.Metric, resourceName v1.ResourceName,
resourceType string, node *v1.Node, nodeRequestsAndLimits *trimaran.NodeRequestsAndLimits) float64 {
var riskLimit, riskLoad, totalRisk float64
defer func() {
klog.V(6).InfoS("Calculated risk", "node", klog.KObj(node), "resource", resourceName,
"riskLimit", riskLimit, "riskLoad", riskLoad, "totalRisk", totalRisk)
}()
nodeRequest := nodeRequestsAndLimits.NodeRequest
nodeLimit := nodeRequestsAndLimits.NodeLimit
nodeRequestMinusPod := nodeRequestsAndLimits.NodeRequestMinusPod
nodeLimitMinusPod := nodeRequestsAndLimits.NodeLimitMinusPod
nodeCapacity := nodeRequestsAndLimits.Nodecapacity
var request, limit, capacity, requestMinusPod, limitMinusPod int64
if resourceName == v1.ResourceCPU {
request = nodeRequest.MilliCPU
limit = nodeLimit.MilliCPU
requestMinusPod = nodeRequestMinusPod.MilliCPU
limitMinusPod = nodeLimitMinusPod.MilliCPU
capacity = nodeCapacity.MilliCPU
} else if resourceName == v1.ResourceMemory {
request = nodeRequest.Memory
limit = nodeLimit.Memory
requestMinusPod = nodeRequestMinusPod.Memory
limitMinusPod = nodeLimitMinusPod.Memory
capacity = nodeCapacity.Memory
} else {
// invalid resource
klog.V(6).InfoS("Unexpected resource", "resourceName", resourceName)
return 0
}
	// (1) riskLimit : overcommitment potential, the fraction of the request-to-limit range lying beyond capacity
if limit > capacity {
riskLimit = float64(limit-capacity) / float64(limit-request)
}
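	// For example (illustrative numbers): capacity=4000, request=2000, limit=6000 (CPU
	// milli-units) gives riskLimit = (6000-4000)/(6000-2000) = 0.5.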
klog.V(6).InfoS("RiskLimit", "node", klog.KObj(node), "resource", resourceName, "riskLimit", riskLimit)
	// (2) riskLoad : measured overcommitment, the probability that load exceeds the total allocated requests
zeroRequest := &framework.Resource{}
stats, ok := trimaran.CreateResourceStats(metrics, node, zeroRequest, resourceName, resourceType)
if ok {
// fit a beta distribution to the measured load stats
mu, sigma := trimaran.GetMuSigma(stats)
// adjust standard deviation due to data smoothing
sigma *= math.Pow(float64(pl.args.SmoothingWindowSize), 0.5)
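		// (averaging over a window of n roughly independent samples shrinks the observed standard
		// deviation by sqrt(n); multiplying by sqrt(n) undoes that smoothing, e.g. a window of 5
		// scales sigma by about 2.24)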
// limit the standard deviation close to the allowed maximum for the beta distribution
sigma = math.Min(sigma, math.Sqrt(GetMaxVariance(mu)*MaxVarianceAllowance))
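		// (a beta distribution with mean mu has variance strictly below mu*(1-mu), so capping at
		// 99% of that maximum keeps the fitted parameters well defined)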
// calculate area under beta probability curve beyond total allocated, as overuse risk measure
allocThreshold := float64(requestMinusPod) / float64(capacity)
allocThreshold = math.Min(math.Max(allocThreshold, 0), 1)
allocProb, fitDistribution := ComputeProbability(mu, sigma, allocThreshold)
if fitDistribution != nil {
klog.V(6).InfoS("FitDistribution", "node", klog.KObj(node), "resource", resourceName, "dist", fitDistribution.Print())
}
		// condition the probability on the load not exceeding the total limit, when that limit is
		// below capacity: allocProb = F(allocThreshold) / F(limitThreshold)
if limitMinusPod < capacity && requestMinusPod <= limitMinusPod {
limitThreshold := float64(limitMinusPod) / float64(capacity)
if limitThreshold == 0 {
allocProb = 1 // zero over zero
} else if fitDistribution != nil {
limitProb := fitDistribution.DistributionFunction(limitThreshold)
if limitProb > 0 {
allocProb /= limitProb
allocProb = math.Min(math.Max(allocProb, 0), 1)
}
}
}
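		// For example (illustrative numbers): F(allocThreshold)=0.6 and F(limitThreshold)=0.8
		// condition allocProb to 0.6/0.8 = 0.75, giving riskLoad = 0.25.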
// calculate risk
riskLoad = 1 - allocProb
klog.V(6).InfoS("RiskLoad", "node", klog.KObj(node), "resource", resourceName,
"allocThreshold", allocThreshold, "allocProb", allocProb, "riskLoad", riskLoad)
}
// combine two components of risk into a total risk as a weighted sum
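	// For example (illustrative numbers): w=0.5, riskLimit=0.5, riskLoad=0.3 give
	// totalRisk = 0.5*0.5 + 0.5*0.3 = 0.4.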
w := pl.riskLimitWeightsMap[resourceName]
totalRisk = w*riskLimit + (1-w)*riskLoad
totalRisk = math.Min(math.Max(totalRisk, 0), 1)
return totalRisk
}

// CreatePodResourcesStateData : calculate the pod's resource requests and limits, packaged as plugin state data
func CreatePodResourcesStateData(pod *v1.Pod) *PodResourcesStateData {
requests := trimaran.GetResourceRequested(pod)
limits := trimaran.GetResourceLimits(pod)
	// ensure limits are not less than requests
trimaran.SetMaxLimits(requests, limits)
return &PodResourcesStateData{
podRequests: *requests,
podLimits: *limits,
}
}

// PodResourcesStateData : pod requests and limits, computed at PreScore and used at Score
type PodResourcesStateData struct {
podRequests framework.Resource
podLimits framework.Resource
}

// Clone : clone the pod resource state data (returns the same instance; the data is read-only after PreScore)
func (s *PodResourcesStateData) Clone() framework.StateData {
return s
}

// getPreScoreState : retrieve pod requests and limits from the plugin state data
func getPreScoreState(cycleState *framework.CycleState) (*PodResourcesStateData, error) {
podResourcesStateData, err := cycleState.Read(PodResourcesKey)
if err != nil {
return nil, fmt.Errorf("reading %q from cycleState: %w", PodResourcesKey, err)
}
podResources, ok := podResourcesStateData.(*PodResourcesStateData)
if !ok {
return nil, fmt.Errorf("invalid PreScore state, got type %T", podResourcesStateData)
}
return podResources, nil
}