pkg/resmgr/respool/respool.go (808 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package respool import ( "container/list" "math" "sync" "github.com/uber/peloton/.gen/peloton/api/v0/peloton" "github.com/uber/peloton/.gen/peloton/api/v0/respool" "github.com/uber/peloton/.gen/peloton/private/resmgrsvc" "github.com/uber/peloton/pkg/common" rc "github.com/uber/peloton/pkg/resmgr/common" "github.com/uber/peloton/pkg/resmgr/queue" "github.com/uber/peloton/pkg/resmgr/scalar" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/uber-go/tally" ) const ( // ToDo (varung): Move this to resource manager config // Represents the default slack limit at Cluster level _defaultSlackLimit = 20 _defaultReservation = 0 _defaultLimit = 0 _defaultShare = 1 ) // node represents a node in a tree type node interface { // Returns the resource pool name. Name() string // Returns the resource pool ID. ID() string // Returns the resource pool's parent. Parent() ResPool // Sets the parent of the resource pool. SetParent(ResPool) // Returns the children(if any) of the resource pool. Children() *list.List // Sets the children of the resource pool. SetChildren(*list.List) // Returns true of the resource pool is a leaf. IsLeaf() bool // GetPath returns the resource pool path. GetPath() string // IsRoot returns true if the node is the root of the resource tree. IsRoot() bool } // ResPool is a node in a resource pool hierarchy. type ResPool interface { node // Returns the config of the resource pool. ResourcePoolConfig() *respool.ResourcePoolConfig // Sets the resource pool config. SetResourcePoolConfig(*respool.ResourcePoolConfig) // Returns a map of resources and its resource config. Resources() map[string]*respool.ResourceConfig // Converts to resource pool info. ToResourcePoolInfo() *respool.ResourcePoolInfo // Aggregates the child reservations by resource type. AggregatedChildrenReservations() (map[string]float64, error) // Enqueues gang (task list) into resource pool pending queue. EnqueueGang(gang *resmgrsvc.Gang) error // Dequeues gangs (task list) from the resource pool. DequeueGangs(int) ([]*resmgrsvc.Gang, error) // PeekGangs returns a list of gangs from the resource pool's queue based // on the queue type. limit determines the max number of gangs to be // returned. PeekGangs(qt QueueType, limit uint32) ([]*resmgrsvc.Gang, error) // SetEntitlement sets the entitlement of non-revocable resources // for non-revocable tasks + revocable tasks for this resource pool. SetEntitlement(res *scalar.Resources) // SetSlackEntitlement sets the entitlement of revocable cpus // + non-revocable resources [mem, disk, gpu] for the resource pool SetSlackEntitlement(res *scalar.Resources) // SetNonSlackEntitlement Sets the entitlement of non-revocable resources // for non-revocable tasks for this resource pool SetNonSlackEntitlement(res *scalar.Resources) // GetEntitlement returns the total entitlement of non-revocable resources // for this resource pool. GetEntitlement() *scalar.Resources // GetSlackEntitlement returns the entitlement for revocable tasks. GetSlackEntitlement() *scalar.Resources // GetNonSlackEntitlement returns the entitlement for non-revocable tasks. GetNonSlackEntitlement() *scalar.Resources // AddToAllocation adds resources to current allocation // for the resource pool. AddToAllocation(*scalar.Allocation) error // SubtractFromAllocation recaptures the resources from task. SubtractFromAllocation(*scalar.Allocation) error // GetTotalAllocatedResources returns the total resource allocation for the resource // pool. GetTotalAllocatedResources() *scalar.Resources // GetSlackAllocatedResources returns the slack allocation for the resource pool. GetSlackAllocatedResources() *scalar.Resources // GetNonSlackAllocatedResources returns resources allocated to non-revocable tasks. GetNonSlackAllocatedResources() *scalar.Resources // CalculateTotalAllocatedResources calculates the total allocation recursively for // all the children. CalculateTotalAllocatedResources() *scalar.Resources // SetAllocation sets the resource allocation for the resource pool. // Used by tests. SetTotalAllocatedResources(resources *scalar.Resources) // AddToDemand adds resources to current demand // for the resource pool. AddToDemand(res *scalar.Resources) error // SubtractFromDemand subtracts resources from current demand // for the resource pool. SubtractFromDemand(res *scalar.Resources) error // GetDemand returns the resource demand for the resource pool. GetDemand() *scalar.Resources // CalculateDemand calculates the resource demand // for the resource pool recursively for the subtree. CalculateDemand() *scalar.Resources // AddToSlackDemand adds resources to slack demand // for the resource pool. AddToSlackDemand(res *scalar.Resources) error // SubtractFromSlackDemand subtracts resources from slack demand // for the resource pool. SubtractFromSlackDemand(res *scalar.Resources) error // GetSlackDemand returns the slack resource demand for the resource pool. GetSlackDemand() *scalar.Resources // CalculateSlackDemand calculates the slack resource demand // for the resource pool recursively for the subtree. CalculateSlackDemand() *scalar.Resources // GetSlackLimit returns the limit of resources [mem,disk] // can be used by revocable tasks. GetSlackLimit() *scalar.Resources // AddInvalidTask will add the killed tasks to respool which can be // discarded asynchronously which scheduling. AddInvalidTask(task *peloton.TaskID) // UpdateResourceMetrics updates metrics for this resource pool // on each entitlement cycle calculation (15s) UpdateResourceMetrics() } // resPool implements the ResPool interface. type resPool struct { sync.RWMutex id string path string children *list.List parent ResPool preemptionCfg rc.PreemptionConfig resourceConfigs map[string]*respool.ResourceConfig poolConfig *respool.ResourcePoolConfig // Tracks the allocation across different task dimensions allocation *scalar.Allocation // Tracks the max resources this resource pool can use in a given // entitlement cycle entitlement *scalar.Resources // Tracks the max resources for this resource pool to be given // for non-revocable tasks. nonSlackEntitlement *scalar.Resources // Tracks the max slack resources this resource pool can use // in a given entitlement cycle for revocable tasks. // As of now, slack entitlement is only calculated for [cpus] slackEntitlement *scalar.Resources // Tracks the demand of resources, in the pending and the controller queue, // which are waiting to be admitted. // Once admitted their resources are accounted for in `allocation`. demand *scalar.Resources // Tracks the demand of resources required by revocable tasks, // which are waiting to be admitted. Here, [cpus] is slack resources, // and [mem,disk] is regular resources, shared with non-revocable tasks. // Once admitted their resources are accounted for in `allocation`. slackDemand *scalar.Resources // the reserved resources of this pool reservation *scalar.Resources // queue containing gangs waiting to be admitted into the resource pool. // queue semantics is defined by the SchedulingPolicy pendingQueue queue.Queue // queue containing controller tasks( // gang with 1 task) waiting to be admitted. // All tasks are enqueued to the pending queue, // the only reason a task would move from pending queue to controller // queue is when the task is of type CONTROLLER and it can't be admitted, // in that case the task is moved to the controller queue so that it // doesn't block the rest of the tasks in pending queue. controllerQueue queue.Queue // queue containing non-preemptible gangs, waiting to be admitted. // All tasks are enqueued to the pending queue, // the only reason a task would move from pending queue to np // queue is when the task is non-preemptible and it can't be admitted, // in that case the task is moved to the np queue so that it // doesn't block the rest of the tasks in pending queue. npQueue queue.Queue // queue containing revocable tasks waiting to be admitted. // All tasks are enqueued to the pending queue, // the only reason a task would move from pending queue to revocable // queue is when the task is revocable and it can't be admitted, // in that case the task is moved to the revocable queue so that it // doesn't block the rest of the tasks in pending queue. revocableQueue queue.Queue // The max limit of resources controller tasks can use in this pool controllerLimit *scalar.Resources // the max limit of resources revocable tasks can use in this pool. slackLimit *scalar.Resources // set of invalid tasks which will be discarded during admission control. invalidTasks map[string]bool metrics *Metrics } // NewRespool will initialize the resource pool node and return that. func NewRespool( scope tally.Scope, id string, parent ResPool, config *respool.ResourcePoolConfig, preemptionConfig rc.PreemptionConfig) (ResPool, error) { if config == nil { return nil, errors.Errorf("error creating resource pool %s; "+ "ResourcePoolConfig is nil", id) } pq, err := queue.CreateQueue(config.Policy, math.MaxInt64) if err != nil { return nil, errors.Wrapf(err, "error creating resource pool %s", id) } cq, err := queue.CreateQueue(config.Policy, math.MaxInt64) if err != nil { return nil, errors.Wrapf(err, "error creating resource pool %s", id) } nq, err := queue.CreateQueue(config.Policy, math.MaxInt64) if err != nil { return nil, errors.Wrapf(err, "error creating resource pool %s", id) } rq, err := queue.CreateQueue(config.Policy, math.MaxInt64) if err != nil { return nil, errors.Wrapf(err, "error creating revocable queue %s", id) } pool := &resPool{ id: id, children: list.New(), parent: parent, resourceConfigs: make(map[string]*respool.ResourceConfig), poolConfig: config, pendingQueue: pq, controllerQueue: cq, npQueue: nq, revocableQueue: rq, allocation: scalar.NewAllocation(), entitlement: &scalar.Resources{}, nonSlackEntitlement: &scalar.Resources{}, slackEntitlement: &scalar.Resources{}, demand: &scalar.Resources{}, slackDemand: &scalar.Resources{}, slackLimit: &scalar.Resources{}, reservation: &scalar.Resources{}, invalidTasks: make(map[string]bool), preemptionCfg: preemptionConfig, } pool.path = pool.calculatePath() // Initialize metrics pool.metrics = NewMetrics(scope.Tagged(map[string]string{ "path": pool.GetPath(), })) // Initialize resources and limits. pool.initialize(config) return pool, nil } // ID returns the resource pool UUID. func (n *resPool) ID() string { return n.id } // Name returns the resource pool name. func (n *resPool) Name() string { n.RLock() defer n.RUnlock() return n.poolConfig.Name } // Parent returns the resource pool's parent pool. func (n *resPool) Parent() ResPool { n.RLock() defer n.RUnlock() return n.parent } // SetParent will be setting the parent for the resource pool. func (n *resPool) SetParent(parent ResPool) { n.Lock() defer n.Unlock() n.parent = parent // calculate path again. n.path = n.calculatePath() } // SetChildren will be setting the children for the resource pool. func (n *resPool) SetChildren(children *list.List) { n.Lock() defer n.Unlock() n.children = children } // Children will be getting the children for the resource pool. func (n *resPool) Children() *list.List { n.RLock() defer n.RUnlock() return n.children } // Resources returns the resource configs. func (n *resPool) Resources() map[string]*respool.ResourceConfig { n.RLock() defer n.RUnlock() return n.resourceConfigs } // SetResourcePoolConfig sets the resource pool config and initializes the // resources. func (n *resPool) SetResourcePoolConfig(config *respool.ResourcePoolConfig) { n.Lock() defer n.Unlock() n.poolConfig = config n.initialize(config) } // ResourcePoolConfig returns the resource pool config. func (n *resPool) ResourcePoolConfig() *respool.ResourcePoolConfig { n.RLock() defer n.RUnlock() return n.poolConfig } // IsLeaf will tell us if this resource pool is leaf or not func (n *resPool) IsLeaf() bool { n.RLock() defer n.RUnlock() return n.isLeaf() } // EnqueueGang inserts a gang, which is a task list representing a gang // of 1 or more (same priority) tasks, into pending queue. func (n *resPool) EnqueueGang(gang *resmgrsvc.Gang) error { if (gang == nil) || (len(gang.Tasks) == 0) { err := errors.Errorf("gang has no elements") return err } if !n.isLeaf() { return errors.Errorf("resource pool %s is not a leaf node", n.id) } if err := n.pendingQueue.Enqueue(gang); err != nil { return err } // if task is revocable then add to slack demand if isRevocable(gang) { n.AddToSlackDemand(scalar.GetGangResources(gang)) return nil } n.AddToDemand(scalar.GetGangResources(gang)) return nil } // DequeueGangs dequeues a list of gangs from the // resource pool which can be admitted. // Each gang is a task list representing a gang // of 1 or more (same priority) tasks. // If there is no gang present then its a error, if any one present // then it will return all the gangs within limit without error // The dequeue works in the following way // 1. Checks the non preemptible queue to admit non preemptible gangs // 2. Checks the controller queue to admit controller preemptible gangs // 3. Checks the pending queue ( // and moves gangs to non preemptible queue or controller queue) func (n *resPool) DequeueGangs(limit int) ([]*resmgrsvc.Gang, error) { if limit <= 0 { err := errors.Errorf("limit %d is not valid", limit) return nil, err } if !n.isLeaf() { err := errors.Errorf("resource pool %s is not a leaf node", n.id) return nil, err } var err error var gangList []*resmgrsvc.Gang for _, qt := range []QueueType{ NonPreemptibleQueue, ControllerQueue, RevocableQueue, PendingQueue} { // check how many gangs left from the limit left := limit - len(gangList) if left == 0 { break } gangs, err := n.dequeue(qt, left) if err != nil { log.WithFields(log.Fields{ "respool_id": n.id, "queue": qt, "gangs_left": left, }).WithError(err).Warn("Error dequeueing from queue") continue } gangList = append(gangList, gangs...) } return gangList, err } // dequeues limit number of gangs from the respool for admission. func (n *resPool) dequeue( qt QueueType, limit int) ([]*resmgrsvc.Gang, error) { var err error var gangList []*resmgrsvc.Gang if limit <= 0 { return gangList, nil } for i := 0; i < limit; i++ { gangs, err := n.queue(qt).Peek(1) if err != nil { if _, ok := err.(queue.ErrorQueueEmpty); ok { // queue is empty we are done return gangList, nil } log.WithError(err).Error("Failed to peek into queue") return gangList, err } gang := gangs[0] err = admission.TryAdmit(gang, n, qt) if err != nil { if err == errGangInvalid || err == errSkipNonPreemptibleGang || err == errSkipControllerGang || err == errSkipRevocableGang { // the admission can fail : // 1. Because the gang is invalid. // In this case we move on to the next gang in the queue with the // expectation that the invalid gang is removed from the head of // the queue. // 2. Because the gang should be skipped ( // revocable gang, controller gang or non-preemptible gang) log.WithFields(log.Fields{ "respool_id": n.id, "error": err.Error(), }).Debug("skipping gang from admission") continue } break } gangList = append(gangList, gang) } return gangList, err } // AggregatedChildrenReservations returns aggregated child reservations by // resource kind func (n *resPool) AggregatedChildrenReservations() (map[string]float64, error) { totalReservation := make(map[string]float64) n.RLock() defer n.RUnlock() nodes := n.children // We need to find out the total reservation for e := nodes.Front(); e != nil; e = e.Next() { n, ok := e.Value.(ResPool) if !ok { return totalReservation, errors.Errorf( "failed to type assert child resource pool %v", e.Value) } resources := n.Resources() for kind, resource := range resources { totalReservation[kind] = totalReservation[kind] + resource.Reservation } } return totalReservation, nil } // ToResourcePoolInfo converts ResPool to ResourcePoolInfo func (n *resPool) ToResourcePoolInfo() *respool.ResourcePoolInfo { n.RLock() defer n.RUnlock() childrenResPools := n.children childrenResourcePoolIDs := make([]*peloton.ResourcePoolID, 0, childrenResPools.Len()) for child := childrenResPools.Front(); child != nil; child = child.Next() { childrenResourcePoolIDs = append(childrenResourcePoolIDs, &peloton.ResourcePoolID{ Value: child.Value.(*resPool).id, }) } var parentResPoolID *peloton.ResourcePoolID // handle Root's parent == nil if n.parent != nil { parentResPoolID = &peloton.ResourcePoolID{ Value: n.parent.ID(), } } return &respool.ResourcePoolInfo{ Id: &peloton.ResourcePoolID{ Value: n.id, }, Parent: parentResPoolID, Config: n.poolConfig, Children: childrenResourcePoolIDs, Path: &respool.ResourcePoolPath{Value: n.path}, Usage: n.createRespoolUsage( n.allocation.GetByType(scalar.TotalAllocation), n.allocation.GetByType(scalar.SlackAllocation)), } } // CalculateTotalAllocatedResources calculates the total allocation // recursively for all the children. func (n *resPool) CalculateTotalAllocatedResources() *scalar.Resources { n.Lock() defer n.Unlock() return n.calculateAllocation().GetByType(scalar.TotalAllocation) } // calculateAllocation calculates the allocation recursively for // all the children. func (n *resPool) calculateAllocation() *scalar.Allocation { if n.isLeaf() { return n.allocation } allocation := scalar.NewAllocation() for child := n.children.Front(); child != nil; child = child.Next() { if childResPool, ok := child.Value.(*resPool); ok { allocation = allocation.Add( childResPool.calculateAllocation()) } } n.allocation = allocation return allocation } // CalculateDemand calculates and sets the non-revocable tasks // resource demand for the resource pool recursively for the subtree func (n *resPool) CalculateDemand() *scalar.Resources { n.Lock() defer n.Unlock() return n.calculateDemand() } // calculateDemand is a private method to recursively calculate demand for non-leaf // resource pool nodes and return the demand for current resource pool func (n *resPool) calculateDemand() *scalar.Resources { if n.isLeaf() { return n.demand } demand := &scalar.Resources{} for child := n.children.Front(); child != nil; child = child.Next() { if childResPool, ok := child.Value.(*resPool); ok { demand = demand.Add( childResPool.calculateDemand()) } } n.demand = demand return demand } // CalculateAndSetDemand calculates and sets the resource demand // for the resource pool recursively for the subtree func (n *resPool) CalculateSlackDemand() *scalar.Resources { n.Lock() defer n.Unlock() return n.calculateSlackDemand() } // calculateSlackDemand in the private method to recursively calculate slack demand // for non-leaf resource pool nodes and return the slack demand for current resource pool func (n *resPool) calculateSlackDemand() *scalar.Resources { if n.isLeaf() { return n.slackDemand } slackDemand := &scalar.Resources{} for child := n.children.Front(); child != nil; child = child.Next() { if childResPool, ok := child.Value.(*resPool); ok { slackDemand = slackDemand.Add( childResPool.CalculateSlackDemand()) } } n.slackDemand = slackDemand return slackDemand } // getQueue returns the queue depending on the queue type func (n *resPool) queue(qt QueueType) queue.Queue { switch qt { case ControllerQueue: return n.controllerQueue case NonPreemptibleQueue: return n.npQueue case PendingQueue: return n.pendingQueue case RevocableQueue: return n.revocableQueue } // should never come here return nil } // creates the current resource pool's usage including all children. // [cpus] is the only slack resource supported. func (n *resPool) createRespoolUsage( allocation *scalar.Resources, slackAllocation *scalar.Resources) []*respool.ResourceUsage { resUsage := make([]*respool.ResourceUsage, 0, 4) ru := &respool.ResourceUsage{ Kind: common.CPU, Allocation: allocation.CPU - slackAllocation.CPU, Slack: slackAllocation.CPU, } resUsage = append(resUsage, ru) ru = &respool.ResourceUsage{ Kind: common.GPU, Allocation: allocation.GPU - slackAllocation.GPU, Slack: slackAllocation.GPU, } resUsage = append(resUsage, ru) ru = &respool.ResourceUsage{ Kind: common.MEMORY, Allocation: allocation.MEMORY - slackAllocation.MEMORY, Slack: slackAllocation.MEMORY, } resUsage = append(resUsage, ru) ru = &respool.ResourceUsage{ Kind: common.DISK, Allocation: allocation.DISK - slackAllocation.DISK, Slack: slackAllocation.DISK, } resUsage = append(resUsage, ru) return resUsage } // isLeaf checks if the current resource pool has child resource or not. func (n *resPool) isLeaf() bool { return n.children.Len() == 0 } // initializes the resources and limits for this pool // NB: The function calling initResources should acquire the lock func (n *resPool) initialize(cfg *respool.ResourcePoolConfig) { n.initResConfig(cfg) n.initControllerLimit(cfg) n.initSlackLimit(cfg) n.initReservation(cfg) } // initializes the reserved resources func (n *resPool) initReservation(cfg *respool.ResourcePoolConfig) { for kind, res := range n.resourceConfigs { switch kind { case common.CPU: n.reservation.CPU = res.Reservation case common.MEMORY: n.reservation.MEMORY = res.Reservation case common.GPU: n.reservation.GPU = res.Reservation case common.DISK: n.reservation.DISK = res.Reservation } } log.WithField("reservation", n.reservation). WithField("respool_id", n.id). Info("Setting reservation") } func (n *resPool) initResConfig(cfg *respool.ResourcePoolConfig) { for _, res := range cfg.Resources { n.resourceConfigs[res.Kind] = res } } // initControllerLimit initializes the limit of resources controller tasks can use. func (n *resPool) initControllerLimit(cfg *respool.ResourcePoolConfig) { climit := cfg.GetControllerLimit() if climit == nil { n.controllerLimit = nil return } controllerLimit := &scalar.Resources{} multiplier := climit.MaxPercent / 100 for kind, res := range n.resourceConfigs { switch kind { case common.CPU: controllerLimit.CPU = res.Reservation * multiplier case common.MEMORY: controllerLimit.MEMORY = res.Reservation * multiplier case common.GPU: controllerLimit.GPU = res.Reservation * multiplier case common.DISK: controllerLimit.DISK = res.Reservation * multiplier } } n.controllerLimit = controllerLimit log.WithField("controller_limit", controllerLimit). WithField("respool_id", n.id). Info("Setting controller limit") } // initSlackLimit initializes the limit of resources revocable tasks can use. func (n *resPool) initSlackLimit(cfg *respool.ResourcePoolConfig) { slimit := cfg.GetSlackLimit() if slimit == nil { slimit = &respool.SlackLimit{ MaxPercent: _defaultSlackLimit, } } slackLimit := &scalar.Resources{} multiplier := slimit.GetMaxPercent() / 100 for kind, res := range n.resourceConfigs { switch kind { case common.CPU: slackLimit.CPU = 0 case common.GPU: slackLimit.GPU = res.Reservation * multiplier case common.MEMORY: slackLimit.MEMORY = res.Reservation * multiplier case common.DISK: slackLimit.DISK = res.Reservation * multiplier } } n.slackLimit = slackLimit log.WithFields(log.Fields{ "slack_limit": slackLimit, "respool_id": n.id, }).Info("Setting slack limit") } // SlackLimit returns the slack limit of the resource pool func (n *resPool) GetSlackLimit() *scalar.Resources { n.RLock() defer n.RUnlock() return n.slackLimit } // SetEntitlement sets the entitlement of non-revocable resources // for non-revocable tasks + revocable tasks for this resource pool. func (n *resPool) SetEntitlement(res *scalar.Resources) { n.Lock() defer n.Unlock() n.entitlement = res log.WithFields(log.Fields{ "respool_id": n.id, "entitlement": res, }).Debug("Setting Entitlement") } // SetSlackEntitlement sets the entitlement of revocable cpus // + non-revocable resources [mem, disk, gpu] for the resource pool func (n *resPool) SetSlackEntitlement(res *scalar.Resources) { n.Lock() defer n.Unlock() n.slackEntitlement = res log.WithFields(log.Fields{ "respool_id": n.id, "slack_entitlement": res, }).Debug("Setting Slack Entitlement") } // SetNonSlackEntitlement Sets the entitlement of non-revocable resources // for non-revocable tasks for this resource pool func (n *resPool) SetNonSlackEntitlement(res *scalar.Resources) { n.Lock() defer n.Unlock() n.nonSlackEntitlement = res } // GetEntitlement returns the total entitlement of non-revocable resources // for this resource pool. func (n *resPool) GetEntitlement() *scalar.Resources { n.RLock() defer n.RUnlock() return n.entitlement } // GetSlackEntitlement returns the entitlement for revocable tasks. func (n *resPool) GetSlackEntitlement() *scalar.Resources { n.RLock() defer n.RUnlock() return n.slackEntitlement } // GetNonSlackEntitlement returns the entitlement for non-revocable tasks. func (n *resPool) GetNonSlackEntitlement() *scalar.Resources { n.RLock() defer n.RUnlock() return n.nonSlackEntitlement } // GetTotalAllocatedResources gets the resource allocation for the pool func (n *resPool) GetTotalAllocatedResources() *scalar.Resources { n.RLock() defer n.RUnlock() return n.allocation.GetByType(scalar.TotalAllocation) } // GetSlackAllocatedResources gets the resource allocation for the pool func (n *resPool) GetSlackAllocatedResources() *scalar.Resources { n.RLock() defer n.RUnlock() return n.allocation.GetByType(scalar.SlackAllocation) } // GetNonSlackAllocatedResources returns resources allocated to non-revocable tasks. func (n *resPool) GetNonSlackAllocatedResources() *scalar.Resources { n.RLock() defer n.RUnlock() return n.allocation.GetByType(scalar.NonSlackAllocation) } // SetAllocation sets the resource allocation for the pool func (n *resPool) SetTotalAllocatedResources(allocation *scalar.Resources) { n.Lock() defer n.Unlock() n.allocation.Value[scalar.TotalAllocation] = allocation } // GetDemand gets the resource demand for the pool func (n *resPool) GetDemand() *scalar.Resources { n.RLock() defer n.RUnlock() return n.demand } // GetSlackDemand gets the resource demand for the pool func (n *resPool) GetSlackDemand() *scalar.Resources { n.RLock() defer n.RUnlock() return n.slackDemand } // SubtractFromAllocation updates the allocation for the resource pool func (n *resPool) SubtractFromAllocation(allocation *scalar.Allocation) error { n.Lock() defer n.Unlock() newAllocation := n.allocation.Subtract(allocation) if newAllocation == nil { return errors.Errorf("couldn't update the resources") } n.allocation = newAllocation log.WithFields(log.Fields{ "respool_id": n.id, "total_alloc": n.allocation.GetByType( scalar.TotalAllocation), "non_preemptible_alloc": n.allocation.GetByType( scalar.NonPreemptibleAllocation), "controller_alloc": n.allocation.GetByType( scalar.ControllerAllocation), "slack_alloc": n.allocation.GetByType( scalar.SlackAllocation), }).Debug("Current Allocation after subtracting allocation") return nil } // IsRoot returns true if the node is the root in the resource // pool hierarchy func (n *resPool) IsRoot() bool { n.RLock() defer n.RUnlock() return n.isRoot() } func (n *resPool) isRoot() bool { return n.id == common.RootResPoolID } // GetPath returns the fully qualified path of the resource pool // in the resource pool hierarchy // For the below resource hierarchy ; the "compute" resource pool would be // designated by path: /infrastructure/compute // root // ├─ infrastructure // │ └─ compute // └─ marketplace func (n *resPool) GetPath() string { n.RLock() defer n.RUnlock() return n.path } func (n *resPool) calculatePath() string { if n.isRoot() { return ResourcePoolPathDelimiter } if n.parent == nil || n.parent.IsRoot() { return ResourcePoolPathDelimiter + n.poolConfig.Name } return n.parent.GetPath() + ResourcePoolPathDelimiter + n.poolConfig.Name } // AddToAllocation adds resources to the allocation // for the resource pool func (n *resPool) AddToAllocation(allocation *scalar.Allocation) error { n.Lock() defer n.Unlock() n.allocation = n.allocation.Add(allocation) log.WithFields(log.Fields{ "respool_id": n.id, "total_alloc": n.allocation.GetByType( scalar.TotalAllocation), "non_preemptible_alloc": n.allocation.GetByType( scalar.NonPreemptibleAllocation), "controller_alloc": n.allocation.GetByType( scalar.ControllerAllocation), "slack_alloc": n.allocation.GetByType( scalar.SlackAllocation), }).Debug("Current Allocation after adding allocation") return nil } // AddToDemand adds resources to the demand // for the resource pool func (n *resPool) AddToDemand(res *scalar.Resources) error { n.Lock() defer n.Unlock() n.demand = n.demand.Add(res) log.WithFields(log.Fields{ "respool_id": n.id, "demand": n.demand, }).Debug("Current Demand after Adding resources") return nil } // AddToSlackDemand adds resources to the slack demand // for the resource pool func (n *resPool) AddToSlackDemand(res *scalar.Resources) error { n.Lock() defer n.Unlock() n.slackDemand = n.slackDemand.Add(res) log.WithFields(log.Fields{ "respool_id": n.id, "demand": n.slackDemand, }).Debug("Current Demand after Adding resources") return nil } // SubtractFromDemand subtracts resources from demand // for the resource pool func (n *resPool) SubtractFromDemand(res *scalar.Resources) error { n.Lock() defer n.Unlock() newDemand := n.demand.Subtract(res) if newDemand == nil { return errors.Errorf("Couldn't update the resources") } n.demand = newDemand log. WithField("respool_id", n.id). WithField("demand", n.demand). Debug("Current Demand " + "after removing resources") return nil } // SubtractFromSlackDemand subtracts resources from demand // for the resource pool func (n *resPool) SubtractFromSlackDemand(res *scalar.Resources) error { n.Lock() defer n.Unlock() newDemand := n.slackDemand.Subtract(res) if newDemand == nil { return errors.Errorf("Couldn't update the resources") } n.slackDemand = newDemand log.WithFields(log.Fields{ "respool_id": n.id, "slack_demand": n.slackDemand, }).Debug("Current Demand after removing resources") return nil } // updates static metrics(Share, Limit and Reservation) which depend on the config func (n *resPool) updateStaticResourceMetrics() { n.metrics.ResourcePoolShare.Update(getShare(n.resourceConfigs)) n.metrics.ResourcePoolLimit.Update(getLimits(n.resourceConfigs)) n.metrics.ResourcePoolReservation.Update(n.reservation) if n.controllerLimit != nil { n.metrics.ControllerLimit.Update(n.controllerLimit) } if n.slackLimit != nil { n.metrics.SlackLimit.Update(n.slackLimit) } } // updates dynamic metrics(Allocation, Entitlement, Available) which change based on // usage and entitlement calculation // ToDo (varung): expose available slack resources func (n *resPool) updateDynamicResourceMetrics() { n.metrics.TotalEntitlement.Update(n.entitlement) n.metrics.SlackEntitlement.Update(n.slackEntitlement) n.metrics.NonSlackEntitlement.Update(n.nonSlackEntitlement) n.metrics.TotalAllocation.Update(n.allocation.GetByType( scalar.TotalAllocation)) n.metrics.SlackAllocation.Update(n.allocation.GetByType( scalar.SlackAllocation)) n.metrics.NonPreemptibleAllocation.Update(n.allocation.GetByType( scalar.NonPreemptibleAllocation)) n.metrics.ControllerAllocation.Update(n.allocation.GetByType( scalar.ControllerAllocation)) n.metrics.NonSlackAllocation.Update(n.allocation.GetByType( scalar.NonSlackAllocation)) n.metrics.NonSlackAvailable.Update(n.nonSlackEntitlement. Subtract(n.allocation.GetByType(scalar.NonSlackAllocation))) n.metrics.SlackAvailable.Update(n.slackEntitlement. Subtract(n.allocation.GetByType(scalar.SlackAllocation))) n.metrics.Demand.Update(n.demand) n.metrics.SlackDemand.Update(n.slackDemand) n.metrics.PendingQueueSize.Update(float64(n.aggregateQueueByType(PendingQueue))) n.metrics.RevocableQueueSize.Update(float64(n.aggregateQueueByType(RevocableQueue))) n.metrics.ControllerQueueSize.Update(float64(n.aggregateQueueByType(ControllerQueue))) n.metrics.NPQueueSize.Update(float64(n.aggregateQueueByType(NonPreemptibleQueue))) } // aggregateQueueByType aggreagates the queue size for leaf resource pools func (n *resPool) aggregateQueueByType(qt QueueType) int { if n.isLeaf() { return n.queue(qt).Size() } queueSize := 0 for child := n.children.Front(); child != nil; child = child.Next() { if childResPool, ok := child.Value.(*resPool); ok { queueSize += childResPool.aggregateQueueByType(qt) } } return queueSize } // updates all the metrics (static and dynamic) func (n *resPool) UpdateResourceMetrics() { n.RLock() defer n.RUnlock() n.updateStaticResourceMetrics() n.updateDynamicResourceMetrics() } // AddInvalidTask adds an invalid task so that it can // remove them from the respool later func (n *resPool) AddInvalidTask(task *peloton.TaskID) { n.Lock() defer n.Unlock() n.invalidTasks[task.Value] = true } // PeekGangs returns a list of gangs from the queue based on the queue type. func (n *resPool) PeekGangs(qt QueueType, limit uint32) ([]*resmgrsvc.Gang, error) { n.RLock() defer n.RUnlock() switch qt { case PendingQueue: return n.pendingQueue.Peek(limit) case ControllerQueue: return n.controllerQueue.Peek(limit) case NonPreemptibleQueue: return n.npQueue.Peek(limit) case RevocableQueue: return n.revocableQueue.Peek(limit) } // should never come here return nil, nil } func (n *resPool) isPreemptionEnabled() bool { return n.preemptionCfg.Enabled } func getLimits(resourceConfigs map[string]*respool.ResourceConfig) *scalar.Resources { var resources scalar.Resources for kind, res := range resourceConfigs { switch kind { case common.CPU: resources.CPU = res.Limit case common.GPU: resources.GPU = res.Limit case common.MEMORY: resources.MEMORY = res.Limit case common.DISK: resources.DISK = res.Limit } } return &resources } func getShare(resourceConfigs map[string]*respool.ResourceConfig) *scalar.Resources { var resources scalar.Resources for kind, res := range resourceConfigs { switch kind { case common.CPU: resources.CPU = res.Share case common.GPU: resources.GPU = res.Share case common.MEMORY: resources.MEMORY = res.Share case common.DISK: resources.DISK = res.Share } } return &resources }