pkg/resmgr/scalar/resources.go (257 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scalar
import (
"fmt"
"math"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
"github.com/uber/peloton/.gen/peloton/private/resmgr"
"github.com/uber/peloton/.gen/peloton/private/resmgrsvc"
"github.com/uber/peloton/pkg/common"
"github.com/uber/peloton/pkg/common/util"
log "github.com/sirupsen/logrus"
)
// AllocationType represents the different allocation dimensions the resource
// pool can track for admission control
type AllocationType int
const (
// NonPreemptibleAllocation tracks allocation for non-preemptible tasks
NonPreemptibleAllocation AllocationType = iota + 1
// PreemptibleAllocation tracks allocation for preemptible tasks
PreemptibleAllocation
// ControllerAllocation tracks allocation for controller tasks
ControllerAllocation
// TotalAllocation tracks the allocation of all tasks(
// including NonPreemptibleAllocation,PreemptibleAllocation and ControllerAllocation)
TotalAllocation
// SlackAllocation track allocation for tasks launched using slack resources.
SlackAllocation
// NonSlackAllocation track allocation for non-revocable tasks.
NonSlackAllocation
)
// Allocation is the container to track allocation across different dimensions
type Allocation struct {
Value map[AllocationType]*Resources
}
// NewAllocation returns a new Allocation
func NewAllocation() *Allocation {
return initializeZeroAlloc()
}
// GetByType returns the allocation by type
func (a *Allocation) GetByType(allocationType AllocationType) *Resources {
return a.Value[allocationType]
}
// Add adds one allocation to another
func (a *Allocation) Add(other *Allocation) *Allocation {
result := initializeZeroAlloc()
for t, v := range a.Value {
result.Value[t] = v.Add(other.Value[t])
}
return result
}
// Subtract subtracts one allocation to another
func (a *Allocation) Subtract(other *Allocation) *Allocation {
result := initializeZeroAlloc()
for t, v := range a.Value {
result.Value[t] = v.Subtract(other.Value[t])
}
return result
}
// initializeZeroAlloc initializes a zero alloc
func initializeZeroAlloc() *Allocation {
alloc := &Allocation{
Value: make(map[AllocationType]*Resources),
}
alloc.Value[TotalAllocation] = ZeroResource
alloc.Value[NonPreemptibleAllocation] = ZeroResource
alloc.Value[ControllerAllocation] = ZeroResource
alloc.Value[PreemptibleAllocation] = ZeroResource
alloc.Value[SlackAllocation] = ZeroResource
alloc.Value[NonSlackAllocation] = ZeroResource
return alloc
}
// GetGangAllocation returns the allocation across different dimensions of
// all tasks in a gang
func GetGangAllocation(gang *resmgrsvc.Gang) *Allocation {
gangAllocation := initializeZeroAlloc()
for _, t := range gang.GetTasks() {
gangAllocation = gangAllocation.Add(GetTaskAllocation(t))
}
return gangAllocation
}
// GetTaskAllocation returns the allocation across different dimensions of a
// task
func GetTaskAllocation(rmTask *resmgr.Task) *Allocation {
alloc := initializeZeroAlloc()
taskResource := ConvertToResmgrResource(rmTask.Resource)
// check if the task is non-preemptible
if rmTask.GetPreemptible() {
alloc.Value[PreemptibleAllocation] = taskResource
} else {
alloc.Value[NonPreemptibleAllocation] = taskResource
}
// check if its a controller task
if rmTask.GetController() {
alloc.Value[ControllerAllocation] = taskResource
}
if rmTask.GetRevocable() {
alloc.Value[SlackAllocation] = taskResource
} else {
alloc.Value[NonSlackAllocation] = taskResource
}
// every task account for total allocation
alloc.Value[TotalAllocation] = taskResource
return alloc
}
// ZeroResource represents the minimum Value of a resource
var ZeroResource = &Resources{
CPU: float64(0),
GPU: float64(0),
DISK: float64(0),
MEMORY: float64(0),
}
// Resources is a non-thread safe helper struct holding recognized resources.
type Resources struct {
CPU float64
MEMORY float64
DISK float64
GPU float64
}
// GetCPU returns the CPU resource
func (r *Resources) GetCPU() float64 {
return r.CPU
}
// GetDisk returns the disk resource
func (r *Resources) GetDisk() float64 {
return r.DISK
}
// GetMem returns the memory resource
func (r *Resources) GetMem() float64 {
return r.MEMORY
}
// GetGPU returns the GPU resource
func (r *Resources) GetGPU() float64 {
return r.GPU
}
// Get returns the kind of resource
func (r *Resources) Get(kind string) float64 {
switch kind {
case common.CPU:
return r.GetCPU()
case common.GPU:
return r.GetGPU()
case common.MEMORY:
return r.GetMem()
case common.DISK:
return r.GetDisk()
}
return float64(0)
}
// Set sets the kind of resource with the Value
func (r *Resources) Set(kind string, value float64) {
switch kind {
case common.CPU:
r.CPU = value
case common.GPU:
r.GPU = value
case common.MEMORY:
r.MEMORY = value
case common.DISK:
r.DISK = value
}
}
// Add atomically add another scalar resources onto current one.
func (r *Resources) Add(other *Resources) *Resources {
return &Resources{
CPU: r.CPU + other.CPU,
MEMORY: r.MEMORY + other.MEMORY,
DISK: r.DISK + other.DISK,
GPU: r.GPU + other.GPU,
}
}
func lessThanOrEqual(f1, f2 float64) bool {
v := f1 - f2
if math.Abs(v) < util.ResourceEpsilon {
return true
}
return v < 0
}
// LessThanOrEqual determines current Resources is less than or equal
// the other one.
func (r *Resources) LessThanOrEqual(other *Resources) bool {
return lessThanOrEqual(r.CPU, other.CPU) &&
lessThanOrEqual(r.MEMORY, other.MEMORY) &&
lessThanOrEqual(r.DISK, other.DISK) &&
lessThanOrEqual(r.GPU, other.GPU)
}
func equal(f1, f2 float64) bool {
return f1 == f2
}
// Equal determines current Resources is equal to
// the other one.
func (r *Resources) Equal(other *Resources) bool {
return equal(r.CPU, other.CPU) &&
equal(r.MEMORY, other.MEMORY) &&
equal(r.DISK, other.DISK) &&
equal(r.GPU, other.GPU)
}
// ConvertToResmgrResource converts task resource config to scalar.Resources
func ConvertToResmgrResource(resource *task.ResourceConfig) *Resources {
return &Resources{
CPU: resource.GetCpuLimit(),
DISK: resource.GetDiskLimitMb(),
GPU: resource.GetGpuLimit(),
MEMORY: resource.GetMemLimitMb(),
}
}
// GetGangResources aggregates gang resources to resmgr resources
func GetGangResources(gang *resmgrsvc.Gang) *Resources {
if gang == nil {
return nil
}
totalRes := &Resources{}
for _, task := range gang.GetTasks() {
totalRes = totalRes.Add(
ConvertToResmgrResource(task.GetResource()))
}
return totalRes
}
func (r *Resources) String() string {
return fmt.Sprintf("CPU:%.2f MEM:%.2f DISK:%.2f GPU:%.2f",
r.GetCPU(), r.GetMem(), r.GetDisk(), r.GetGPU())
}
// Min Gets the minimum value for each resource type
func Min(r1, r2 *Resources) *Resources {
return &Resources{
CPU: math.Min(r1.GetCPU(), r2.GetCPU()),
MEMORY: math.Min(r1.GetMem(), r2.GetMem()),
DISK: math.Min(r1.GetDisk(), r2.GetDisk()),
GPU: math.Min(r1.GetGPU(), r2.GetGPU()),
}
}
// Subtract another scalar resources from current one and return a new copy of result.
func (r *Resources) Subtract(other *Resources) *Resources {
var result Resources
if r.CPU < other.CPU {
log.WithFields(log.Fields{
"from_cpu ": r.CPU,
"value_cpu": other.CPU,
}).Debug("Subtracted Value is Greater")
result.CPU = float64(0)
} else {
result.CPU = r.CPU - other.CPU
if result.CPU < util.ResourceEpsilon {
result.CPU = float64(0)
}
}
if r.GPU < other.GPU {
log.WithFields(log.Fields{
"from_gpu ": r.GPU,
"value_gpu": other.GPU,
}).Debug("Subtracted Value is Greater")
result.GPU = float64(0)
} else {
result.GPU = r.GPU - other.GPU
if result.GPU < util.ResourceEpsilon {
result.GPU = float64(0)
}
}
if r.MEMORY < other.MEMORY {
log.WithFields(log.Fields{
"from_memory ": r.MEMORY,
"value_memory": other.MEMORY,
}).Debug("Subtracted Value is Greater")
result.MEMORY = float64(0)
} else {
result.MEMORY = r.MEMORY - other.MEMORY
if result.MEMORY < util.ResourceEpsilon {
result.MEMORY = float64(0)
}
}
if r.DISK < other.DISK {
log.WithFields(log.Fields{
"from_disk": r.DISK,
"value_disk": other.DISK,
}).Debug("Subtracted Value is Greater")
result.DISK = float64(0)
} else {
result.DISK = r.DISK - other.DISK
if result.DISK < util.ResourceEpsilon {
result.DISK = float64(0)
}
}
return &result
}
// Clone creates the new new object of the resources
// and copies the values to the new object and return
// the new object
func (r *Resources) Clone() *Resources {
return &Resources{
CPU: r.CPU,
DISK: r.DISK,
MEMORY: r.MEMORY,
GPU: r.GPU,
}
}
// Copy copies the values from the passed resource object
// to the calling object
func (r *Resources) Copy(other *Resources) {
r.CPU = other.CPU
r.DISK = other.DISK
r.MEMORY = other.MEMORY
r.GPU = other.GPU
}