func()

in pkg/scheduler/partition.go [307:391]


// AddApplication registers a new application with the partition.
//
// The application is rejected, and an error returned, when:
//   - the partition is draining or stopped,
//   - an application with the same ID is already known to the partition,
//   - the placement manager fails to place the application,
//   - the target queue does not exist and cannot be created,
//   - the target queue is not a leaf queue,
//   - the application carries a placeholder (task group) request and the queue
//     does not support task groups or the request does not fit in the maximum
//     set for the queue.
//
// On success the application is linked to its queue, gets the partition's
// terminated callback, and is stored in the partition's application map.
//
// The placement error is wrapped with %w so callers can inspect the cause with
// errors.Is / errors.As; the queue creation errors are combined with
// errors.Join and are inspectable the same way.
func (pc *PartitionContext) AddApplication(app *objects.Application) error {
	if pc.isDraining() || pc.isStopped() {
		return fmt.Errorf("partition %s is stopped cannot add a new application %s", pc.Name, app.ApplicationID)
	}

	// Check if the app exists
	// NOTE(review): this check runs before the partition lock is taken below, so
	// the check and the insert are not one atomic step; confirm callers never add
	// the same application ID concurrently.
	appID := app.ApplicationID
	if pc.getApplication(appID) != nil {
		return fmt.Errorf("adding application %s to partition %s, but application already existed", appID, pc.Name)
	}

	// Resolve the queue for this app using the placement rules
	// We either have an error or a queue name is set on the application.
	err := pc.getPlacementManager().PlaceApplication(app)
	if err != nil {
		// wrap (not just format) the cause so the error chain stays intact
		return fmt.Errorf("failed to place application %s: %w", appID, err)
	}
	queueName := app.GetQueuePath()

	// lock the partition and make the last change: we need to do this before creating the queues.
	// queue cleanup might otherwise remove the queue again before we can add the application
	pc.Lock()
	defer pc.Unlock()
	// we have a queue name either from placement or direct, get the queue
	queue := pc.getQueueInternal(queueName)

	// create the queue if necessary
	isRecoveryQueue := common.IsRecoveryQueue(queueName)
	if queue == nil {
		if isRecoveryQueue {
			queue, err = pc.createRecoveryQueue()
			if err != nil {
				return errors.Join(fmt.Errorf("failed to create recovery queue %s for application %s", common.RecoveryQueueFull, appID), err)
			}
		} else {
			queue, err = pc.createQueue(queueName, app.GetUser())
			if err != nil {
				return errors.Join(fmt.Errorf("failed to create rule based queue %s for application %s", queueName, appID), err)
			}
		}
	}

	// check the queue: is a leaf queue
	if !queue.IsLeafQueue() {
		return fmt.Errorf("failed to find queue %s for application %s", queueName, appID)
	}

	// set limits based on the values carried by the application, but only if the
	// queue is not the recovery queue and is dynamic (unmanaged)
	guaranteedRes := app.GetGuaranteedResource()
	maxRes := app.GetMaxResource()
	maxApps := app.GetMaxApps()
	hasLimits := guaranteedRes != nil || maxRes != nil || maxApps != 0
	if !isRecoveryQueue && hasLimits && !queue.IsManaged() {
		if maxApps != 0 {
			queue.SetMaxRunningApps(maxApps)
		}
		if guaranteedRes != nil || maxRes != nil {
			queue.SetResources(guaranteedRes, maxRes)
		}
	}

	// check only for gang request
	// - make sure the taskgroup request fits in the maximum set for the queue hierarchy
	// - task groups should only be used in FIFO queues
	// These checks run before the app is added to the queue, so a failure needs no
	// removal of the app.
	// NOTE(review): a queue created above and any limits set above are not rolled
	// back when these checks fail; confirm that is intended.
	if placeHolder := app.GetPlaceholderAsk(); !resources.IsZero(placeHolder) {
		// check the queue sorting
		if !queue.SupportTaskGroup() {
			return fmt.Errorf("queue %s cannot run application %s with task group request: unsupported sort type", queueName, appID)
		}
		if maxQueue := queue.GetMaxQueueSet(); maxQueue != nil {
			if !maxQueue.FitInMaxUndef(placeHolder) {
				return fmt.Errorf("queue %s cannot fit application %s: task group request %s larger than max queue allocation %s", queueName, appID, placeHolder.String(), maxQueue.String())
			}
		}
	}
	// all is OK update the app and add it to the partition
	app.SetQueue(queue)
	app.SetTerminatedCallback(pc.moveTerminatedApp)
	queue.AddApplication(app)
	pc.applications[appID] = app

	return nil
}