pkg/scheduler/objects/allocation.go (436 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package objects
import (
"fmt"
"strconv"
"time"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
// Allocation describes a single allocation tracked by the scheduler. The fields are grouped into
// values that are set when the object is created, mutable state, and state that is used once the
// allocation is bound to a node. The embedded RWMutex is the lock taken by the accessor methods
// around the mutable and bound fields.
type Allocation struct {
// Read-only fields
allocationKey string // key identifying this allocation
applicationID string // ID of the application this allocation belongs to
taskGroupName string // task group this allocation belongs to
placeholder bool // is this a placeholder allocation
createTime time.Time // the time this allocation was created (used in reservations)
priority int32 // priority, first ordering criterion in LessThan
requiredNode string // the node (if any) required by this allocation
allowPreemptSelf bool // AllowPreemptSelf value of the SI preemption policy
allowPreemptOther bool // AllowPreemptOther value of the SI preemption policy
originator bool // whether this allocation is the originator for the application
tags map[string]string // private copy of the SI allocation tags
foreign bool // set when the SI allocation carries the foreign tag
preemptable bool // false only for foreign allocations of the static type
// Mutable fields which need protection
allocated bool // whether this request has been allocated
allocLog map[string]*AllocationLogEntry // allocation failure log, keyed by message
preemptionTriggered bool // whether this allocation has triggered preemption
preemptCheckTime time.Time // last time this allocation was evaluated for preemption
schedulingAttempted bool // whether scheduler core has tried to schedule this allocation
scaleUpTriggered bool // whether this allocation has triggered autoscaling or not
allocatedResource *resources.Resource // resources of this allocation, shared by reference with callers
askEvents *schedEvt.AskEvents // event sender used by the Send* and check-state methods
userQuotaCheckFailed bool // state of the last user quota check, used to send events on change only
headroomCheckFailed bool // state of the last queue headroom check, used to send events on change only
// Fields used once an allocation is bound
nodeID string // the node this allocation is bound to
bindTime time.Time // the time this allocation was bound to a node
placeholderUsed bool // whether a placeholder was used for this allocation
placeholderCreateTime time.Time // the time the placeholder was created, if any
released bool // whether this allocation has been released (for placeholders)
release *Allocation // placeholder to be released for this allocation
preempted bool // whether this allocation has been marked for preemption
instType string // the instance type of the node at the time this allocation was bound
locking.RWMutex
}
// AllocationLogEntry is one entry of the allocation failure log kept per Allocation.
// Entries are keyed by their message, so each distinct message has a single entry.
type AllocationLogEntry struct {
Message string // the failure message, also used as the log key
LastOccurrence time.Time // the last time this message was logged
Count int32 // number of times this message was logged
}
// NewAllocationFromSI builds an Allocation from the SI Allocation object.
// It returns nil when the input is nil, or when the input is flagged as a placeholder but has no
// task group name. When the SI object carries a node ID the result is marked as allocated on that
// node and its bind time is set to the current time; otherwise it is left unbound.
func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
	if alloc == nil {
		return nil
	}
	// A placeholder always needs a task group name. The reverse is fine: a task group name
	// may be set on an allocation that is not a placeholder.
	if alloc.Placeholder && alloc.TaskGroupName == "" {
		log.Log(log.SchedAllocation).Debug("Allocation cannot be a placeholder without a TaskGroupName",
			zap.Stringer("SI alloc", alloc))
		return nil
	}

	// The creation time tag holds seconds since the epoch. Fall back to the current time
	// when the tag is missing or cannot be parsed.
	var createTime time.Time
	rawCreationTime := alloc.AllocationTags[siCommon.CreationTime]
	if seconds, err := strconv.ParseInt(rawCreationTime, 10, 64); err == nil {
		createTime = time.Unix(seconds, 0)
	} else {
		log.Log(log.SchedAllocation).Debug("CreationTime is not set on the Allocation object or invalid",
			zap.String("creationTime", rawCreationTime))
		createTime = time.Now()
	}

	// The presence of the foreign tag marks the allocation as foreign, whatever its value.
	// Only the static type switches preemption off; unknown values are logged and treated
	// like the default type.
	foreignType, foreign := alloc.AllocationTags[siCommon.Foreign]
	preemptable := true
	if foreign {
		switch foreignType {
		case siCommon.AllocTypeDefault:
			// default type: stays preemptable
		case siCommon.AllocTypeStatic:
			preemptable = false
		default:
			log.Log(log.SchedAllocation).Warn("Foreign tag has illegal value, using default",
				zap.String("value", foreignType))
		}
	}

	// A node ID on the incoming object means the allocation has already been placed.
	bound := alloc.NodeID != ""
	var bindTime time.Time
	if bound {
		bindTime = time.Now()
	}

	return &Allocation{
		allocationKey:     alloc.AllocationKey,
		applicationID:     alloc.ApplicationID,
		allocatedResource: resources.NewResourceFromProto(alloc.ResourcePerAlloc),
		tags:              CloneAllocationTags(alloc.AllocationTags),
		createTime:        createTime,
		priority:          alloc.Priority,
		placeholder:       alloc.Placeholder,
		taskGroupName:     alloc.TaskGroupName,
		requiredNode:      common.GetRequiredNodeFromTag(alloc.AllocationTags),
		allowPreemptSelf:  alloc.PreemptionPolicy.GetAllowPreemptSelf(),
		allowPreemptOther: alloc.PreemptionPolicy.GetAllowPreemptOther(),
		originator:        alloc.Originator,
		allocLog:          make(map[string]*AllocationLogEntry),
		askEvents:         schedEvt.NewAskEvents(events.GetEventSystem()),
		allocated:         bound,
		nodeID:            alloc.NodeID, // empty exactly when the allocation is not bound
		bindTime:          bindTime,
		foreign:           foreign,
		preemptable:       preemptable,
	}
}
// NewSIFromAllocation converts the Allocation into a SI object for communication *back* to the RM.
// Only a limited set of values is copied: the fields that link the Allocation to an Application and
// a Node, plus the resource, task group, placeholder, originator and preemption policy settings.
// All other fields are considered incoming fields from the RM into the core.
// A nil Allocation converts to nil.
func (a *Allocation) NewSIFromAllocation() *si.Allocation {
	if a == nil {
		return nil
	}
	siAlloc := &si.Allocation{
		NodeID:           a.GetNodeID(),
		ApplicationID:    a.GetApplicationID(),
		AllocationKey:    a.GetAllocationKey(),
		ResourcePerAlloc: a.GetAllocatedResource().ToProto(), // needed in tests for restore
		TaskGroupName:    a.GetTaskGroup(),
		Placeholder:      a.IsPlaceholder(),
		Originator:       a.IsOriginator(),
	}
	siAlloc.PreemptionPolicy = &si.PreemptionPolicy{
		AllowPreemptSelf:  a.IsAllowPreemptSelf(),
		AllowPreemptOther: a.IsAllowPreemptOther(),
	}
	return siAlloc
}
// String returns a short description of the allocation: key, application ID, allocated resource
// and allocated state. A nil Allocation is rendered as "nil allocation".
func (a *Allocation) String() string {
	if a == nil {
		return "nil allocation"
	}
	// the mutable values are read through the locking accessors
	res := a.GetAllocatedResource()
	allocated := a.IsAllocated()
	return fmt.Sprintf("allocationKey %s, applicationID %s, Resource %s, Allocated %t", a.allocationKey, a.applicationID, res, allocated)
}
// GetAllocationKey returns the allocation key for this allocation.
// The key is one of the read-only fields of Allocation and is read without taking the lock.
func (a *Allocation) GetAllocationKey() string {
return a.allocationKey
}
// GetApplicationID returns the application ID for this allocation.
// The ID is one of the read-only fields of Allocation and is read without taking the lock.
func (a *Allocation) GetApplicationID() string {
return a.applicationID
}
// GetTaskGroup returns the task group name for this allocation.
// The name is one of the read-only fields of Allocation and is read without taking the lock.
func (a *Allocation) GetTaskGroup() string {
return a.taskGroupName
}
// GetCreateTime returns the time this allocation was created.
// The create time is one of the read-only fields of Allocation and is read without taking the lock.
func (a *Allocation) GetCreateTime() time.Time {
return a.createTime
}
// GetBindTime reports when this allocation was bound to a node.
func (a *Allocation) GetBindTime() time.Time {
	a.RLock()
	bound := a.bindTime
	a.RUnlock()
	return bound
}
// SetBindTime records when this allocation was bound to a node.
func (a *Allocation) SetBindTime(bindTime time.Time) {
	a.Lock()
	a.bindTime = bindTime
	a.Unlock()
}
// IsPlaceholderUsed reports whether this allocation is replacing a placeholder.
func (a *Allocation) IsPlaceholderUsed() bool {
	a.RLock()
	used := a.placeholderUsed
	a.RUnlock()
	return used
}
// SetPlaceholderUsed records whether this allocation is replacing a placeholder.
func (a *Allocation) SetPlaceholderUsed(placeholderUsed bool) {
	a.Lock()
	a.placeholderUsed = placeholderUsed
	a.Unlock()
}
// GetPlaceholderCreateTime reports the create time of the placeholder for this allocation, if applicable.
func (a *Allocation) GetPlaceholderCreateTime() time.Time {
	a.RLock()
	created := a.placeholderCreateTime
	a.RUnlock()
	return created
}
// SetPlaceholderCreateTime stores the creation time of the placeholder for this allocation.
func (a *Allocation) SetPlaceholderCreateTime(placeholderCreateTime time.Time) {
	a.Lock()
	a.placeholderCreateTime = placeholderCreateTime
	a.Unlock()
}
// IsPlaceholder returns whether this allocation represents a placeholder.
// The flag is one of the read-only fields of Allocation and is read without taking the lock.
func (a *Allocation) IsPlaceholder() bool {
return a.placeholder
}
// IsOriginator returns whether this allocation is the originator for the application.
// The flag is one of the read-only fields of Allocation and is read without taking the lock.
func (a *Allocation) IsOriginator() bool {
return a.originator
}
// GetNodeID reports the node this allocation is assigned to.
func (a *Allocation) GetNodeID() string {
	a.RLock()
	node := a.nodeID
	a.RUnlock()
	return node
}
// SetNodeID records the node this allocation is assigned to.
func (a *Allocation) SetNodeID(nodeID string) {
	a.Lock()
	a.nodeID = nodeID
	a.Unlock()
}
// SetInstanceType records the node instance type for this allocation.
func (a *Allocation) SetInstanceType(instType string) {
	a.Lock()
	a.instType = instType
	a.Unlock()
}
// GetInstanceType reports the type of the instance used by this allocation.
func (a *Allocation) GetInstanceType() string {
	a.RLock()
	instType := a.instType
	a.RUnlock()
	return instType
}
// GetPriority returns the priority of this allocation.
// The priority is one of the read-only fields of Allocation and is read without taking the lock.
func (a *Allocation) GetPriority() int32 {
return a.priority
}
// IsReleased reports the release status of the allocation.
func (a *Allocation) IsReleased() bool {
	a.RLock()
	released := a.released
	a.RUnlock()
	return released
}
// SetReleased stores the release status of the allocation.
func (a *Allocation) SetReleased(released bool) {
	a.Lock()
	a.released = released
	a.Unlock()
}
// GetTagsClone returns a copy of the tags for this allocation, made with CloneAllocationTags.
// Changes to the returned map do not affect the allocation.
func (a *Allocation) GetTagsClone() map[string]string {
return CloneAllocationTags(a.tags)
}
// GetRelease reports the release associated with this allocation, nil if there is none.
func (a *Allocation) GetRelease() *Allocation {
	a.RLock()
	rel := a.release
	a.RUnlock()
	return rel
}
// SetRelease stores the release for this allocation.
func (a *Allocation) SetRelease(release *Allocation) {
	a.Lock()
	a.release = release
	a.Unlock()
}
// ClearRelease drops any release set on this allocation.
func (a *Allocation) ClearRelease() {
	a.Lock()
	a.release = nil
	a.Unlock()
}
// HasRelease reports whether a release is set on this allocation.
func (a *Allocation) HasRelease() bool {
	a.RLock()
	present := a.release != nil
	a.RUnlock()
	return present
}
// GetAllocatedResource reports the allocated resources for this allocation.
// The stored reference is handed out, not a copy: callers must treat the result as read-only.
func (a *Allocation) GetAllocatedResource() *resources.Resource {
	a.RLock()
	res := a.allocatedResource
	a.RUnlock()
	return res
}
// SetAllocatedResource replaces the allocated resources for this allocation.
// The reference is stored as passed in, no copy is made.
func (a *Allocation) SetAllocatedResource(allocatedResource *resources.Resource) {
	a.Lock()
	a.allocatedResource = allocatedResource
	a.Unlock()
}
// MarkPreempted flags the allocation as preempted. There is no way to clear the flag in this file.
func (a *Allocation) MarkPreempted() {
	a.Lock()
	a.preempted = true
	a.Unlock()
}
// IsPreempted reports whether the allocation has been marked for preemption.
func (a *Allocation) IsPreempted() bool {
	a.RLock()
	marked := a.preempted
	a.RUnlock()
	return marked
}
// CloneAllocationTags returns a new map holding a copy of every entry in tags, so the
// result can be modified without affecting the input. The result is never nil: a nil or
// empty input yields an empty map.
func CloneAllocationTags(tags map[string]string) map[string]string {
	// the final size is known up front, size the map once instead of growing it
	result := make(map[string]string, len(tags))
	for k, v := range tags {
		result[k] = v
	}
	return result
}
// allocate moves this request to the allocated state. It returns true when the state changed,
// false when the request was already allocated: a request may not be allocated multiple times.
func (a *Allocation) allocate() bool {
	a.Lock()
	defer a.Unlock()
	wasPending := !a.allocated
	a.allocated = true
	return wasPending
}
// deallocate moves this request back to the pending state. It returns true when the state changed,
// false when the request was not allocated: a request may not be deallocated multiple times.
func (a *Allocation) deallocate() bool {
	a.Lock()
	defer a.Unlock()
	wasAllocated := a.allocated
	a.allocated = false
	return wasAllocated
}
// IsAllocated reports whether this request has been allocated yet.
func (a *Allocation) IsAllocated() bool {
	a.RLock()
	allocated := a.allocated
	a.RUnlock()
	return allocated
}
// GetPreemptCheckTime reports when this allocation was last evaluated for preemption.
func (a *Allocation) GetPreemptCheckTime() time.Time {
	a.RLock()
	checked := a.preemptCheckTime
	a.RUnlock()
	return checked
}
// UpdatePreemptCheckTime stamps the allocation with the current time to mark that it has
// been evaluated for preemption.
func (a *Allocation) UpdatePreemptCheckTime() {
	a.Lock()
	a.preemptCheckTime = time.Now()
	a.Unlock()
}
// GetRequiredNode gets the node (if any) required by this allocation.
// The value is read without taking the lock.
func (a *Allocation) GetRequiredNode() string {
return a.requiredNode
}
// SetRequiredNode sets the required node (used only by testing so lock is not taken).
// As the write is not synchronized it must not run concurrently with readers of the required node.
func (a *Allocation) SetRequiredNode(node string) {
a.requiredNode = node
}
// IsAllowPreemptSelf returns whether preemption is allowed for this allocation.
// The value is taken from the AllowPreemptSelf setting of the SI preemption policy when the
// allocation is created, and is read without taking the lock.
func (a *Allocation) IsAllowPreemptSelf() bool {
return a.allowPreemptSelf
}
// IsAllowPreemptOther returns whether this allocation is allowed to preempt others.
// The value is taken from the AllowPreemptOther setting of the SI preemption policy when the
// allocation is created, and is read without taking the lock.
func (a *Allocation) IsAllowPreemptOther() bool {
return a.allowPreemptOther
}
// GetTag looks up a tag by name. The result is the empty string when the tag is not present.
func (a *Allocation) GetTag(tagName string) string {
	// indexing a map with a missing key yields the zero value, which is "" for strings
	return a.tags[tagName]
}
// LogAllocationFailure keeps track of preconditions not being met for an allocation.
// Each distinct message gets one log entry; repeated messages bump the entry's counter and
// refresh its last occurrence time. Nothing is recorded when allocate is false.
func (a *Allocation) LogAllocationFailure(message string, allocate bool) {
	if !allocate {
		// for now, don't log reservations
		return
	}

	a.Lock()
	defer a.Unlock()

	entry, seen := a.allocLog[message]
	if !seen {
		entry = &AllocationLogEntry{Message: message}
		a.allocLog[message] = entry
	}
	entry.Count++
	entry.LastOccurrence = time.Now()
}
// SendPredicatesFailedEvent passes the reason for predicate failures to the event system.
// The map predicateErrors contains how many times certain predicates failed in the scheduling
// cycle for this ask.
func (a *Allocation) SendPredicatesFailedEvent(predicateErrors map[string]int) {
	res := a.GetAllocatedResource()
	a.askEvents.SendPredicatesFailed(a.allocationKey, a.applicationID, predicateErrors, res)
}
// SendRequiredNodePreemptionFailedEvent passes a required node preemption failure for the
// given node to the event system.
func (a *Allocation) SendRequiredNodePreemptionFailedEvent(node string) {
	res := a.GetAllocatedResource()
	a.askEvents.SendRequiredNodePreemptionFailed(a.allocationKey, a.applicationID, node, res)
}
// SendPreemptedBySchedulerEvent passes the preemption of this allocation to the event system,
// together with the allocation key, application ID and queue path of the preemptor.
func (a *Allocation) SendPreemptedBySchedulerEvent(preemptorAllocKey, preemptorAppId, preemptorQueuePath string) {
	res := a.GetAllocatedResource()
	a.askEvents.SendPreemptedByScheduler(a.allocationKey, a.applicationID, preemptorAllocKey, preemptorAppId, preemptorQueuePath, res)
}
// GetAllocationLog returns the log entries for allocation preconditions that were not met.
// Every entry in the result is a copy, so callers cannot change the log kept by the allocation.
// The order of the entries follows map iteration and is not defined.
func (a *Allocation) GetAllocationLog() []*AllocationLogEntry {
	a.RLock()
	defer a.RUnlock()

	entries := make([]*AllocationLogEntry, 0, len(a.allocLog))
	for _, logged := range a.allocLog {
		snapshot := *logged // value copy of all fields
		entries = append(entries, &snapshot)
	}
	return entries
}
// MarkTriggeredPreemption flags the allocation as having triggered preemption during scheduling.
func (a *Allocation) MarkTriggeredPreemption() {
	a.Lock()
	a.preemptionTriggered = true
	a.Unlock()
}
// HasTriggeredPreemption reports whether this allocation has triggered preemption.
func (a *Allocation) HasTriggeredPreemption() bool {
	a.RLock()
	triggered := a.preemptionTriggered
	a.RUnlock()
	return triggered
}
// LessThan compares two allocations by priority and then creation time.
// An allocation is less than other when its priority is lower. For equal priorities the
// allocation that was created later is the lesser one. Two allocations with the same priority
// and the same creation time are equal: neither is less than the other, so a.LessThan(a) is
// always false and the comparison is a strict ordering that is safe to use for sorting.
func (a *Allocation) LessThan(other *Allocation) bool {
	if a.priority != other.priority {
		return a.priority < other.priority
	}
	// strict comparison only: equal creation times must not count as "less"
	return a.createTime.After(other.createTime)
}
// SetSchedulingAttempted records whether scheduling has been attempted at least once for this allocation.
func (a *Allocation) SetSchedulingAttempted(attempted bool) {
	a.Lock()
	a.schedulingAttempted = attempted
	a.Unlock()
}
// IsSchedulingAttempted reports whether scheduling has been attempted at least once for this allocation.
func (a *Allocation) IsSchedulingAttempted() bool {
	a.RLock()
	attempted := a.schedulingAttempted
	a.RUnlock()
	return attempted
}
// SetScaleUpTriggered records whether this allocation has triggered the autoscaler.
func (a *Allocation) SetScaleUpTriggered(triggered bool) {
	a.Lock()
	a.scaleUpTriggered = triggered
	a.Unlock()
}
// HasTriggeredScaleUp reports whether this allocation has triggered auto-scaling.
func (a *Allocation) HasTriggeredScaleUp() bool {
	a.RLock()
	triggered := a.scaleUpTriggered
	a.RUnlock()
	return triggered
}
// setHeadroomCheckFailed records a failed queue headroom check. The "request exceeds queue
// headroom" event is sent only when the state changes from passed to failed; repeated failures
// do not send it again.
func (a *Allocation) setHeadroomCheckFailed(headroom *resources.Resource, queue string) {
	a.Lock()
	defer a.Unlock()
	if a.headroomCheckFailed {
		// already reported
		return
	}
	a.headroomCheckFailed = true
	a.askEvents.SendRequestExceedsQueueHeadroom(a.allocationKey, a.applicationID, headroom, a.allocatedResource, queue)
}
// setHeadroomCheckPassed records a passed queue headroom check. The "request fits in queue"
// event is sent only when the state changes from failed to passed.
func (a *Allocation) setHeadroomCheckPassed(queue string) {
	a.Lock()
	defer a.Unlock()
	if !a.headroomCheckFailed {
		// no earlier failure to clear
		return
	}
	a.headroomCheckFailed = false
	a.askEvents.SendRequestFitsInQueue(a.allocationKey, a.applicationID, queue, a.allocatedResource)
}
// setUserQuotaCheckFailed records a failed user quota check. The "request exceeds user quota"
// event is sent only when the state changes from passed to failed; repeated failures do not
// send it again.
func (a *Allocation) setUserQuotaCheckFailed(available *resources.Resource) {
	a.Lock()
	defer a.Unlock()
	if a.userQuotaCheckFailed {
		// already reported
		return
	}
	a.userQuotaCheckFailed = true
	a.askEvents.SendRequestExceedsUserQuota(a.allocationKey, a.applicationID, available, a.allocatedResource)
}
// setUserQuotaCheckPassed records a passed user quota check. The "request fits in user quota"
// event is sent only when the state changes from failed to passed.
func (a *Allocation) setUserQuotaCheckPassed() {
	a.Lock()
	defer a.Unlock()
	if !a.userQuotaCheckFailed {
		// no earlier failure to clear
		return
	}
	a.userQuotaCheckFailed = false
	a.askEvents.SendRequestFitsInUserQuota(a.allocationKey, a.applicationID, a.allocatedResource)
}
// IsForeign returns whether this allocation is a foreign allocation. NewAllocationFromSI sets
// the flag when the SI allocation carries the foreign tag. The value is read without taking the lock.
func (a *Allocation) IsForeign() bool {
return a.foreign
}
// IsPreemptable returns whether this allocation may be preempted. NewAllocationFromSI clears the
// flag only for foreign allocations of the static type. The value is read without taking the lock.
func (a *Allocation) IsPreemptable() bool {
return a.preemptable
}