in pkg/scheduler/objects/application_state.go [143:266]
func callbacks() fsm.Callbacks {
return fsm.Callbacks{
"enter_state": func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
log.Log(log.SchedFSM).Info("Application state transition",
zap.String("appID", app.ApplicationID),
zap.String("source", event.Src),
zap.String("destination", event.Dst),
zap.String("event", event.Event))
eventInfo := ""
if len(event.Args) == 2 {
eventInfo = event.Args[1].(string) //nolint:errcheck
app.OnStateChange(event, eventInfo)
} else {
app.OnStateChange(event, "")
}
eventDetails, ok := stateEvents[event.Dst]
if !ok {
log.Log(log.SchedFSM).Error("event details not found",
zap.String("state", event.Dst))
return
}
if app.sendStateChangeEvents {
app.appEvents.SendStateChangeEvent(app.ApplicationID, eventDetails, eventInfo)
}
},
"leave_state": func(_ context.Context, event *fsm.Event) {
event.Args[0].(*Application).clearStateTimer() //nolint:errcheck
},
fmt.Sprintf("leave_%s", New.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
// only updated queue metrics because scheduler metrics are increased only for submission count
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsNew()
},
fmt.Sprintf("enter_%s", Accepted.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted()
metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
},
fmt.Sprintf("leave_%s", Accepted.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
// only updated queue metrics because scheduler metrics are increased only for submission count
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsAccepted()
},
fmt.Sprintf("enter_%s", Rejected.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRejected()
metrics.GetSchedulerMetrics().IncTotalApplicationsRejected()
app.setStateTimer(terminatedTimeout, app.stateMachine.Current(), ExpireApplication)
app.finishedTime = time.Now()
app.cleanupTrackedResource()
// No rejected message when use app.HandleApplicationEvent(RejectApplication)
if len(event.Args) == 2 {
app.rejectedMessage = event.Args[1].(string) //nolint:errcheck
}
},
fmt.Sprintf("enter_%s", Running.String()): func(_ context.Context, event *fsm.Event) {
if event.Src != Running.String() {
app := event.Args[0].(*Application) //nolint:errcheck
app.startTime = time.Now()
app.queue.incRunningApps(app.ApplicationID)
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
}
},
fmt.Sprintf("leave_%s", Running.String()): func(_ context.Context, event *fsm.Event) {
if event.Dst != Running.String() {
app := event.Args[0].(*Application) //nolint:errcheck
app.queue.decRunningApps()
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
}
},
fmt.Sprintf("enter_%s", Resuming.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsResuming()
metrics.GetSchedulerMetrics().IncTotalApplicationsResuming()
},
fmt.Sprintf("leave_%s", Resuming.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsResuming()
metrics.GetSchedulerMetrics().DecTotalApplicationsResuming()
},
fmt.Sprintf("enter_%s", Failing.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailing()
metrics.GetSchedulerMetrics().IncTotalApplicationsFailing()
},
fmt.Sprintf("leave_%s", Failing.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsFailing()
metrics.GetSchedulerMetrics().DecTotalApplicationsFailing()
},
fmt.Sprintf("enter_%s", Completing.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.setStateTimer(completingTimeout, app.stateMachine.Current(), CompleteApplication)
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsCompleting()
metrics.GetSchedulerMetrics().IncTotalApplicationsCompleting()
},
fmt.Sprintf("leave_%s", Completing.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsCompleting()
metrics.GetSchedulerMetrics().DecTotalApplicationsCompleting()
},
fmt.Sprintf("enter_%s", Completed.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted()
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsCompleted()
app.setStateTimer(terminatedTimeout, app.stateMachine.Current(), ExpireApplication)
app.executeTerminatedCallback()
app.clearPlaceholderTimer()
app.cleanupAsks()
},
fmt.Sprintf("enter_%s", Failed.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetSchedulerMetrics().IncTotalApplicationsFailed()
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed()
app.setStateTimer(terminatedTimeout, app.stateMachine.Current(), ExpireApplication)
app.executeTerminatedCallback()
app.cleanupAsks()
},
}
}