in pkg/scheduler/objects/preemption.go [359:428]
// checkPreemptionPredicates evaluates the given preemption predicate checks and returns the
// best successful result, or nil when predicateChecks is empty or no check succeeded.
//
// predicateChecks is sorted in place (by StartIndex, then by NodeID), so the caller's slice
// order is modified. When no resource manager callback plugin is registered, the first check
// in sorted order is assumed to succeed and a result is synthesized for it. Otherwise the
// checks are split into batches and each batch is evaluated concurrently; batches are
// processed in order until the best result seen so far is satisfactory or no batches remain.
//
// The returned result, when non-nil, has had populateVictims called on it with victimsByNode.
func (p *Preemptor) checkPreemptionPredicates(predicateChecks []*si.PreemptionPredicatesArgs, victimsByNode map[string][]*Allocation) *predicateCheckResult {
	// don't process empty list
	if len(predicateChecks) == 0 {
		return nil
	}

	// sort predicate checks by number of expected preempted tasks
	sort.SliceStable(predicateChecks, func(i, j int) bool {
		// sort by NodeID if StartIndex are same
		if predicateChecks[i].StartIndex == predicateChecks[j].StartIndex {
			return predicateChecks[i].NodeID < predicateChecks[j].NodeID
		}
		return predicateChecks[i].StartIndex < predicateChecks[j].StartIndex
	})

	// check for RM callback
	plugin := plugins.GetResourceManagerCallbackPlugin()
	if plugin == nil {
		// if a plugin isn't registered, assume checks will succeed and synthesize a result
		check := predicateChecks[0]
		log.Log(log.SchedPreemption).Debug("No RM callback plugin registered, using first selected node for preemption",
			zap.String("NodeID", check.NodeID),
			zap.String("AllocationKey", check.AllocationKey))
		result := &predicateCheckResult{
			allocationKey: check.AllocationKey,
			nodeID:        check.NodeID,
			success:       true,
			index:         int(check.StartIndex),
		}
		result.populateVictims(victimsByNode)
		return result
	}

	// process each batch of checks by sending to the RM
	var bestResult *predicateCheckResult
	for _, batch := range batchPreemptionChecks(predicateChecks, preemptCheckConcurrency) {
		var wg sync.WaitGroup
		// buffered to the batch size so that one send per check never blocks
		ch := make(chan *predicateCheckResult, len(batch))
		for _, args := range batch {
			// add goroutine for checking preemption
			wg.Add(1)
			go preemptPredicateCheck(plugin, ch, &wg, args)
		}
		// wait for completion and close channel so the receive loop below terminates
		// NOTE(review): relies on preemptPredicateCheck (defined elsewhere) calling wg.Done
		// for every check - confirm.
		go func() {
			wg.Wait()
			close(ch)
		}()
		for result := range ch {
			// only successful results are candidates; keep the best one seen across all batches
			if result.success && (bestResult == nil || result.betterThan(bestResult, p.allocationsByNode)) {
				bestResult = result
			}
		}
		// if the best result we have so far meets all our criteria, don't run another batch;
		// bestResult is still nil when nothing has succeeded yet, so guard explicitly rather
		// than depend on the helper tolerating a nil receiver
		if bestResult != nil && bestResult.isSatisfactory(p.allocationsByNode) {
			break
		}
	}

	// no check succeeded in any batch: nothing to populate
	if bestResult == nil {
		return nil
	}
	bestResult.populateVictims(victimsByNode)
	return bestResult
}