func()

in pkg/scheduler/objects/preemption.go [359:428]


// checkPreemptionPredicates selects the preemption candidate to use out of predicateChecks.
//
// The checks are first ordered by ascending StartIndex, using NodeID as the tie-breaker.
// The sort is performed in place, so the caller's slice is reordered.
//
// If no resource manager callback plugin is registered, the first check after sorting is
// accepted without verification. Otherwise the checks are split into batches with
// batchPreemptionChecks, one preemptPredicateCheck goroutine is started per check in a batch,
// and the best successful result (as decided by betterThan) is tracked across batches.
// No further batches are started once the best result reports isSatisfactory.
//
// Returns nil when predicateChecks is empty or when no check reported success; otherwise the
// selected result, after populateVictims has been called on it with victimsByNode.
func (p *Preemptor) checkPreemptionPredicates(predicateChecks []*si.PreemptionPredicatesArgs, victimsByNode map[string][]*Allocation) *predicateCheckResult {
	// don't process empty list
	if len(predicateChecks) == 0 {
		return nil
	}

	// sort predicate checks by StartIndex (per the original author: the number of expected
	// preempted tasks), falling back to NodeID so equal StartIndex values order deterministically
	sort.SliceStable(predicateChecks, func(i, j int) bool {
		a, b := predicateChecks[i], predicateChecks[j]
		if a.StartIndex != b.StartIndex {
			return a.StartIndex < b.StartIndex
		}
		return a.NodeID < b.NodeID
	})

	// check for RM callback
	plugin := plugins.GetResourceManagerCallbackPlugin()
	if plugin == nil {
		// if a plugin isn't registered, assume checks will succeed and synthesize a result
		// from the first (lowest StartIndex) check
		check := predicateChecks[0]
		log.Log(log.SchedPreemption).Debug("No RM callback plugin registered, using first selected node for preemption",
			zap.String("NodeID", check.NodeID),
			zap.String("AllocationKey", check.AllocationKey))

		result := &predicateCheckResult{
			allocationKey: check.AllocationKey,
			nodeID:        check.NodeID,
			success:       true,
			index:         int(check.StartIndex),
		}
		result.populateVictims(victimsByNode)
		return result
	}

	// process each batch of checks by sending to the RM
	var bestResult *predicateCheckResult
	for _, batch := range batchPreemptionChecks(predicateChecks, preemptCheckConcurrency) {
		var wg sync.WaitGroup
		// one buffer slot per check in the batch
		// NOTE(review): assumes preemptPredicateCheck sends at most one result on ch and
		// signals wg when done - confirm against its definition
		ch := make(chan *predicateCheckResult, len(batch))
		wg.Add(len(batch))
		for _, args := range batch {
			// add goroutine for checking preemption
			go preemptPredicateCheck(plugin, ch, &wg, args)
		}
		// close the channel once the whole batch has completed so the range below terminates
		go func() {
			wg.Wait()
			close(ch)
		}()
		for result := range ch {
			// only successful results are candidates
			if !result.success {
				continue
			}
			if bestResult == nil || result.betterThan(bestResult, p.allocationsByNode) {
				bestResult = result
			}
		}
		// if the best result we have so far meets all our criteria, don't run another batch;
		// bestResult is still nil when nothing has succeeded yet, so check that explicitly
		// rather than relying on isSatisfactory tolerating a nil receiver
		if bestResult != nil && bestResult.isSatisfactory(p.allocationsByNode) {
			break
		}
	}

	// nothing to populate when no check succeeded; nil is returned in that case
	if bestResult != nil {
		bestResult.populateVictims(victimsByNode)
	}
	return bestResult
}