func()

in pkg/scheduler/objects/application.go [1151:1284]


// tryPlaceholderAllocate tries to swap an allocated placeholder for a real request
// of the same task group. The whole operation runs under the application lock.
//
// Phase 1: for every non-placeholder, not yet allocated request that has a task
// group, walk the placeholder allocations of that task group (skipping released or
// preempted ones):
//   - a placeholder that is smaller than the request in any resource is released,
//     the RM is notified and a placeholder-larger event is sent;
//   - the first placeholder that is large enough is remembered (together with its
//     request) as the fallback candidate;
//   - if the placeholder's own node passes preReserveConditions for the request,
//     the request is allocated and swapped in on that node and returned at once.
//
// Phase 2: if no swap happened on a placeholder node, the remembered candidate
// request is tried on the nodes from nodeIterator. The first schedulable node that
// passes the pre-allocation checks and accepts the allocation is used.
//
// nodeIterator is only invoked when phase 1 produced a candidate pair. getNodeFn
// resolves a node ID and may return nil, which makes that node unusable.
// Returns a replaced allocation result, or nil when nothing could be swapped.
func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator, getNodeFn func(string) *Node) *AllocationResult {
	sa.Lock()
	defer sa.Unlock()
	// nothing to do if we have no placeholders allocated
	if resources.IsZero(sa.allocatedPlaceholder) || sa.sortedRequests == nil {
		return nil
	}
	// completeSwap double links the real allocation and the placeholder, marks the
	// placeholder released and binds the real allocation to the node so it is handled
	// properly upon replacement. Both phases finish a successful swap this way.
	completeSwap := func(alloc, ph *Allocation, node *Node) *AllocationResult {
		// alloc (the real one) releases points to the placeholder in the releases list
		alloc.SetRelease(ph)
		// placeholder point to the real one in the releases list
		ph.SetRelease(alloc)
		// mark placeholder as released
		ph.SetReleased(true)
		alloc.SetBindTime(time.Now())
		alloc.SetNodeID(node.NodeID)
		alloc.SetInstanceType(node.GetInstanceType())
		return newReplacedAllocationResult(node.NodeID, alloc)
	}
	// keep the first fits for later (always set together)
	var phFit *Allocation
	var reqFit *Allocation
	// get all the requests from the app sorted in order
	for _, request := range sa.sortedRequests {
		// skip placeholders they follow standard allocation
		// this should also be part of a task group just make sure it is
		if request.IsPlaceholder() || request.GetTaskGroup() == "" || request.IsAllocated() {
			continue
		}
		// walk over the placeholders, allow for processing all as we can have multiple task groups
		phAllocs := sa.getPlaceholderAllocations()
		for _, ph := range phAllocs {
			// we could have already released preempted this placeholder and are waiting for the shim to confirm
			// and check that we have the correct task group before trying to swap
			if ph.IsReleased() || ph.IsPreempted() || request.GetTaskGroup() != ph.GetTaskGroup() {
				continue
			}
			// before we check anything we need to check the resources equality
			delta := resources.Sub(ph.GetAllocatedResource(), request.GetAllocatedResource())
			// Any negative value in the delta means that at least one of the requested resource in the real
			// allocation is larger than the placeholder. We need to cancel this placeholder and check the next
			// placeholder. This should trigger the removal of all the placeholder that are part of this task group.
			// All placeholders in the same task group are always the same size.
			if delta.HasNegativeValue() {
				log.Log(log.SchedApplication).Warn("releasing placeholder: real allocation is larger than placeholder",
					zap.Stringer("requested resource", request.GetAllocatedResource()),
					zap.String("placeholderKey", ph.GetAllocationKey()),
					zap.Stringer("placeholder resource", ph.GetAllocatedResource()))
				// release the placeholder and tell the RM
				ph.SetReleased(true)
				sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible")
				sa.appEvents.SendPlaceholderLargerEvent(ph.GetTaskGroup(), sa.ApplicationID, ph.GetAllocationKey(), request.GetAllocatedResource(), ph.GetAllocatedResource())
				continue
			}
			// placeholder is the same or larger continue processing and difference is handled when the placeholder
			// is swapped with the real one.
			if phFit == nil {
				phFit = ph
				reqFit = request
			}
			node := getNodeFn(ph.GetNodeID())
			// got the node run same checks as for reservation (all but fits)
			// resource usage should not change anyway between placeholder and real one at this point
			if node == nil || node.preReserveConditions(request) != nil {
				continue
			}
			if _, err := sa.allocateAsk(request); err != nil {
				log.Log(log.SchedApplication).Warn("allocation of ask failed unexpectedly",
					zap.String("allocationKey", request.GetAllocationKey()),
					zap.Error(err))
				// The ask could not be marked allocated: do not release the placeholder
				// or return a replacement for it. Drop it as the fallback candidate too,
				// otherwise the node iteration below would fail on the same ask.
				if reqFit == request {
					phFit, reqFit = nil, nil
				}
				break
			}
			return completeSwap(request, ph, node)
		}
	}
	// we checked all placeholders and asks nothing worked as yet
	// without a candidate pair there is nothing to try on other nodes
	if phFit == nil || reqFit == nil {
		return nil
	}
	// cannot allocate if the iterator is not giving us any schedulable nodes
	iterator := nodeIterator()
	if iterator == nil {
		return nil
	}
	// pick the first fit and try all nodes if that fails give up
	var allocResult *AllocationResult
	resKey := reqFit.GetAllocationKey()
	iterator.ForEachNode(func(node *Node) bool {
		if !node.IsSchedulable() {
			log.Log(log.SchedApplication).Debug("skipping node for placeholder alloc as state is unschedulable",
				zap.String("allocationKey", resKey),
				zap.String("node", node.NodeID))
			return true
		}
		if !node.preAllocateCheck(reqFit.GetAllocatedResource(), resKey) {
			return true
		}
		// skip the node if conditions can not be satisfied
		if err := node.preAllocateConditions(reqFit); err != nil {
			return true
		}
		// update just the node to make sure we keep its spot
		// no queue update as we're releasing the placeholder and are just temp over the size
		if !node.TryAddAllocation(reqFit) {
			log.Log(log.SchedApplication).Debug("Node update failed unexpectedly",
				zap.String("applicationID", sa.ApplicationID),
				zap.Stringer("alloc", reqFit),
				zap.Stringer("placeholder", phFit))
			return false
		}
		if _, err := sa.allocateAsk(reqFit); err != nil {
			log.Log(log.SchedApplication).Warn("allocation of ask failed unexpectedly",
				zap.String("allocationKey", resKey),
				zap.Error(err))
			// unwind node allocation; the removed allocation itself is not needed here
			_ = node.RemoveAllocation(resKey)
			return false
		}
		// allocation worked: on a non placeholder node update resultType and return
		allocResult = completeSwap(reqFit, phFit, node)
		return false
	})
	// still nothing worked give up and hope the next round works
	return allocResult
}