func()

in pkg/scheduler/objects/preemption.go [557:659]


// TryPreemption attempts to make room for the preemptor's ask by preempting existing allocations.
//
// The steps are, in order:
//  1. check the queue guarantees (checkPreemptionQueueGuarantees) and give up if the check fails;
//  2. populate the working state (initWorkingState);
//  3. pick a target node and an initial set of victims (tryNodes);
//  4. collect additional victims (calculateAdditionalVictims);
//  5. trim the candidates down to the victims needed to cover the ask and verify there is no shortfall;
//  6. mark the final victims as preempted, notify the RM and return a reservation on the chosen node.
//
// It returns a reserved allocation result for the selected node and true when preemption was triggered.
// In every other case it returns nil and false; all of those returns happen before this method marks
// any victim or sends any release notification.
func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
	// validate that sufficient capacity can be freed
	if !p.checkPreemptionQueueGuarantees() {
		p.ask.LogAllocationFailure(common.PreemptionDoesNotGuarantee, true)
		return nil, false
	}

	// ensure required data structures are populated
	p.initWorkingState()

	// try to find a node to schedule on and victims to preempt
	nodeID, victims, ok := p.tryNodes()
	if !ok {
		// no preemption possible
		return nil, false
	}

	// look for additional victims in case we have not yet made enough capacity in the queue
	extraVictims, ok := p.calculateAdditionalVictims(victims)
	if !ok {
		// not enough resources were preempted
		return nil, false
	}
	victims = append(victims, extraVictims...)
	if len(victims) == 0 {
		return nil, false
	}

	// Do the victims collected so far fulfil the ask? If there is a shortfall between the ask resource
	// requirement and the total victim resources, preemption won't help even though victims were collected.

	// Holds the total resources of all eligible victims (selected or not)
	victimsTotalResource := resources.NewResource()

	// fitIn is true when the available resources tracked for the chosen node already accommodate the ask
	fitIn := p.nodeAvailableMap[nodeID].FitIn(p.ask.GetAllocatedResource())

	// Since there could be more victims than actually needed, keep only the required victims.
	// TODO: There is room for improvement, especially when there are more victims. Victims could be chosen
	// based on different criteria, for example from one specific node (bin packing) or from multiple
	// nodes (fair), given the choices.
	var finalVictims []*Allocation
	for _, victim := range victims {
		// Victims from any node are acceptable as long as the chosen node has enough space to accommodate the ask.
		// Otherwise, preempting victims from 'n' different nodes doesn't help to achieve the goal.
		if !fitIn && victim.GetNodeID() != nodeID {
			continue
		}
		// stop collecting victims once the ask resource requirement is met: the check uses the total
		// accumulated BEFORE this victim, so the victim that closes the gap is still selected
		if p.ask.GetAllocatedResource().StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
			finalVictims = append(finalVictims, victim)
		}
		// add the victim resources to the total, also for victims that were not selected
		victimsTotalResource.AddTo(victim.GetAllocatedResource())
	}

	if p.ask.GetAllocatedResource().StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
		// there is a shortfall, so preemption doesn't help
		p.ask.LogAllocationFailure(common.PreemptionShortfall, true)
		return nil, false
	}

	// preempt the victims
	for _, victim := range finalVictims {
		if victimQueue := p.queue.FindQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
			victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
			victim.MarkPreempted()
			log.Log(log.SchedPreemption).Info("Preempting task",
				zap.String("askApplicationID", p.ask.applicationID),
				zap.String("askAllocationKey", p.ask.allocationKey),
				zap.String("askQueue", p.queue.Name),
				zap.String("victimApplicationID", victim.GetApplicationID()),
				zap.String("victimAllocationKey", victim.GetAllocationKey()),
				zap.Stringer("victimAllocatedResource", victim.GetAllocatedResource()),
				zap.String("victimNodeID", victim.GetNodeID()),
				zap.String("victimQueue", victimQueue.Name),
			)
			victim.SendPreemptedBySchedulerEvent(p.ask.allocationKey, p.ask.applicationID, p.application.queuePath)
		} else {
			// NOTE(review): such a victim is not marked as preempted here, but it is still part of
			// finalVictims and therefore included in the release notification below — confirm intended.
			log.Log(log.SchedPreemption).Warn("BUG: Queue not found for preemption victim",
				zap.String("queue", p.queue.Name),
				zap.String("victimApplicationID", victim.GetApplicationID()),
				zap.String("victimAllocationKey", victim.GetAllocationKey()))
		}
	}

	// mark ask as having triggered preemption so that we don't preempt again
	p.ask.MarkTriggeredPreemption()

	// notify RM that victims should be released
	p.application.notifyRMAllocationReleased(finalVictims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
		"preempting allocations to free up resources to run ask: "+p.ask.GetAllocationKey())

	// reserve the selected node for the new allocation if it will fit
	// victimCount reports the victims actually handed over for release (finalVictims), not the larger
	// candidate list, which may contain victims that were filtered out above
	log.Log(log.SchedPreemption).Info("Reserving node for ask after preemption",
		zap.String("allocationKey", p.ask.GetAllocationKey()),
		zap.String("nodeID", nodeID),
		zap.Int("victimCount", len(finalVictims)))
	return newReservedAllocationResult(nodeID, p.ask), true
}