in pkg/scheduler/objects/application.go [386:447]
// timeoutPlaceholderProcessing handles the expiry of the placeholder timer.
//
// The application lock is taken for the whole call. Two cases are handled:
//
//   - Case 1: the application is running or completing and still has placeholder
//     resources allocated. Every placeholder allocation that is not yet marked as
//     released gets marked and is reported to the RM for release with a TIMEOUT
//     termination type. Allocations already marked as released are only counted.
//   - Case 2: any other situation. The application is sent a ResumeApplication
//     event, or a FailApplication event when the gang scheduling style is Hard.
//     All allocations and all not yet allocated requests are marked as released,
//     the asks are removed and the RM is notified for both sets.
//
// The placeholder timer is cleared in both cases before returning.
func (sa *Application) timeoutPlaceholderProcessing() {
	sa.Lock()
	defer sa.Unlock()
	if (sa.IsRunning() || sa.IsCompleting()) && !resources.IsZero(sa.allocatedPlaceholder) {
		// Case 1: if all app's placeholders are allocated, only part of them gets replaced, just delete the remaining placeholders
		var toRelease []*Allocation
		replacing := 0
		for _, alloc := range sa.getPlaceholderAllocations() {
			// skip over the allocations that are already marked for release, they will be replaced soon
			if alloc.IsReleased() {
				replacing++
				continue
			}
			alloc.SetReleased(true)
			toRelease = append(toRelease, alloc)
		}
		log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated placeholders",
			zap.String("AppID", sa.ApplicationID),
			zap.Int("placeholders being replaced", replacing),
			zap.Int("releasing placeholders", len(toRelease)))
		// trigger the release of the placeholders: accounting updates when the release is done
		sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
	} else {
		// Case 2: in every other case progress the application, and notify the context about the expired placeholders
		// change the status of the app based on gang style: soft resume normal allocations, hard fail the app
		event := ResumeApplication
		if sa.gangSchedulingStyle == Hard {
			event = FailApplication
		}
		// a failed state change does not stop the cleanup: it is logged and processing continues
		if err := sa.HandleApplicationEventWithInfo(event, "ResourceReservationTimeout"); err != nil {
			log.Log(log.SchedApplication).Debug("Application state change failed when placeholder timed out",
				zap.String("AppID", sa.ApplicationID),
				zap.String("currentState", sa.CurrentState()),
				zap.Error(err))
		}
		// all allocations are placeholders release them all
		var toRelease, pendingRelease []*Allocation
		for _, alloc := range sa.allocations {
			alloc.SetReleased(true)
			toRelease = append(toRelease, alloc)
		}
		// get all open requests and remove them all filter out already allocated as they are already released
		for _, alloc := range sa.requests {
			if alloc.IsAllocated() {
				continue
			}
			alloc.SetReleased(true)
			pendingRelease = append(pendingRelease, alloc)
			// Only count the timeout when tracking data exists for the task group: a lookup
			// of an unknown task group returns a nil entry and must not be dereferenced.
			if data := sa.placeholderData[alloc.taskGroupName]; data != nil {
				data.TimedOut++
			}
		}
		log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated and pending placeholders",
			zap.String("AppID", sa.ApplicationID),
			zap.Int("releasing placeholders", len(toRelease)),
			zap.Int("pending placeholders", len(pendingRelease)),
			zap.String("gang scheduling style", sa.gangSchedulingStyle))
		// remove the asks after the pending list has been collected from sa.requests
		sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
		// trigger the release of the allocated placeholders: accounting updates when the release is done
		sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
		// trigger the release of the pending placeholders: accounting has been done
		sa.notifyRMAllocationReleased(pendingRelease, si.TerminationType_TIMEOUT, "releasing pending placeholders on placeholder timeout")
	}
	sa.clearPlaceholderTimer()
}