in pkg/scheduler/objects/application.go [1151:1284]
// tryPlaceholderAllocate attempts to replace an allocated placeholder with a pending real ask that
// belongs to the same task group. The application lock is held for the duration of the call.
//
// Processing runs in two phases:
//  1. Every pending, non-placeholder ask with a task group is matched against the placeholder
//     allocations of that task group. A placeholder that is smaller than the ask in at least one
//     resource is marked released and the RM is notified. Otherwise the swap is tried on the node
//     returned by getNodeFn for the placeholder, and the first swap for which the node's pre-reserve
//     conditions pass is returned.
//  2. If no swap happened, the first compatible placeholder/ask pair found in phase 1 is offered to
//     the nodes handed out by nodeIterator until one node accepts the ask or the attempt fails.
//
// Returns the replacement result, or nil when there is nothing to replace, when nodeIterator yields
// no iterator, or when no node could take the ask in this round.
func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator, getNodeFn func(string) *Node) *AllocationResult {
	sa.Lock()
	defer sa.Unlock()
	// without allocated placeholders or sorted requests there is nothing to replace
	if resources.IsZero(sa.allocatedPlaceholder) || sa.sortedRequests == nil {
		return nil
	}

	// replace cross links the real allocation and the placeholder through their release references,
	// marks the placeholder as released and binds the real allocation to the target node so the
	// replacement is handled properly later on.
	replace := func(realAlloc, phAlloc *Allocation, target *Node) *AllocationResult {
		realAlloc.SetRelease(phAlloc)
		phAlloc.SetRelease(realAlloc)
		phAlloc.SetReleased(true)
		realAlloc.SetBindTime(time.Now())
		realAlloc.SetNodeID(target.NodeID)
		realAlloc.SetInstanceType(target.GetInstanceType())
		return newReplacedAllocationResult(target.NodeID, realAlloc)
	}

	// first placeholder/ask pair with compatible resources, remembered for the second phase
	var fallbackPh, fallbackAsk *Allocation

	// phase 1: try to swap on the node the placeholder is already bound to
	for _, ask := range sa.sortedRequests {
		// only pending real asks that are part of a task group qualify, placeholders themselves
		// go through the standard allocation
		if ask.IsPlaceholder() || ask.GetTaskGroup() == "" || ask.IsAllocated() {
			continue
		}
		// all placeholders are considered for each ask as there can be multiple task groups
		for _, placeholder := range sa.getPlaceholderAllocations() {
			// ignore placeholders that are already released or preempted (confirmation might still
			// be pending) and those that belong to a different task group
			if placeholder.IsReleased() || placeholder.IsPreempted() || ask.GetTaskGroup() != placeholder.GetTaskGroup() {
				continue
			}
			// A negative value in placeholder minus ask means the real allocation asks for more than
			// the placeholder holds for at least one resource. The placeholder is cancelled and the
			// next one is checked. Placeholders of one task group are always the same size, so this
			// should end up removing all placeholders of the task group.
			if resources.Sub(placeholder.GetAllocatedResource(), ask.GetAllocatedResource()).HasNegativeValue() {
				log.Log(log.SchedApplication).Warn("releasing placeholder: real allocation is larger than placeholder",
					zap.Stringer("requested resource", ask.GetAllocatedResource()),
					zap.String("placeholderKey", placeholder.GetAllocationKey()),
					zap.Stringer("placeholder resource", placeholder.GetAllocatedResource()))
				// flag the placeholder as released and inform the RM
				placeholder.SetReleased(true)
				sa.notifyRMAllocationReleased([]*Allocation{placeholder}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible")
				sa.appEvents.SendPlaceholderLargerEvent(placeholder.taskGroupName, sa.ApplicationID, placeholder.allocationKey, ask.GetAllocatedResource(), placeholder.GetAllocatedResource())
				continue
			}
			// the placeholder is equal or larger: any size difference is dealt with when the
			// placeholder is swapped with the real allocation
			if fallbackPh == nil && fallbackAsk == nil {
				fallbackPh, fallbackAsk = placeholder, ask
			}
			// run the same checks as for a reservation (everything except the fit check), resource
			// usage should not change between the placeholder and the real allocation at this point
			phNode := getNodeFn(placeholder.GetNodeID())
			if phNode == nil || phNode.preReserveConditions(ask) != nil {
				continue
			}
			// a failure here is only logged, the swap still goes ahead
			if _, err := sa.allocateAsk(ask); err != nil {
				log.Log(log.SchedApplication).Warn("allocation of ask failed unexpectedly",
					zap.Error(err))
			}
			return replace(ask, placeholder, phNode)
		}
	}

	// phase 2: none of the placeholder nodes worked, fall back to the first fit on any node.
	// The iterator is requested before the fit is checked; no iterator means no schedulable nodes.
	nodes := nodeIterator()
	if nodes == nil || fallbackPh == nil || fallbackAsk == nil {
		return nil
	}

	var swapped *AllocationResult
	askKey := fallbackAsk.GetAllocationKey()
	// the callback returns true to move on to the next node and false to stop the iteration
	nodes.ForEachNode(func(candidate *Node) bool {
		if !candidate.IsSchedulable() {
			log.Log(log.SchedApplication).Debug("skipping node for placeholder alloc as state is unschedulable",
				zap.String("allocationKey", askKey),
				zap.String("node", candidate.NodeID))
			return true
		}
		// move on if the node fails the pre allocate check or cannot satisfy the conditions
		if !candidate.preAllocateCheck(fallbackAsk.GetAllocatedResource(), askKey) || candidate.preAllocateConditions(fallbackAsk) != nil {
			return true
		}
		// only the node is updated to claim the spot: the queue is left alone as the placeholder
		// gets released and the usage is just temporarily over the size
		if !candidate.TryAddAllocation(fallbackAsk) {
			log.Log(log.SchedApplication).Debug("Node update failed unexpectedly",
				zap.String("applicationID", sa.ApplicationID),
				zap.Stringer("alloc", fallbackAsk),
				zap.Stringer("placeholder", fallbackPh))
			return false
		}
		if _, err := sa.allocateAsk(fallbackAsk); err != nil {
			log.Log(log.SchedApplication).Warn("allocation of ask failed unexpectedly",
				zap.Error(err))
			// undo the node allocation made above, the result of the removal is not needed
			_ = candidate.RemoveAllocation(askKey)
			return false
		}
		// allocated on a node other than the placeholder node: link, bind and stop iterating
		swapped = replace(fallbackAsk, fallbackPh, candidate)
		return false
	})
	// stays nil if no node worked, the next scheduling round gets another chance
	return swapped
}