in pkg/scheduler/partition.go [307:391]
// AddApplication places a new application in a queue and registers it with the partition.
//
// The call is rejected when the partition is draining or stopped, or when an application with
// the same ID is already registered. The placement manager resolves the queue path for the
// application; a queue that does not exist yet is created: the recovery queue for a recovery
// queue path, otherwise a rule based queue for the application's user. The resolved queue must
// be a leaf queue. If the queue is not the recovery queue and is not managed, the guaranteed
// resource, max resource and max running applications requested by the application are set on
// the queue. An application with a placeholder ask is only accepted if the queue supports task
// groups and the ask fits in the maximum set for the queue.
//
// Returns nil on success. On failure an error is returned and the application is not added to
// the queue or to the partition. A queue that was created or updated earlier in the call is not
// rolled back by this method.
func (pc *PartitionContext) AddApplication(app *objects.Application) error {
	if pc.isDraining() || pc.isStopped() {
		return fmt.Errorf("partition %s is stopped cannot add a new application %s", pc.Name, app.ApplicationID)
	}
	// Early check if the app exists: avoids running the placement rules for a known application.
	// This check is repeated once the partition lock is held, see below.
	appID := app.ApplicationID
	if pc.getApplication(appID) != nil {
		return fmt.Errorf("adding application %s to partition %s, but application already existed", appID, pc.Name)
	}
	// Resolve the queue for this app using the placement rules
	// We either have an error or a queue name is set on the application.
	err := pc.getPlacementManager().PlaceApplication(app)
	if err != nil {
		// wrap with %w so callers can still inspect the placement error with errors.Is / errors.As
		return fmt.Errorf("failed to place application %s: %w", appID, err)
	}
	queueName := app.GetQueuePath()
	// lock the partition and make the last change: we need to do this before creating the queues.
	// queue cleanup might otherwise remove the queue again before we can add the application
	pc.Lock()
	defer pc.Unlock()
	// The first existence check ran before this lock was taken: a concurrent call could have
	// registered the same application ID in between. Check again now that the check and the
	// insert at the end of this method happen under the same lock.
	if pc.applications[appID] != nil {
		return fmt.Errorf("adding application %s to partition %s, but application already existed", appID, pc.Name)
	}
	// we have a queue name either from placement or direct, get the queue
	queue := pc.getQueueInternal(queueName)
	// create the queue if necessary
	isRecoveryQueue := common.IsRecoveryQueue(queueName)
	if queue == nil {
		if isRecoveryQueue {
			queue, err = pc.createRecoveryQueue()
			if err != nil {
				return errors.Join(fmt.Errorf("failed to create recovery queue %s for application %s", common.RecoveryQueueFull, appID), err)
			}
		} else {
			queue, err = pc.createQueue(queueName, app.GetUser())
			if err != nil {
				return errors.Join(fmt.Errorf("failed to create rule based queue %s for application %s", queueName, appID), err)
			}
		}
	}
	// check the queue: is a leaf queue
	if !queue.IsLeafQueue() {
		return fmt.Errorf("failed to find queue %s for application %s", queueName, appID)
	}
	guaranteedRes := app.GetGuaranteedResource()
	maxRes := app.GetMaxResource()
	maxApps := app.GetMaxApps()
	if !isRecoveryQueue && (guaranteedRes != nil || maxRes != nil || maxApps != 0) {
		// set resources based on tags, but only if the queue is dynamic (unmanaged)
		if !queue.IsManaged() {
			if maxApps != 0 {
				queue.SetMaxRunningApps(maxApps)
			}
			if guaranteedRes != nil || maxRes != nil {
				queue.SetResources(guaranteedRes, maxRes)
			}
		}
	}
	// check only for gang request
	// - make sure the taskgroup request fits in the maximum set for the queue hierarchy
	// - task groups should only be used in FIFO queues
	// if the check fails the app is rejected: it has not been added to the queue or partition yet
	if placeHolder := app.GetPlaceholderAsk(); !resources.IsZero(placeHolder) {
		// check the queue sorting
		if !queue.SupportTaskGroup() {
			return fmt.Errorf("queue %s cannot run application %s with task group request: unsupported sort type", queueName, appID)
		}
		// a nil max means no maximum is set: nothing to check the request against
		if maxQueue := queue.GetMaxQueueSet(); maxQueue != nil {
			if !maxQueue.FitInMaxUndef(placeHolder) {
				return fmt.Errorf("queue %s cannot fit application %s: task group request %s larger than max queue allocation %s", queueName, appID, placeHolder.String(), maxQueue.String())
			}
		}
	}
	// all is OK update the app and add it to the partition
	app.SetQueue(queue)
	app.SetTerminatedCallback(pc.moveTerminatedApp)
	queue.AddApplication(app)
	pc.applications[appID] = app
	return nil
}