in pkg/scheduler/objects/application.go [517:587]
func (sa *Application) removeAsksInternal(allocKey string, detail si.EventRecord_ChangeDetail) int {
// shortcut no need to do anything
if len(sa.requests) == 0 {
return 0
}
var deltaPendingResource *resources.Resource = nil
// when allocation key is not specified, cleanup all allocations
var toRelease int
if allocKey == "" {
// cleanup all reservations
for _, reserve := range sa.reservations {
releases := sa.unReserveInternal(reserve)
toRelease += releases
}
// clean up the queue reservation
sa.queue.UnReserve(sa.ApplicationID, toRelease)
// Cleanup total pending resource
deltaPendingResource = sa.pending
sa.pending = resources.NewResource()
for _, ask := range sa.requests {
sa.appEvents.SendRemoveAskEvent(sa.ApplicationID, ask.allocationKey, ask.GetAllocatedResource(), detail)
}
sa.requests = make(map[string]*Allocation)
sa.sortedRequests = sortedRequests{}
sa.askMaxPriority = configs.MinPriority
sa.queue.UpdateApplicationPriority(sa.ApplicationID, sa.askMaxPriority)
} else {
// cleanup the reservation for this allocation
if reserve, ok := sa.reservations[allocKey]; ok {
releases := sa.unReserveInternal(reserve)
// clean up the queue reservation
sa.queue.UnReserve(sa.ApplicationID, releases)
toRelease += releases
}
if ask := sa.requests[allocKey]; ask != nil {
if !ask.IsAllocated() {
deltaPendingResource = ask.GetAllocatedResource()
sa.pending = resources.Sub(sa.pending, deltaPendingResource)
sa.pending.Prune()
}
delete(sa.requests, allocKey)
sa.sortedRequests.remove(ask)
sa.appEvents.SendRemoveAskEvent(sa.ApplicationID, ask.allocationKey, ask.GetAllocatedResource(), detail)
if priority := ask.GetPriority(); priority >= sa.askMaxPriority {
sa.updateAskMaxPriority()
}
}
}
// clean up the queue pending resources
sa.queue.decPendingResource(deltaPendingResource)
// Check if we need to change state based on the removal:
// 1) if pending is zero (no more asks left)
// 2) if confirmed allocations is zero (no real tasks running)
// Change the state to completing.
// When the resource trackers are zero we should not expect anything to come in later.
hasPlaceHolderAllocations := len(sa.getPlaceholderAllocations()) > 0
if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) && !sa.IsFailing() && !sa.IsCompleting() && !hasPlaceHolderAllocations {
if err := sa.HandleApplicationEvent(CompleteApplication); err != nil {
log.Log(log.SchedApplication).Warn("Application state not changed to Completing while updating ask(s)",
zap.String("currentState", sa.CurrentState()),
zap.Error(err))
}
}
log.Log(log.SchedApplication).Info("ask removed successfully from application",
zap.String("appID", sa.ApplicationID),
zap.String("ask", allocKey),
zap.Stringer("pendingDelta", deltaPendingResource))
return toRelease
}