// pkg/common/resource.go
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common
import (
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"github.com/apache/yunikorn-k8shim/pkg/log"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
// ResourceBuilder is a helper to incrementally construct an si.Resource
// from named integer quantities.
type ResourceBuilder struct {
	// resourceMap accumulates quantities keyed by resource name; the map is
	// handed to the built si.Resource as-is by Build.
	resourceMap map[string]*si.Quantity
}
// NewResourceBuilder creates an empty ResourceBuilder ready to accept resources.
func NewResourceBuilder() *ResourceBuilder {
	builder := &ResourceBuilder{}
	builder.resourceMap = map[string]*si.Quantity{}
	return builder
}
// AddResource records the given quantity under name, overwriting any earlier
// value for the same name, and returns the builder to allow call chaining.
func (w *ResourceBuilder) AddResource(name string, value int64) *ResourceBuilder {
	quantity := &si.Quantity{Value: value}
	w.resourceMap[name] = quantity
	return w
}
// Build materializes the accumulated quantities into an si.Resource.
func (w *ResourceBuilder) Build() *si.Resource {
	built := &si.Resource{Resources: w.resourceMap}
	return built
}
// GetPodResource from a pod's containers and convert that into an internal resource.
// Scheduling only accounts for requests.
// Convert the pod into a resource to allow for pod count checks in quotas and nodes.
func GetPodResource(pod *v1.Pod) *si.Resource {
	// every pod consumes one "pods" unit regardless of its resource requests
	podResource := &si.Resource{
		Resources: map[string]*si.Quantity{"pods": {Value: 1}},
	}
	// A QosBestEffort pod does not request any resources, just a single pod
	if qos.GetPodQOS(pod) == v1.PodQOSBestEffort {
		return podResource
	}
	// sum the requests of all regular containers
	for _, c := range pod.Spec.Containers {
		resourceList := c.Resources.Requests
		containerResource := getResource(resourceList)
		podResource = Add(podResource, containerResource)
	}
	// each resource compare between initcontainer and sum of containers:
	// max(sum(Containers requirement), InitContainers requirement)
	// (safe to call unconditionally: ranging over a nil slice is a no-op)
	checkInitContainerRequest(pod, podResource)
	// K8s pod EnableOverHead from:
	// alpha: v1.16
	// beta: v1.18
	// Enables PodOverhead, for accounting pod overheads which are specific to a given RuntimeClass
	// If Overhead is being utilized, add to the total requests for the pod
	if pod.Spec.Overhead != nil {
		podOverHeadResource := getResource(pod.Spec.Overhead)
		podResource = Add(podResource, podOverHeadResource)
		// Logging the overall pod size and pod overhead
		log.Log(log.ShimResources).Debug("Pod overhead specified, overall pod size adjusted",
			zap.String("taskID", string(pod.UID)),
			zap.Stringer("overallSize", podResource),
			zap.Stringer("overheadSize", podOverHeadResource))
	}
	return podResource
}
// checkInitContainerRequest adjusts the aggregated container requests in place
// so that, per resource name, the result is the maximum of the summed regular
// container requests and any single init container request.
func checkInitContainerRequest(pod *v1.Pod, containersResources *si.Resource) {
	for _, initContainer := range pod.Spec.InitContainers {
		initResource := getResource(initContainer.Resources.Requests)
		for name, initRequest := range initResource.Resources {
			current, found := containersResources.Resources[name]
			// take the init container's value when the resource is not yet
			// tracked (additional request) or when it exceeds the current sum
			if !found || initRequest.GetValue() > current.GetValue() {
				containersResources.Resources[name] = initRequest
			}
		}
	}
}
// GetNodeResource converts a node's allocatable resources into an si.Resource.
// Capacity represents the total capacity of the node, while Allocatable is the
// portion available for scheduling after each kubelet's reservations. We rely
// on Allocatable here because it defaults to Capacity when not specified,
// which matches the behavior of the default scheduler.
func GetNodeResource(nodeStatus *v1.NodeStatus) *si.Resource {
	allocatable := nodeStatus.Allocatable
	return getResource(allocatable)
}
// ParseResource parses cpu and memory strings into an si.Resource; both
// values are optional. If either fails to parse, the error is logged and nil
// is returned.
func ParseResource(cpuStr, memStr string) *si.Resource {
	if cpuStr == "" && memStr == "" {
		return nil
	}
	builder := NewResourceBuilder()
	if cpuStr != "" {
		vcore, err := resource.ParseQuantity(cpuStr)
		if err != nil {
			log.Log(log.ShimResources).Error("failed to parse cpu resource",
				zap.String("cpuStr", cpuStr),
				zap.Error(err))
			return nil
		}
		// cpu is tracked internally in millicores
		builder.AddResource(siCommon.CPU, vcore.MilliValue())
	}
	if memStr != "" {
		mem, err := resource.ParseQuantity(memStr)
		if err != nil {
			log.Log(log.ShimResources).Error("failed to parse memory resource",
				zap.String("memStr", memStr),
				zap.Error(err))
			return nil
		}
		builder.AddResource(siCommon.Memory, mem.Value())
	}
	return builder.Build()
}
// GetResource converts a map of resource-name to quantity strings into an
// si.Resource. CPU values are converted to millicores; every other resource
// keeps its base value. On any parse failure the error is logged and nil is
// returned.
func GetResource(resMap map[string]string) *si.Resource {
	result := NewResourceBuilder()
	for resName, resValue := range resMap {
		// parse once for every resource type; only the unit conversion differs
		actualValue, err := resource.ParseQuantity(resValue)
		if err != nil {
			log.Log(log.ShimResources).Error("failed to parse resource",
				zap.String("res name", resName),
				zap.String("res value", resValue),
				zap.Error(err))
			return nil
		}
		if resName == v1.ResourceCPU.String() {
			result.AddResource(siCommon.CPU, actualValue.MilliValue())
		} else {
			result.AddResource(resName, actualValue.Value())
		}
	}
	return result.Build()
}
// GetTGResource builds the total resource request of a task group: the
// per-member resource list multiplied by the number of members, plus one
// "pods" unit per member.
func GetTGResource(resMap map[string]resource.Quantity, members int64) *si.Resource {
	builder := NewResourceBuilder().AddResource("pods", members)
	for name, quantity := range resMap {
		if name == v1.ResourceCPU.String() {
			builder.AddResource(siCommon.CPU, members*quantity.MilliValue())
		} else {
			builder.AddResource(name, members*quantity.Value())
		}
	}
	return builder.Build()
}
// getResource converts a Kubernetes ResourceList into an si.Resource,
// expressing cpu in millicores and every other resource in its base unit.
func getResource(resourceList v1.ResourceList) *si.Resource {
	builder := NewResourceBuilder()
	for name, quantity := range resourceList {
		if name == v1.ResourceCPU {
			builder.AddResource(siCommon.CPU, quantity.MilliValue())
		} else {
			builder.AddResource(string(name), quantity.Value())
		}
	}
	return builder.Build()
}
// Equals reports whether left and right hold the same quantity for every
// resource name. Two nil resources (or the same pointer) are equal. A key
// present on one side but missing from the other is always a difference.
// GetValue is used instead of a direct field access so an explicitly stored
// nil *si.Quantity compares as zero rather than panicking.
func Equals(left *si.Resource, right *si.Resource) bool {
	if left == right {
		return true
	}
	// subsetEqual verifies every entry of a is matched exactly in b
	subsetEqual := func(a, b *si.Resource) bool {
		if a == nil || a.Resources == nil {
			return true
		}
		for k, v := range a.Resources {
			if b == nil {
				return false
			}
			other, ok := b.Resources[k]
			if !ok || other.GetValue() != v.GetValue() {
				return false
			}
		}
		return true
	}
	// both directions must match: checking one direction only would miss keys
	// that exist solely on the other side
	return subsetEqual(left, right) && subsetEqual(right, left)
}
// Add returns the sum of left and right as a new resource. Nil arguments are
// treated as empty. The result never aliases the inputs' quantities, so the
// caller may mutate the result without corrupting either input.
func Add(left *si.Resource, right *si.Resource) *si.Resource {
	result := &si.Resource{Resources: make(map[string]*si.Quantity)}
	for _, res := range []*si.Resource{left, right} {
		if res == nil {
			continue
		}
		for k, v := range res.Resources {
			if existing, ok := result.Resources[k]; ok {
				existing.Value += v.GetValue()
			} else {
				// copy the quantity: storing the input pointer directly would
				// let later in-place updates of the result (e.g. in Sub)
				// silently modify the input resource
				result.Resources[k] = &si.Quantity{Value: v.GetValue()}
			}
		}
	}
	return result
}
// Sub returns left minus right as a new resource. Nil arguments are treated
// as empty; resources present only on the right side yield negative values.
// The result never aliases either input, so mutating it is always safe
// (previously a nil right returned left itself, sharing its quantities).
func Sub(left *si.Resource, right *si.Resource) *si.Resource {
	// clone left so the in-place subtraction below cannot touch caller data
	rb := NewResourceBuilder()
	if left != nil {
		for k, v := range left.Resources {
			rb.AddResource(k, v.GetValue())
		}
	}
	result := rb.Build()
	if right == nil {
		return result
	}
	// subtract right, introducing negative entries for keys left lacks
	for k, v := range right.Resources {
		if existing, ok := result.Resources[k]; ok {
			existing.Value -= v.GetValue()
		} else {
			result.Resources[k] = &si.Quantity{Value: -v.GetValue()}
		}
	}
	return result
}
// IsZero reports whether r holds no non-zero quantity; a nil resource counts
// as zero. GetValue is nil-safe, so an explicitly stored nil *si.Quantity is
// treated as zero instead of panicking.
func IsZero(r *si.Resource) bool {
	if r == nil {
		return true
	}
	for _, quantity := range r.Resources {
		if quantity.GetValue() != 0 {
			return false
		}
	}
	return true
}