pkg/scheduler/objects/node.go (528 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package objects
import (
"fmt"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/plugins"
schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
// UnknownInstanceType is the instance type reported by GetInstanceType for a node
// that has no instance type attribute set.
const UnknownInstanceType = "UNKNOWN"
// Node is the scheduler side representation of a single node. It tracks the node
// attributes, the resource accounting (total, occupied, allocated and available),
// the allocations placed on the node and the reservations made against it.
type Node struct {
	// Fields for fast access. These fields are considered read only.
	// Values should only be set when creating a new node and never changed.
	NodeID    string
	Hostname  string
	Rackname  string
	Partition string

	// Private fields need protection
	attributes        map[string]string       // node attributes, set once on creation
	totalResource     *resources.Resource     // capacity of the node
	occupiedResource  *resources.Resource     // usage by foreign (non-Yunikorn) allocations
	allocatedResource *resources.Resource     // usage by Yunikorn allocations
	availableResource *resources.Resource     // total minus allocated minus occupied
	allocations       map[string]*Allocation  // allocations on this node, by allocation key
	schedulable       bool                    // false: node is skipped during scheduling
	reservations      map[string]*reservation // a map of reservations, by allocation key
	listeners         []NodeListener          // a list of node listeners
	nodeEvents        *schedEvt.NodeEvents    // sender for node related events

	// lock protecting the private fields above
	locking.RWMutex
}
// NewNode creates a node from the node information passed in.
// A nil proto returns a nil node. The new node is schedulable, has no allocations,
// no reservations and no listeners. The total resource is taken from the schedulable
// resource in the proto, the allocated and occupied resources start out empty.
func NewNode(proto *si.NodeInfo) *Node {
	// nothing to build a node from: do not panic
	if proto == nil {
		return nil
	}
	node := &Node{
		NodeID:            proto.NodeID,
		reservations:      make(map[string]*reservation),
		totalResource:     resources.NewResourceFromProto(proto.SchedulableResource),
		allocatedResource: resources.NewResource(),
		occupiedResource:  resources.NewResource(),
		allocations:       make(map[string]*Allocation),
		schedulable:       true,
		listeners:         make([]NodeListener, 0),
		nodeEvents:        schedEvt.NewNodeEvents(events.GetEventSystem()),
	}
	// available is what is left of the total after removing the occupied resources
	available, err := resources.SubErrorNegative(node.totalResource, node.occupiedResource)
	if err != nil {
		log.Log(log.SchedNode).Error("New node created with no available resources",
			zap.Error(err))
	}
	node.availableResource = available
	node.initializeAttribute(proto.Attributes)
	return node
}
// String returns a short description of the node: ID, partition, schedulable state,
// total and allocated resources and the number of allocations.
// A nil node is handled and returns a fixed text.
// NOTE(review): the private fields are read without taking the node lock.
func (sn *Node) String() string {
	if sn != nil {
		return fmt.Sprintf("NodeID %s, Partition %s, Schedulable %t, Total %s, Allocated %s, #allocations %d",
			sn.NodeID, sn.Partition, sn.schedulable, sn.totalResource, sn.allocatedResource, len(sn.allocations))
	}
	return "node is nil"
}
// initializeAttribute stores the attributes on the node and fills the fast access
// fields Hostname, Rackname and Partition from them.
// A nil or empty attribute map is replaced by a new empty map.
// Unlocked call: should only be called on create or from test code
func (sn *Node) initializeAttribute(newAttributes map[string]string) {
	attrs := newAttributes
	if len(attrs) == 0 {
		// never leave the node with a nil attribute map
		attrs = map[string]string{}
	}
	sn.attributes = attrs
	sn.Hostname = attrs[siCommon.HostName]
	sn.Rackname = attrs[siCommon.RackName]
	sn.Partition = attrs[siCommon.NodePartition]
}
// GetAttribute returns the value of the attribute with the given name, or an empty
// string if the attribute is not set. The most used attributes can be directly
// accessed via the fields: Hostname, Rackname and Partition.
// This is a lock free call. All attributes are considered read only
func (sn *Node) GetAttribute(key string) string {
	value := sn.attributes[key]
	return value
}
// GetAttributes returns the attribute map of the node.
// This is a lock free call and the map returned is the map stored on the node, not
// a copy: the attributes are considered read only and must not be modified.
func (sn *Node) GetAttributes() map[string]string {
	attrs := sn.attributes
	return attrs
}
// GetInstanceType returns the instance type attribute of this node, or
// UnknownInstanceType if the attribute is not set or empty.
// This is a lock free call because all attributes are considered read only
func (sn *Node) GetInstanceType() string {
	if instanceType := sn.GetAttribute(siCommon.InstanceType); instanceType != "" {
		return instanceType
	}
	return UnknownInstanceType
}
// GetReservationKeys returns an array of all reservation keys for the node.
// This will return an empty, non nil, array if there are no reservations.
// The order of the keys is not defined as it follows the map iteration order.
// Visible for tests
func (sn *Node) GetReservationKeys() []string {
	sn.RLock()
	defer sn.RUnlock()
	// the number of keys is known up front: allocate the slice once
	keys := make([]string, 0, len(sn.reservations))
	for key := range sn.reservations {
		keys = append(keys, key)
	}
	return keys
}
// GetCapacity returns a clone of the total resource of the node.
func (sn *Node) GetCapacity() *resources.Resource {
	sn.RLock()
	defer sn.RUnlock()
	capacity := sn.totalResource.Clone()
	return capacity
}
// SetCapacity changes the node resource capacity and returns the resource delta.
// The delta is positive for an increased capacity and negative for a decrease.
// If the capacity has not changed nil is returned and no listeners are notified.
// The resource passed in is stored on the node as is, it is not cloned.
func (sn *Node) SetCapacity(newCapacity *resources.Resource) *resources.Resource {
	// all changes are made inside the closure: the lock is released before the
	// listeners are notified
	delta := func() *resources.Resource {
		sn.Lock()
		defer sn.Unlock()
		if resources.Equals(sn.totalResource, newCapacity) {
			log.Log(log.SchedNode).Debug("skip updating capacity, not changed")
			return nil
		}
		diff := resources.Sub(newCapacity, sn.totalResource)
		sn.totalResource = newCapacity
		sn.refreshAvailableResource()
		sn.nodeEvents.SendNodeCapacityChangedEvent(sn.NodeID, sn.totalResource.Clone())
		return diff
	}()
	if delta != nil {
		sn.notifyListeners()
	}
	return delta
}
// GetOccupiedResource returns a clone of the occupied resource of the node.
func (sn *Node) GetOccupiedResource() *resources.Resource {
	sn.RLock()
	defer sn.RUnlock()
	occupied := sn.occupiedResource.Clone()
	return occupied
}
// UpdateAllocatedResource adds the delta to the allocated resource of the node and
// recalculates the available resource. Listeners are not notified by this call.
func (sn *Node) UpdateAllocatedResource(delta *resources.Resource) {
	sn.Lock()
	defer sn.Unlock()
	allocated := sn.allocatedResource
	allocated.AddTo(delta)
	allocated.Prune()
	sn.refreshAvailableResource()
}
// SetOccupiedResource replaces the occupied resource of the node and recalculates
// the available resource. The resource passed in is stored as is, it is not cloned.
// If the occupied resource has not changed nothing is updated, no event is sent and
// the listeners are not notified.
func (sn *Node) SetOccupiedResource(occupiedResource *resources.Resource) {
	changed := false
	// registered first so it runs last: listeners must be called without the lock held.
	// Only notify when something really changed to prevent needless listener work.
	defer func() {
		if changed {
			sn.notifyListeners()
		}
	}()
	sn.Lock()
	defer sn.Unlock()
	if resources.Equals(sn.occupiedResource, occupiedResource) {
		log.Log(log.SchedNode).Debug("skip updating occupiedResource, not changed")
		return
	}
	sn.occupiedResource = occupiedResource
	sn.nodeEvents.SendNodeOccupiedResourceChangedEvent(sn.NodeID, sn.occupiedResource.Clone())
	sn.refreshAvailableResource()
	changed = true
}
// refreshAvailableResource recalculates the available resource of the node as the
// total minus the allocated and occupied resources. A warning is logged if any
// quantity in the result is negative.
// This call assumes the caller already holds the write lock.
func (sn *Node) refreshAvailableResource() {
	available := sn.totalResource.Clone()
	available.SubFrom(sn.allocatedResource)
	available.SubFrom(sn.occupiedResource)
	available.Prune()
	sn.availableResource = available
	// a nil resource is all 0's: this passes when no quantity is negative
	if resources.StrictlyGreaterThanOrEquals(available, nil) {
		return
	}
	log.Log(log.SchedNode).Warn("Node update triggered over allocated node",
		zap.Stringer("available", sn.availableResource),
		zap.Stringer("total", sn.totalResource),
		zap.Stringer("occupied", sn.occupiedResource),
		zap.Stringer("allocated", sn.allocatedResource))
}
// GetAllocation returns the allocation stored on the node under the allocationKey.
// Returns nil if the allocation is not found.
func (sn *Node) GetAllocation(allocationKey string) *Allocation {
	sn.RLock()
	defer sn.RUnlock()
	alloc := sn.allocations[allocationKey]
	return alloc
}
// GetYunikornAllocations returns a new slice with the Yunikorn (non foreign)
// allocations on this node. The allocations themselves are not copied.
func (sn *Node) GetYunikornAllocations() []*Allocation {
	const foreign = false
	sn.RLock()
	defer sn.RUnlock()
	return sn.getAllocations(foreign)
}
// GetForeignAllocations returns a new slice with the foreign (non Yunikorn)
// allocations on this node. The allocations themselves are not copied.
func (sn *Node) GetForeignAllocations() []*Allocation {
	const foreign = true
	sn.RLock()
	defer sn.RUnlock()
	return sn.getAllocations(foreign)
}
// getAllocations collects the allocations on the node for which IsForeign matches
// the foreign flag passed in. Always returns a non nil slice.
// No lock is taken here: the caller must hold the node lock.
func (sn *Node) getAllocations(foreign bool) []*Allocation {
	selected := make([]*Allocation, 0)
	for _, alloc := range sn.allocations {
		if alloc.IsForeign() != foreign {
			continue
		}
		selected = append(selected, alloc)
	}
	return selected
}
// SetSchedulable sets the schedulable state of the node.
// An unschedulable node will be skipped during the scheduling cycle.
// A schedulable changed event is sent and the listeners are notified on every call,
// even if the state set is the same as the current state.
// Visible for testing only
func (sn *Node) SetSchedulable(schedulable bool) {
	// registered first so it runs after the lock has been released
	defer sn.notifyListeners()
	sn.Lock()
	defer sn.Unlock()
	sn.schedulable = schedulable
	sn.nodeEvents.SendNodeSchedulableChangedEvent(sn.NodeID, schedulable)
}
// IsSchedulable returns true if this node can be used in scheduling.
func (sn *Node) IsSchedulable() bool {
	sn.RLock()
	defer sn.RUnlock()
	schedulable := sn.schedulable
	return schedulable
}
// GetAllocatedResource returns a clone of the allocated resource on this node.
func (sn *Node) GetAllocatedResource() *resources.Resource {
	sn.RLock()
	defer sn.RUnlock()
	allocated := sn.allocatedResource.Clone()
	return allocated
}
// GetAvailableResource returns a clone of the available resource on this node.
func (sn *Node) GetAvailableResource() *resources.Resource {
	sn.RLock()
	defer sn.RUnlock()
	available := sn.availableResource.Clone()
	return available
}
// GetFitInScoreForAvailableResource calculates a fit in score for "res" against the
// current available resources of the node. The available resource is used directly
// under the read lock, avoiding a clone.
// The caller must ensure that "res" cannot change while this method is running.
func (sn *Node) GetFitInScoreForAvailableResource(res *resources.Resource) float64 {
	sn.RLock()
	defer sn.RUnlock()
	score := res.FitInScore(sn.availableResource)
	return score
}
// GetUtilizedResource returns the utilized resource on this node: the absolute used
// capacity, as calculated by resources.CalculateAbsUsedCapacity, for each allocated
// resource type. Only types that are allocated and have a total larger than zero on
// the node are part of the result.
// The total and allocated resources are retrieved in two separate locked calls.
func (sn *Node) GetUtilizedResource() *resources.Resource {
	total := sn.GetCapacity()
	resourceAllocated := sn.GetAllocatedResource()
	// the used capacity does not depend on the resource name: calculate it once
	// instead of once per resource type
	absUsed := resources.CalculateAbsUsedCapacity(total, resourceAllocated)
	utilizedResource := make(map[string]resources.Quantity)
	for name := range resourceAllocated.Resources {
		if total.Resources[name] > 0 {
			utilizedResource[name] = absUsed.Resources[name]
		}
	}
	return &resources.Resource{Resources: utilizedResource}
}
// FitInNode checks if the request fits in the total resource of the node, ignoring
// any current usage.
// All resources types requested must match the resource types provided by the nodes.
// A request may ask for only a subset of the types, but the node must provide at least the
// resource types requested in a larger or equal quantity as requested.
func (sn *Node) FitInNode(resRequest *resources.Resource) bool {
	sn.RLock()
	defer sn.RUnlock()
	fits := sn.totalResource.FitIn(resRequest)
	return fits
}
// RemoveAllocation removes the allocation with the allocationKey from the node.
// Returns nil if the allocation was not found and no changes are made. If the allocation
// is found the Allocation removed is returned. The resources of a foreign allocation
// are removed from the occupied resource, all others from the allocated resource.
// The available resource increases as per the allocation removed.
// Listeners are only notified when a non foreign allocation is removed.
func (sn *Node) RemoveAllocation(allocationKey string) *Allocation {
	var alloc *Allocation
	// registered first so it runs after the lock has been released
	defer func() {
		if alloc == nil || alloc.IsForeign() {
			return
		}
		sn.notifyListeners()
	}()
	sn.Lock()
	defer sn.Unlock()

	alloc = sn.allocations[allocationKey]
	if alloc == nil {
		return nil
	}
	delete(sn.allocations, allocationKey)
	released := alloc.GetAllocatedResource()
	if alloc.IsForeign() {
		sn.occupiedResource = resources.Sub(sn.occupiedResource, released)
	} else {
		sn.allocatedResource.SubFrom(released)
		sn.allocatedResource.Prune()
	}
	sn.availableResource.AddTo(released)
	sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID, alloc.allocationKey, released)
	log.Log(log.SchedNode).Info("node allocation removed",
		zap.String("appID", alloc.GetApplicationID()),
		zap.String("allocationKey", alloc.GetAllocationKey()),
		zap.Stringer("allocatedResource", released),
		zap.Bool("placeholder", alloc.IsPlaceholder()),
		zap.String("targetNode", sn.NodeID))
	return alloc
}
// TryAddAllocation attempts to add the allocation to the node. Used resources will increase available will decrease.
// The allocation is only added if it fits in the available resources of the node at the time of the call.
// Returns true if the allocation was added, false if it was not added or the allocation is nil.
// Preempted resources must have been released already.
func (sn *Node) TryAddAllocation(alloc *Allocation) bool {
	const force = false
	return sn.addAllocationInternal(alloc, force)
}
// AddAllocation adds the allocation to the node. Used resources will increase available will decrease.
// The add is forced: the allocation is added even if it does not fit in the available resources.
// A nil Allocation makes no changes. Preempted resources must have been released already.
func (sn *Node) AddAllocation(alloc *Allocation) {
	const force = true
	// a forced add can only fail for a nil allocation: the result is not relevant
	_ = sn.addAllocationInternal(alloc, force)
}
// UpdateForeignAllocation replaces the allocation stored under the allocation key with
// the allocation passed in and re-calculates the available/occupied resources based on
// the difference between the old and new allocated resource.
// Returns the allocation that was replaced, or nil if no allocation was known under the key.
// NOTE(review): for an unknown key the allocation is still stored on the node but the
// occupied and available resources are left unchanged - confirm the caller handles the
// nil return, e.g. by adding the allocation.
func (sn *Node) UpdateForeignAllocation(alloc *Allocation) *Allocation {
	sn.Lock()
	defer sn.Unlock()

	key := alloc.GetAllocationKey()
	existing := sn.allocations[key]
	sn.allocations[key] = alloc
	if existing == nil {
		log.Log(log.SchedNode).Debug("unknown allocation to update",
			zap.String("allocationKey", key))
		return nil
	}

	oldRes := existing.GetAllocatedResource().Clone()
	newRes := alloc.GetAllocatedResource().Clone()
	change := resources.Sub(newRes, oldRes)
	change.Prune()
	log.Log(log.SchedNode).Info("node foreign allocation updated",
		zap.String("allocationKey", key),
		zap.Stringer("deltaResource", change),
		zap.String("targetNode", sn.NodeID))

	occupied := sn.occupiedResource
	occupied.AddTo(change)
	occupied.Prune()
	sn.refreshAvailableResource()
	return existing
}
// addAllocationInternal adds the allocation to the node and updates the resource usage.
// Unless force is set the allocation is only added if it fits in the available resources.
// A foreign allocation is counted against the occupied resource, all others against the
// allocated resource. Listeners are only notified for a non foreign allocation that was added.
// Returns true if the allocation was added, false otherwise or if the allocation is nil.
func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
	if alloc == nil {
		return false
	}
	foreign := alloc.IsForeign()
	added := false
	// registered first so it runs after the lock has been released
	defer func() {
		// only notify if something changed to prevent unnecessary listener calls
		if added && !foreign {
			sn.notifyListeners()
		}
	}()
	sn.Lock()
	defer sn.Unlock()

	res := alloc.GetAllocatedResource()
	// check if this still fits: it might have changed since pre-check
	if !force && !sn.availableResource.FitIn(res) {
		return false
	}
	sn.allocations[alloc.GetAllocationKey()] = alloc
	if foreign {
		sn.occupiedResource = resources.Add(sn.occupiedResource, res)
	} else {
		sn.allocatedResource.AddTo(res)
	}
	sn.availableResource.SubFrom(res)
	sn.availableResource.Prune()
	sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID, alloc.allocationKey, res)
	log.Log(log.SchedNode).Info("node allocation processed",
		zap.String("appID", alloc.GetApplicationID()),
		zap.String("allocationKey", alloc.GetAllocationKey()),
		zap.Stringer("allocatedResource", res),
		zap.Bool("placeholder", alloc.IsPlaceholder()),
		zap.String("targetNode", sn.NodeID))
	added = true
	return true
}
// ReplaceAllocation replaces the placeholder stored under allocationKey with the real
// allocation on the node. The create time of the placeholder is copied to the real
// allocation, which is also marked as having used a placeholder.
// The delta passed in is the difference in resource usage between placeholder and real allocation.
// It should always be a negative value or zero: it is a decrease in usage or no change.
// An increase is still applied but logged as a warning.
// NOTE(review): the placeholder is looked up without a nil check - verify the callers
// guarantee that the placeholder is present on the node.
func (sn *Node) ReplaceAllocation(allocationKey string, replace *Allocation, delta *resources.Resource) {
	// registered first so it runs after the lock has been released
	defer sn.notifyListeners()
	sn.Lock()
	defer sn.Unlock()

	placeholder := sn.allocations[allocationKey]
	replace.SetPlaceholderCreateTime(placeholder.GetCreateTime())
	delete(sn.allocations, allocationKey)
	replace.SetPlaceholderUsed(true)
	sn.allocations[replace.GetAllocationKey()] = replace

	previous := sn.allocatedResource.Clone()
	// The allocatedResource and availableResource should be updated in the same way
	sn.allocatedResource.AddTo(delta)
	sn.availableResource.SubFrom(delta)
	sn.availableResource.Prune()
	log.Log(log.SchedNode).Info("node allocation replaced",
		zap.String("appID", replace.GetApplicationID()),
		zap.String("allocationKey", replace.GetAllocationKey()),
		zap.Stringer("allocatedResource", replace.GetAllocatedResource()),
		zap.String("placeholderKey", allocationKey),
		zap.String("targetNode", sn.NodeID))
	// the new usage must fit in the old usage, anything else is an increase
	if previous.FitIn(sn.allocatedResource) {
		return
	}
	log.Log(log.SchedNode).Warn("unexpected increase in node usage after placeholder replacement",
		zap.String("placeholder allocationKey", allocationKey),
		zap.String("allocation allocationKey", replace.GetAllocationKey()),
		zap.Stringer("delta", delta))
}
// CanAllocate checks if the proposed allocation fits in the available resources.
// Returns true if it fits and false if it does not.
func (sn *Node) CanAllocate(res *resources.Resource) bool {
	sn.RLock()
	defer sn.RUnlock()
	fits := sn.availableResource.FitIn(res)
	return fits
}
// preAllocateConditions runs the pre-condition checks in the shim for an allocation.
// Returns nil if the checks pass, the error from the failed check otherwise.
func (sn *Node) preAllocateConditions(ask *Allocation) error {
	const allocate = true
	return sn.preConditions(ask, allocate)
}
// preReserveConditions runs the pre-condition checks in the shim for a reservation.
// Returns nil if the checks pass, the error from the failed check otherwise.
func (sn *Node) preReserveConditions(ask *Allocation) error {
	const allocate = false
	return sn.preConditions(ask, allocate)
}
// preConditions runs the predicates of the resource manager callback plugin for the
// allocation on this node. The pre conditions are implemented via plugins in the shim.
// If no plugin is registered the check passes and nil is returned. A failed check is
// logged on the allocation and the error is returned to the caller.
// This is a lock free call as it does not change the node and multiple predicate checks could be
// run at the same time.
func (sn *Node) preConditions(ask *Allocation, allocate bool) error {
	allocationKey := ask.GetAllocationKey()
	// Check the predicates plugin (k8shim)
	plugin := plugins.GetResourceManagerCallbackPlugin()
	if plugin == nil {
		// nothing to check
		return nil
	}
	err := plugin.Predicates(&si.PredicatesArgs{
		AllocationKey: allocationKey,
		NodeID:        sn.NodeID,
		Allocate:      allocate,
	})
	if err == nil {
		// predicates passed
		return nil
	}
	log.Log(log.SchedNode).Debug("running predicates failed",
		zap.String("allocationKey", allocationKey),
		zap.String("nodeID", sn.NodeID),
		zap.Bool("allocateFlag", allocate),
		zap.Error(err))
	// track the failure on the allocation
	ask.LogAllocationFailure(err.Error(), allocate)
	return err
}
// preAllocateCheck checks if the node should be considered as a possible node to allocate on.
// The check fails if the resource requested is not strictly greater than zero, if the
// node is reserved but not for this allocationKey, or if the resource does not fit in
// the available resources of the node.
// No updates are made this only performs a pre allocate checks
func (sn *Node) preAllocateCheck(res *resources.Resource, allocationKey string) bool {
	// cannot allocate zero or negative resource
	if !resources.StrictlyGreaterThanZero(res) {
		log.Log(log.SchedNode).Debug("pre alloc check: requested resource is zero",
			zap.String("nodeID", sn.NodeID))
		return false
	}
	// a reserved node can only be used by the allocation that reserved it
	if sn.IsReserved() && !sn.isReservedForAllocation(allocationKey) {
		log.Log(log.SchedNode).Debug("pre alloc check: node reserved for different alloc",
			zap.String("nodeID", sn.NodeID),
			zap.String("allocationKey", allocationKey))
		return false
	}

	sn.RLock()
	defer sn.RUnlock()
	// the outcome is based on what is left on the node
	fits := sn.availableResource.FitIn(res)
	return fits
}
// IsReserved returns true if the node has at least one reservation.
func (sn *Node) IsReserved() bool {
	sn.RLock()
	defer sn.RUnlock()
	count := len(sn.reservations)
	return count > 0
}
// isReservedForAllocation returns true if and only if the node has a reservation for
// the allocation key passed in. An empty key always returns false.
// NOTE: a return value of false does not mean the node is not reserved by a different allocation, use IsReserved
// to test if the node has any reservation.
func (sn *Node) isReservedForAllocation(key string) bool {
	if key == "" {
		return false
	}
	sn.RLock()
	defer sn.RUnlock()
	found := sn.reservations[key]
	return found != nil
}
// Reserve the node for this application and alloc combination.
// A node can hold either one normal reservation or multiple required node reservations,
// the two kinds cannot be mixed. The reservation is checked against the total resource
// of the node, not against what is currently available.
// If the reservation fails the function returns an error, if the reservation is made it returns nil.
func (sn *Node) Reserve(app *Application, ask *Allocation) error {
	sn.Lock()
	defer sn.Unlock()

	nodeReservation := newReservation(sn, app, ask, false)
	// this should really not happen just guard against panic either app or alloc are nil
	if nodeReservation == nil {
		log.Log(log.SchedNode).Debug("reservation creation failed unexpectedly",
			zap.String("nodeID", sn.NodeID),
			zap.Stringer("app", app),
			zap.Stringer("alloc", ask))
		return fmt.Errorf("reservation creation failed either app or alloc are nil on nodeID %s", sn.NodeID)
	}

	if ask.requiredNode != "" {
		// multiple required node reservations are allowed on the same node:
		// make sure all other reservations are for required nodes
		for _, current := range sn.reservations {
			if current.alloc.requiredNode != "" {
				continue
			}
			log.Log(log.SchedNode).Warn("trying to add normal reservation to node with required node reservation",
				zap.String("nodeID", sn.NodeID),
				zap.String("existing app", current.appID),
				zap.String("existing alloc", current.allocKey),
				zap.String("new app", nodeReservation.appID),
				zap.String("new alloc", nodeReservation.allocKey))
			return fmt.Errorf("normal reservation: required node reservation present, nodeID %s", sn.NodeID)
		}
	} else if len(sn.reservations) > 0 {
		// a normal reservation needs a node without any reservation
		log.Log(log.SchedNode).Warn("normal reservation on already reserved node",
			zap.String("nodeID", sn.NodeID),
			zap.String("new app", nodeReservation.appID),
			zap.String("new alloc", nodeReservation.allocKey))
		return common.ErrorNodeAlreadyReserved
	}

	// reservation must fit on the empty node
	requested := ask.GetAllocatedResource()
	if !sn.totalResource.FitIn(requested) {
		log.Log(log.SchedNode).Debug("reservation does not fit on the node",
			zap.String("nodeID", sn.NodeID),
			zap.String("appID", app.ApplicationID),
			zap.String("alloc", ask.GetAllocationKey()),
			zap.Stringer("requested resources", ask.GetAllocatedResource()))
		return common.ErrorNodeNotFitReserve
	}

	// all checks passed: track the reservation
	sn.reservations[ask.allocationKey] = nodeReservation
	sn.nodeEvents.SendReservedEvent(sn.NodeID, ask.GetAllocatedResource(), ask.GetAllocationKey())
	return nil
}
// unReserve removes the reservation of the alloc from the node.
// If the reservation does not exist or alloc is nil it returns 0 for reservations removed,
// if the reservation is removed it returns 1.
func (sn *Node) unReserve(alloc *Allocation) int {
	if alloc == nil {
		return 0
	}
	sn.Lock()
	defer sn.Unlock()

	if _, reserved := sn.reservations[alloc.allocationKey]; !reserved {
		// nothing to remove
		log.Log(log.SchedNode).Debug("reservation not found while removing from node",
			zap.String("nodeID", sn.NodeID),
			zap.String("alloc", alloc.GetAllocationKey()),
			zap.String("appID", alloc.GetApplicationID()))
		return 0
	}
	delete(sn.reservations, alloc.allocationKey)
	sn.nodeEvents.SendUnreservedEvent(sn.NodeID, alloc.GetAllocatedResource(), alloc.GetAllocationKey())
	return 1
}
// GetReservations returns all reservations made on this node in a new slice.
// Returns nil if there are no reservations. The order of the reservations is not
// defined as it follows the map iteration order.
func (sn *Node) GetReservations() []*reservation {
	// nothing is changed on the node: a read lock is all that is needed
	sn.RLock()
	defer sn.RUnlock()
	if len(sn.reservations) == 0 {
		return nil
	}
	res := make([]*reservation, 0, len(sn.reservations))
	for _, r := range sn.reservations {
		res = append(res, r)
	}
	return res
}
// GetResourceUsageShares gets a map of name -> resource usages per type in shares (0 to 1).
// The share is calculated as one minus the available quantity divided by the total quantity,
// for each type in the total resource of the node. Can return NaN.
// An empty map is returned if the node has no total resource set.
func (sn *Node) GetResourceUsageShares() map[string]float64 {
	sn.RLock()
	defer sn.RUnlock()
	shares := make(map[string]float64)
	// no resources present, so no usage
	if sn.totalResource == nil {
		return shares
	}
	for name, total := range sn.totalResource.Resources {
		available := sn.availableResource.Resources[name]
		shares[name] = float64(1) - (float64(available) / float64(total))
	}
	return shares
}
// AddListener adds the listener to the list of listeners that are called when the node
// changes. No check for duplicates is made.
func (sn *Node) AddListener(listener NodeListener) {
	sn.Lock()
	defer sn.Unlock()
	updated := append(sn.listeners, listener)
	sn.listeners = updated
}
// RemoveListener removes all entries equal to the listener from the list of listeners.
// The list is rebuilt in a new slice, the old slice is not modified.
func (sn *Node) RemoveListener(listener NodeListener) {
	sn.Lock()
	defer sn.Unlock()
	remaining := make([]NodeListener, 0)
	for _, current := range sn.listeners {
		if current != listener {
			remaining = append(remaining, current)
		}
	}
	sn.listeners = remaining
}
// notifyListeners calls NodeUpdated with this node on each registered listener.
// The calls are made on a copy of the listener list.
// This method must not be called while locks are held.
func (sn *Node) notifyListeners() {
	listeners := sn.getListeners()
	for i := range listeners {
		listeners[i].NodeUpdated(sn)
	}
}
// getListeners returns a copy of the list of listeners, taken under the read lock.
func (sn *Node) getListeners() []NodeListener {
	sn.RLock()
	defer sn.RUnlock()
	snapshot := make([]NodeListener, len(sn.listeners))
	copy(snapshot, sn.listeners)
	return snapshot
}
// SendNodeAddedEvent sends the node added event for this node with a clone of the
// total resource of the node.
func (sn *Node) SendNodeAddedEvent() {
	sn.RLock()
	defer sn.RUnlock()
	capacity := sn.totalResource.Clone()
	sn.nodeEvents.SendNodeAddedEvent(sn.NodeID, capacity)
}
// SendNodeRemovedEvent sends the node removed event for this node.
// This is a lock free call: only the read only NodeID is used.
func (sn *Node) SendNodeRemovedEvent() {
	nodeID := sn.NodeID
	sn.nodeEvents.SendNodeRemovedEvent(nodeID)
}