pkg/hostmgr/scalar/resources.go (262 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scalar
import (
"fmt"
"math"
"sync"
mesos "github.com/uber/peloton/.gen/mesos/v1"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
"github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton"
pbpod "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod"
"github.com/uber/peloton/pkg/common/util"
)
// Resources is a non-thread safe helper struct holding recognized resources.
type Resources struct {
CPU float64
Mem float64
Disk float64
GPU float64
}
// a safe less than or equal to comparator which takes epsilon into consideration.
// TODO(zhitao): Explore fixed point number in T777007.
func lessThanOrEqual(f1, f2 float64) bool {
v := f1 - f2
if math.Abs(v) < util.ResourceEpsilon {
return true
}
return v < 0
}
// a safe less than to comparator which takes epsilon into consideration.
func lessThan(f1, f2 float64) bool {
v := f1 - f2
if math.Abs(v) < util.ResourceEpsilon {
return false
}
return v < 0
}
// GetCPU returns the CPU resource
func (r Resources) GetCPU() float64 {
return r.CPU
}
// GetDisk returns the Disk resource
func (r Resources) GetDisk() float64 {
return r.Disk
}
// GetMem returns the Memory resource
func (r Resources) GetMem() float64 {
return r.Mem
}
// GetGPU returns the GPU resource
func (r Resources) GetGPU() float64 {
return r.GPU
}
// HasGPU is a special condition to ensure exclusive protection for GPU.
func (r Resources) HasGPU() bool {
return math.Abs(r.GPU) > util.ResourceEpsilon
}
// Contains determines whether current Resources is large enough to contain
// the other one.
func (r Resources) Contains(other Resources) bool {
return lessThanOrEqual(other.CPU, r.CPU) &&
lessThanOrEqual(other.Mem, r.Mem) &&
lessThanOrEqual(other.Disk, r.Disk) &&
lessThanOrEqual(other.GPU, r.GPU)
}
// Compare method compares current Resources with the other one, return
// true if current Resources is strictly larger than or equal to / less than
// the other one (based on boolean cmpLess), false if not. Fields in the
// other one are ignore if they are 0.
func (r Resources) Compare(other Resources, cmpLess bool) bool {
// lessThan(r.XXX, other.XXX) != cmpLess XOR's those two values.
// cmpLess flag toggles either it's comparing the current one is
// greater than or equal to the other (when set to false), or
// the current one is less than the other one (when set to true).
// The following is the possible combinations of inputs, and their
// result:
//
// r.XXX < other.XXX cmpLess XOR return
// false false false true (current >= other)
// true false true false (current < other)
// false true true false (current >= other)
// true true false true (current < other)
if other.CPU > 0 && lessThan(r.CPU, other.CPU) != cmpLess {
return false
}
if other.Mem > 0 && lessThan(r.Mem, other.Mem) != cmpLess {
return false
}
if other.GPU > 0 && lessThan(r.GPU, other.GPU) != cmpLess {
return false
}
if other.Disk > 0 && lessThan(r.Disk, other.Disk) != cmpLess {
return false
}
return true
}
// Add atomically add another scalar resources onto current one.
func (r Resources) Add(other Resources) Resources {
return Resources{
CPU: r.CPU + other.CPU,
Mem: r.Mem + other.Mem,
Disk: r.Disk + other.Disk,
GPU: r.GPU + other.GPU,
}
}
// TrySubtract attempts to subtract another scalar resources from current one
// , but returns false if other has more resources.
func (r Resources) TrySubtract(other Resources) (Resources, bool) {
if !r.Contains(other) {
return Resources{}, false
}
return r.Subtract(other), true
}
// Subtract another scalar resources from current one and return a new copy of result.
func (r Resources) Subtract(other Resources) Resources {
return Resources{
CPU: r.CPU - other.CPU,
Mem: r.Mem - other.Mem,
Disk: r.Disk - other.Disk,
GPU: r.GPU - other.GPU,
}
}
// NonEmptyFields returns corresponding Mesos resource names for fields which are not empty.
func (r Resources) NonEmptyFields() []string {
var nonEmptyFields []string
if math.Abs(r.CPU) > util.ResourceEpsilon {
nonEmptyFields = append(nonEmptyFields, "cpus")
}
if math.Abs(r.Mem) > util.ResourceEpsilon {
nonEmptyFields = append(nonEmptyFields, "mem")
}
if math.Abs(r.Disk) > util.ResourceEpsilon {
nonEmptyFields = append(nonEmptyFields, "disk")
}
if math.Abs(r.GPU) > util.ResourceEpsilon {
nonEmptyFields = append(nonEmptyFields, "gpus")
}
return nonEmptyFields
}
// Empty returns whether all fields are empty now.
func (r Resources) Empty() bool {
return len(r.NonEmptyFields()) == 0
}
// String returns a formatted string for scalar resources
func (r Resources) String() string {
return fmt.Sprintf("CPU:%.2f MEM:%.2f DISK:%.2f GPU:%.2f",
r.GetCPU(), r.GetMem(), r.GetDisk(), r.GetGPU())
}
// HasResourceType validates requested resource type is present agent resource type.
func HasResourceType(agentRes, reqRes Resources, resourceType string) bool {
switch resourceType {
case "GPU":
return !reqRes.HasGPU() && agentRes.HasGPU()
}
return false
}
// FilterRevocableMesosResources separates revocable resources
// and non-revocable resources
func FilterRevocableMesosResources(rs []*mesos.Resource) (
revocable []*mesos.Resource, nonRevocable []*mesos.Resource) {
return FilterMesosResources(
rs,
func(r *mesos.Resource) bool {
return r.GetRevocable() != nil
})
}
// FilterMesosResources separates Mesos resources slice into matched and
// unmatched ones.
func FilterMesosResources(rs []*mesos.Resource, filter func(*mesos.Resource) bool) (
matched []*mesos.Resource, unmatched []*mesos.Resource) {
for _, r := range rs {
if filter(r) {
matched = append(matched, r)
} else {
unmatched = append(unmatched, r)
}
}
return matched, unmatched
}
// FromResourceConfig creates a new instance of `Resources` from a `ResourceConfig`.
func FromResourceConfig(rc *task.ResourceConfig) (r Resources) {
r.CPU = rc.GetCpuLimit()
r.Mem = rc.GetMemLimitMb()
r.Disk = rc.GetDiskLimitMb()
r.GPU = rc.GetGpuLimit()
return r
}
// FromResourceSpec creates a new instance of `Resources` from a `ResourceSpec`
func FromResourceSpec(rc *pbpod.ResourceSpec) (r Resources) {
r.CPU = rc.GetCpuLimit()
r.Mem = rc.GetMemLimitMb()
r.GPU = rc.GetGpuLimit()
return r
}
// FromPodSpec creates a new instance of `Resources` from a `PodSpec`
func FromPodSpec(podspec *pbpod.PodSpec) (r Resources) {
var res Resources
for _, c := range podspec.GetContainers() {
res = FromResourceSpec(
c.GetResource(),
)
r = r.Add(res)
}
for _, c := range podspec.GetInitContainers() {
res = FromResourceSpec(
c.GetResource(),
)
r = r.Add(res)
}
return r
}
// FromResourceSpec creates a new instance of `Resources` from
// `peloton.Resources`
func FromPelotonResources(rp *peloton.Resources) (r Resources) {
r.CPU = rp.GetCpu()
r.Mem = rp.GetMemMb()
r.Disk = rp.GetDiskMb()
r.GPU = rp.GetGpu()
return r
}
// ToPelotonResources creates a new instance of `peloton.Resources` from
// `Resources`
func ToPelotonResources(rs Resources) *peloton.Resources {
return &peloton.Resources{
Cpu: rs.CPU,
MemMb: rs.Mem,
DiskMb: rs.Disk,
Gpu: rs.GPU,
}
}
// FromMesosResource returns the scalar Resources from a single Mesos resource object.
func FromMesosResource(resource *mesos.Resource) (r Resources) {
value := resource.GetScalar().GetValue()
switch name := resource.GetName(); name {
case "cpus":
r.CPU += value
case "mem":
r.Mem += value
case "disk":
r.Disk += value
case "gpus":
r.GPU += value
}
return r
}
// FromMesosResources returns the scalar Resources from a list of Mesos resource objects.
func FromMesosResources(resources []*mesos.Resource) (r Resources) {
for _, resource := range resources {
tmp := FromMesosResource(resource)
r = r.Add(tmp)
}
return r
}
// FromOfferToMesosResources returns list of resources from a single offer
func FromOfferToMesosResources(offer *mesos.Offer) []*mesos.Resource {
var resources []*mesos.Resource
if offer == nil {
resources = append(resources, &mesos.Resource{})
} else {
for _, r := range offer.GetResources() {
resources = append(resources, r)
}
}
return resources
}
// FromOffersMapToMesosResources returns list of resources from a offerMap
func FromOffersMapToMesosResources(offerMap map[string]*mesos.Offer) []*mesos.Resource {
var resources []*mesos.Resource
for _, offer := range offerMap {
resources = append(resources, FromOfferToMesosResources(offer)...)
}
return resources
}
// FromOffer returns the scalar Resources from an offer.
func FromOffer(offer *mesos.Offer) Resources {
if offer == nil {
return Resources{}
}
return FromMesosResources(offer.GetResources())
}
// FromOffers returns the scalar Resources from given offers
func FromOffers(offers []*mesos.Offer) (r Resources) {
for _, offer := range offers {
tmp := FromOffer(offer)
r = r.Add(tmp)
}
return r
}
// FromOfferMap returns the scalar Resources from given id to offer map.
func FromOfferMap(offerMap map[string]*mesos.Offer) (r Resources) {
for _, offer := range offerMap {
tmp := FromOffer(offer)
r = r.Add(tmp)
}
return r
}
// Minimum returns minimum amount of resources in each type.
func Minimum(r1, r2 Resources) (m Resources) {
m.CPU = math.Min(r1.CPU, r2.CPU)
m.Mem = math.Min(r1.Mem, r2.Mem)
m.Disk = math.Min(r1.Disk, r2.Disk)
m.GPU = math.Min(r1.GPU, r2.GPU)
return m
}
// AtomicResources is a wrapper around `Resources` provide thread safety.
type AtomicResources struct {
sync.RWMutex
resources Resources
}
// Get returns a copy of current value.
func (a *AtomicResources) Get() Resources {
a.RLock()
defer a.RUnlock()
return a.resources
}
// Set sets value to r.
func (a *AtomicResources) Set(r Resources) {
a.Lock()
defer a.Unlock()
a.resources = r
}