pkg/trimaran/resourcestats.go (180 lines of code) (raw):
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package trimaran
import (
"math"
"github.com/paypal/load-watcher/pkg/watcher"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
const (
	// MegaFactor : Mega unit multiplier (converts bytes to megabytes when
	// multiplied with a byte quantity).
	MegaFactor = float64(1. / 1024. / 1024.)
)
// ResourceStats : statistics data for a resource, all values expressed in
// absolute units (millicores for CPU, megabytes for memory).
type ResourceStats struct {
	// UsedAvg is the average used amount (absolute)
	UsedAvg float64
	// UsedStdev is the standard deviation of the used amount (absolute)
	UsedStdev float64
	// Req is the amount requested by the pod being scheduled
	Req float64
	// Capacity is the node's allocatable capacity for this resource
	Capacity float64
}
// CreateResourceStats : get resource statistics data from measurements for a node.
// Returns nil and false when no valid measurement exists for the given watcher type.
func CreateResourceStats(metrics []watcher.Metric, node *v1.Node, podRequest *framework.Resource,
	resourceName v1.ResourceName, watcherType string) (rs *ResourceStats, isValid bool) {
	// fetch usage measurements (expressed as percent of capacity) for this resource
	nodeUtil, nodeStd, metricFound := GetResourceData(metrics, watcherType)
	if !metricFound {
		klog.V(6).InfoS("Resource usage statistics for node : no valid data", "node", klog.KObj(node))
		return nil, false
	}
	stats := &ResourceStats{}
	allocatable := node.Status.Allocatable
	quantity := allocatable[resourceName]
	switch resourceName {
	case v1.ResourceCPU:
		// CPU is accounted in millicores
		stats.Capacity = float64(quantity.MilliValue())
		stats.Req = float64(podRequest.MilliCPU)
	default:
		// memory (the only other expected resource) is accounted in megabytes
		stats.Capacity = float64(quantity.Value()) * MegaFactor
		stats.Req = float64(podRequest.Memory) * MegaFactor
	}
	// turn percentage statistics into absolute quantities
	stats.UsedAvg = nodeUtil * stats.Capacity / 100
	stats.UsedStdev = nodeStd * stats.Capacity / 100
	klog.V(6).InfoS("Resource usage statistics for node", "node", klog.KObj(node), "resource", resourceName,
		"capacity", stats.Capacity, "required", stats.Req, "usedAvg", stats.UsedAvg, "usedStdev", stats.UsedStdev)
	return stats, true
}
// GetMuSigma : get average and standard deviation from statistics, both
// normalized by capacity and clipped to the interval [0, 1].
func GetMuSigma(rs *ResourceStats) (float64, float64) {
	if rs.Capacity <= 0 {
		// guard against division by zero on unset or invalid capacity
		return 0, 0
	}
	// expected utilization fraction once the pod's request is added
	mu := math.Max(math.Min((rs.UsedAvg+rs.Req)/rs.Capacity, 1), 0)
	// relative standard deviation of utilization
	sigma := math.Max(math.Min(rs.UsedStdev/rs.Capacity, 1), 0)
	return mu, sigma
}
// GetResourceData : get data from measurements for a given resource type.
// isValid is true if at least one metric of the requested type was seen.
func GetResourceData(metrics []watcher.Metric, resourceType string) (avg float64, stDev float64, isValid bool) {
	// for backward compatibility of LoadWatcher:
	// a metric with no operator (or Latest) supplies the average, but only
	// until an explicit Average metric has been observed
	avgFound := false
	for _, m := range metrics {
		if m.Type != resourceType {
			continue
		}
		isValid = true
		switch m.Operator {
		case watcher.Average:
			avg = m.Value
			avgFound = true
		case watcher.Std:
			stDev = m.Value
		case "", watcher.Latest:
			if !avgFound {
				avg = m.Value
			}
		}
	}
	return avg, stDev, isValid
}
// GetResourceRequested : calculate the resource requests of a pod (CPU and Memory)
func GetResourceRequested(pod *v1.Pod) *framework.Resource {
	// select the Requests list from each container
	requestsOf := func(c *v1.Container) v1.ResourceList {
		return c.Resources.Requests
	}
	return GetEffectiveResource(pod, requestsOf)
}
// GetResourceLimits : calculate the resource limits of a pod (CPU and Memory)
func GetResourceLimits(pod *v1.Pod) *framework.Resource {
	// select the Limits list from each container
	limitsOf := func(c *v1.Container) v1.ResourceList {
		return c.Resources.Limits
	}
	return GetEffectiveResource(pod, limitsOf)
}
// GetEffectiveResource: calculate effective resources of a pod (CPU and Memory),
// where fn selects which resource list (requests or limits) of a container to use.
func GetEffectiveResource(pod *v1.Pod, fn func(container *v1.Container) v1.ResourceList) *framework.Resource {
	result := &framework.Resource{}
	// sum the selected resource list over all regular containers
	// (index to avoid copying the container struct per iteration)
	for i := range pod.Spec.Containers {
		result.Add(fn(&pod.Spec.Containers[i]))
	}
	// effective value is max(sum over containers, any single init container)
	for i := range pod.Spec.InitContainers {
		for name, quantity := range fn(&pod.Spec.InitContainers[i]) {
			switch name {
			case v1.ResourceCPU:
				setMax(&result.MilliCPU, quantity.MilliValue())
			case v1.ResourceMemory:
				setMax(&result.Memory, quantity.Value())
			}
		}
	}
	// account for declared pod overhead, if any
	if pod.Spec.Overhead != nil {
		result.Add(pod.Spec.Overhead)
	}
	return result
}
// NodeRequestsAndLimits : data related to requests and limits of resources on a node
type NodeRequestsAndLimits struct {
	// NodeRequest is the sum of requests of all pods on the node (including the pending pod)
	NodeRequest *framework.Resource
	// NodeLimit is the sum of limits of all pods on the node (including the pending pod)
	NodeLimit *framework.Resource
	// NodeRequestMinusPod is the NodeRequest without the requests of the pending pod
	NodeRequestMinusPod *framework.Resource
	// NodeLimitMinusPod is the NodeLimit without the limits of the pending pod
	NodeLimitMinusPod *framework.Resource
	// Nodecapacity is the capacity (allocatable) of the node
	Nodecapacity *framework.Resource
}
// GetNodeRequestsAndLimits : total requested and limits of resources on a given node plus a pod.
// podRequests and podLimits are the precomputed requests/limits of the pending pod;
// the MinusPod totals exclude them. Requests (but not limits) are capped by node capacity.
func GetNodeRequestsAndLimits(podInfosOnNode []*framework.PodInfo, node *v1.Node, pod *v1.Pod,
	podRequests *framework.Resource, podLimits *framework.Resource) *NodeRequestsAndLimits {
	// initialization
	nodeRequest := &framework.Resource{}
	nodeLimit := &framework.Resource{}
	nodeRequestMinusPod := &framework.Resource{}
	nodeLimitMinusPod := &framework.Resource{}
	// set capacities from the node's allocatable (CPU in millicores, memory in bytes)
	nodeCapacity := &framework.Resource{}
	allocatableResources := node.Status.Allocatable
	amCpu := allocatableResources[v1.ResourceCPU]
	capCpu := amCpu.MilliValue()
	amMem := allocatableResources[v1.ResourceMemory]
	capMem := amMem.Value()
	nodeCapacity.MilliCPU = capCpu
	nodeCapacity.Memory = capMem
	// get requests and limits for all pods
	podsOnNode := make([]*v1.Pod, len(podInfosOnNode))
	for i, pf := range podInfosOnNode {
		podsOnNode[i] = pf.Pod
	}
	// iterate over all pods on the node, with the pending pod appended last;
	// the order matters: the MinusPod snapshots are taken just before the
	// pending pod's contribution is accumulated
	for _, p := range append(podsOnNode, pod) {
		var requested *framework.Resource
		var limits *framework.Resource
		// pending pod is last in sequence (identified by pointer identity)
		if p == pod {
			// snapshot totals without the pending pod
			*nodeRequestMinusPod = *nodeRequest
			*nodeLimitMinusPod = *nodeLimit
			requested = podRequests
			limits = podLimits
		} else {
			// get requests and limits for pod
			requested = GetResourceRequested(p)
			limits = GetResourceLimits(p)
			// make sure limits not less than requests
			SetMaxLimits(requested, limits)
		}
		// accumulate
		nodeRequest.MilliCPU += requested.MilliCPU
		nodeRequest.Memory += requested.Memory
		nodeLimit.MilliCPU += limits.MilliCPU
		nodeLimit.Memory += limits.Memory
	}
	// cap requests by node capacity (limits may legitimately exceed capacity)
	setMin(&nodeRequest.MilliCPU, capCpu)
	setMin(&nodeRequest.Memory, capMem)
	setMin(&nodeRequestMinusPod.MilliCPU, capCpu)
	setMin(&nodeRequestMinusPod.Memory, capMem)
	klog.V(6).InfoS("Total node resources:", "node", klog.KObj(node),
		"CPU-req", nodeRequest.MilliCPU, "Memory-req", nodeRequest.Memory,
		"CPU-limit", nodeLimit.MilliCPU, "Memory-limit", nodeLimit.Memory,
		"CPU-cap", nodeCapacity.MilliCPU, "Memory-cap", nodeCapacity.Memory)
	return &NodeRequestsAndLimits{
		NodeRequest:         nodeRequest,
		NodeLimit:           nodeLimit,
		NodeRequestMinusPod: nodeRequestMinusPod,
		NodeLimitMinusPod:   nodeLimitMinusPod,
		Nodecapacity:        nodeCapacity,
	}
}
// SetMaxLimits : set limits to max(limits, requests), element-wise
// (Note: we could have used '(r *Resource) SetMaxResource(rl v1.ResourceList)', but takes map as arg )
func SetMaxLimits(requests *framework.Resource, limits *framework.Resource) {
	setMax(&limits.MilliCPU, requests.MilliCPU)
	setMax(&limits.Memory, requests.Memory)
	setMax(&limits.EphemeralStorage, requests.EphemeralStorage)
	if limits.AllowedPodNumber < requests.AllowedPodNumber {
		limits.AllowedPodNumber = requests.AllowedPodNumber
	}
	for k, v := range requests.ScalarResources {
		// reading from a nil map yields the zero value, so the comparison is safe;
		// writing to a nil map panics, hence the lazy initialization below
		if limits.ScalarResources[k] < v {
			if limits.ScalarResources == nil {
				limits.ScalarResources = make(map[v1.ResourceName]int64)
			}
			limits.ScalarResources[k] = v
		}
	}
}
// setMin : x <- min(x, y)
func setMin(x *int64, y int64) {
	if y < *x {
		*x = y
	}
}
// setMax : x <- max(x, y)
func setMax(x *int64, y int64) {
	if y > *x {
		*x = y
	}
}