func()

in pkg/scheduler/objects/queue.go [797:850]


// RemoveApplication detaches app from this queue and releases the resources the
// queue was tracking on the application's behalf.
//
// If the application ID is not known to the queue (per appExists) the call only
// emits a debug log and returns. Otherwise it, in order:
//   - sends the queue level "remove application" event,
//   - releases the app's pending, allocated and placeholder resources from the queue,
//   - releases the preempting resources, summed over the app's preempted allocations,
//   - drops the app from the applications, appPriorities and allocatingAcceptedApps
//     maps and recalculates the queue priority while holding the queue lock,
//   - sends the application level "remove application" event,
//   - passes the recalculated priority to the parent queue.
//
// Failures while releasing allocated or placeholder resources are logged as
// warnings and do not stop the removal.
func (sq *Queue) RemoveApplication(app *Application) {
	id := app.ApplicationID
	if !sq.appExists(id) {
		log.Log(log.SchedQueue).Debug("Application not found while removing from queue",
			zap.String("queueName", sq.QueuePath),
			zap.String("applicationID", id))
		return
	}
	sq.queueEvents.SendRemoveApplicationEvent(sq.QueuePath, id)

	// release whatever the application still has pending
	pending := app.GetPendingResource()
	if !resources.IsZero(pending) {
		sq.decPendingResource(pending)
	}

	// release the resources held by real allocations
	allocated := app.GetAllocatedResource()
	if !resources.IsZero(allocated) {
		if allocErr := sq.DecAllocatedResource(allocated); allocErr != nil {
			log.Log(log.SchedQueue).Warn("failed to release allocated resources from queue",
				zap.String("appID", id),
				zap.Error(allocErr))
		}
	}

	// release the resources held by placeholders, these are tracked as allocated on the queue too
	placeholder := app.GetPlaceholderResource()
	if !resources.IsZero(placeholder) {
		if phErr := sq.DecAllocatedResource(placeholder); phErr != nil {
			log.Log(log.SchedQueue).Warn("failed to release placeholder resources from queue",
				zap.String("appID", id),
				zap.Error(phErr))
		}
	}

	// sum up the allocations that are marked preempted and release the total
	preempted := resources.NewResource()
	for _, allocation := range app.GetAllAllocations() {
		if !allocation.IsPreempted() {
			continue
		}
		preempted.AddTo(allocation.GetAllocatedResource())
	}
	if !resources.IsZero(preempted) {
		sq.DecPreemptingResource(preempted)
	}

	// the lock only covers the map updates and the priority recalculation,
	// it is released before the event and the parent are called
	sq.Lock()
	delete(sq.applications, id)
	delete(sq.appPriorities, id)
	delete(sq.allocatingAcceptedApps, id)
	newPriority := sq.recalculatePriority()
	sq.Unlock()

	app.appEvents.SendRemoveApplicationEvent(id)
	sq.parent.UpdateQueuePriority(sq.Name, newPriority)

	log.Log(log.SchedQueue).Info("Application completed and removed from queue",
		zap.String("queueName", sq.QueuePath),
		zap.String("applicationID", id))
}