in pkg/scheduler/objects/preemption.go [557:659]
// TryPreemption attempts to make room for the ask by preempting other allocations.
//
// The flow is:
//  1. verify that queue guarantees allow the required capacity to be freed,
//  2. pick a target node and candidate victims (plus any additional victims needed to free queue capacity),
//  3. keep only as many victims as are needed to cover the ask's resource request,
//  4. mark the selected victims as preempted, notify the RM to release them and mark the ask as having
//     triggered preemption.
//
// On success it returns a reserved allocation result for the chosen node and true. It returns nil and false
// when preemption is not possible, would leave a resource shortfall, or would not select any victim.
func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
	// validate that sufficient capacity can be freed
	if !p.checkPreemptionQueueGuarantees() {
		p.ask.LogAllocationFailure(common.PreemptionDoesNotGuarantee, true)
		return nil, false
	}

	// ensure required data structures are populated
	p.initWorkingState()

	// try to find a node to schedule on and victims to preempt
	nodeID, victims, ok := p.tryNodes()
	if !ok {
		// no preemption possible
		return nil, false
	}

	// look for additional victims in case we have not yet made enough capacity in the queue
	extraVictims, ok := p.calculateAdditionalVictims(victims)
	if !ok {
		// not enough resources were preempted
		return nil, false
	}
	victims = append(victims, extraVictims...)
	if len(victims) == 0 {
		return nil, false
	}

	// The ask is not modified below, so read its requested resource once.
	askResource := p.ask.GetAllocatedResource()

	// Victims from any node are acceptable as long as the chosen node already has enough space for the ask.
	// Otherwise, preempting victims spread over several different nodes does not help place the ask on nodeID.
	fitIn := p.nodeAvailableMap[nodeID].FitIn(askResource)

	// Did the victims collected so far fulfil the ask's need? If the victims' total resources fall short of the
	// ask's requirement, preemption won't help even though victims have been collected.
	// Since there could be more victims than actually needed, only the required victims are kept.
	// TODO: There is room for improvement, especially when there are many victims. Victims could be chosen based
	// on different criteria, e.g. picked from a specific node (bin packing) or from multiple nodes (fair).
	var finalVictims []*Allocation
	victimsTotalResource := resources.NewResource()
	for _, victim := range victims {
		if !fitIn && victim.GetNodeID() != nodeID {
			continue
		}
		// stop selecting victims once the ask resource requirement is met
		if askResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
			finalVictims = append(finalVictims, victim)
		}
		// every eligible candidate counts towards the total, selected or not
		victimsTotalResource.AddTo(victim.GetAllocatedResource())
	}

	if askResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
		// there is shortfall, so preemption doesn't help
		p.ask.LogAllocationFailure(common.PreemptionShortfall, true)
		return nil, false
	}

	// Defensive: if no victim was selected, preempting would free nothing. Do not mark the ask as having
	// triggered preemption (which blocks later attempts) or reserve a node in that case.
	if len(finalVictims) == 0 {
		return nil, false
	}

	// preempt the victims
	for _, victim := range finalVictims {
		victimQueue := p.queue.FindQueueByAppID(victim.GetApplicationID())
		if victimQueue == nil {
			// NOTE(review): this victim is neither marked preempted nor tracked as preempting resource, yet it is
			// still part of the release notification sent to the RM below.
			log.Log(log.SchedPreemption).Warn("BUG: Queue not found for preemption victim",
				zap.String("queue", p.queue.Name),
				zap.String("victimApplicationID", victim.GetApplicationID()),
				zap.String("victimAllocationKey", victim.GetAllocationKey()))
			continue
		}
		victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
		victim.MarkPreempted()
		log.Log(log.SchedPreemption).Info("Preempting task",
			zap.String("askApplicationID", p.ask.applicationID),
			zap.String("askAllocationKey", p.ask.allocationKey),
			zap.String("askQueue", p.queue.Name),
			zap.String("victimApplicationID", victim.GetApplicationID()),
			zap.String("victimAllocationKey", victim.GetAllocationKey()),
			zap.Stringer("victimAllocatedResource", victim.GetAllocatedResource()),
			zap.String("victimNodeID", victim.GetNodeID()),
			zap.String("victimQueue", victimQueue.Name),
		)
		victim.SendPreemptedBySchedulerEvent(p.ask.allocationKey, p.ask.applicationID, p.application.queuePath)
	}

	// mark ask as having triggered preemption so that we don't preempt again
	p.ask.MarkTriggeredPreemption()

	// notify RM that victims should be released
	p.application.notifyRMAllocationReleased(finalVictims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
		"preempting allocations to free up resources to run ask: "+p.ask.GetAllocationKey())

	// reserve the selected node for the new allocation if it will fit; report only the victims actually preempted
	log.Log(log.SchedPreemption).Info("Reserving node for ask after preemption",
		zap.String("allocationKey", p.ask.GetAllocationKey()),
		zap.String("nodeID", nodeID),
		zap.Int("victimCount", len(finalVictims)))
	return newReservedAllocationResult(nodeID, p.ask), true
}