func callbacks()

in pkg/scheduler/objects/application_state.go [143:266]


// callbacks assembles the callback table for the application state machine.
//
// Two generic entries ("enter_state" and "leave_state") handle logging, state
// change notification and state timer cleanup. The remaining entries are keyed
// per state ("enter_<state>" / "leave_<state>") and maintain the queue and
// scheduler metrics, the state timers and the cleanup belonging to that state.
//
// Every handler expects the *Application as event.Args[0]; an optional second
// argument, when present, must be a string carrying extra detail for the event.
func callbacks() fsm.Callbacks {
	// appOf pulls the application out of the event arguments. The assertion is
	// unchecked on purpose: a wrong argument type is a programming error.
	appOf := func(event *fsm.Event) *Application {
		return event.Args[0].(*Application) //nolint:errcheck
	}
	// enterKey and leaveKey build the callback names for a specific state.
	enterKey := func(state string) string { return fmt.Sprintf("enter_%s", state) }
	leaveKey := func(state string) string { return fmt.Sprintf("leave_%s", state) }

	return fsm.Callbacks{
		"enter_state": func(_ context.Context, event *fsm.Event) {
			target := appOf(event)
			log.Log(log.SchedFSM).Info("Application state transition",
				zap.String("appID", target.ApplicationID),
				zap.String("source", event.Src),
				zap.String("destination", event.Dst),
				zap.String("event", event.Event))

			// The detail stays empty unless exactly one extra argument was passed.
			var detail string
			if len(event.Args) == 2 {
				detail = event.Args[1].(string) //nolint:errcheck
			}
			target.OnStateChange(event, detail)

			changeType, known := stateEvents[event.Dst]
			switch {
			case !known:
				log.Log(log.SchedFSM).Error("event details not found",
					zap.String("state", event.Dst))
			case target.sendStateChangeEvents:
				target.appEvents.SendStateChangeEvent(target.ApplicationID, changeType, detail)
			}
		},
		"leave_state": func(_ context.Context, event *fsm.Event) {
			appOf(event).clearStateTimer()
		},
		leaveKey(New.String()): func(_ context.Context, event *fsm.Event) {
			// Queue metrics only: the scheduler metrics are increased only for
			// the submission count and have nothing to decrement here.
			queueMetrics := metrics.GetQueueMetrics(appOf(event).queuePath)
			queueMetrics.DecQueueApplicationsNew()
		},
		enterKey(Accepted.String()): func(_ context.Context, event *fsm.Event) {
			target := appOf(event)
			metrics.GetQueueMetrics(target.queuePath).IncQueueApplicationsAccepted()
			metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
		},
		leaveKey(Accepted.String()): func(_ context.Context, event *fsm.Event) {
			// Queue metrics only: the scheduler metrics are increased only for
			// the submission count and have nothing to decrement here.
			queueMetrics := metrics.GetQueueMetrics(appOf(event).queuePath)
			queueMetrics.DecQueueApplicationsAccepted()
		},
		enterKey(Rejected.String()): func(_ context.Context, event *fsm.Event) {
			target := appOf(event)
			metrics.GetQueueMetrics(target.queuePath).IncQueueApplicationsRejected()
			metrics.GetSchedulerMetrics().IncTotalApplicationsRejected()
			target.setStateTimer(terminatedTimeout, target.stateMachine.Current(), ExpireApplication)
			target.finishedTime = time.Now()
			target.cleanupTrackedResource()
			// A rejection triggered through app.HandleApplicationEvent(RejectApplication)
			// carries no message, so the field is only set when one was passed.
			if len(event.Args) == 2 {
				target.rejectedMessage = event.Args[1].(string) //nolint:errcheck
			}
		},
		enterKey(Running.String()): func(_ context.Context, event *fsm.Event) {
			// A Running -> Running transition leaves start time and counters alone.
			if event.Src == Running.String() {
				return
			}
			target := appOf(event)
			target.startTime = time.Now()
			target.queue.incRunningApps(target.ApplicationID)
			metrics.GetQueueMetrics(target.queuePath).IncQueueApplicationsRunning()
			metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
		},
		leaveKey(Running.String()): func(_ context.Context, event *fsm.Event) {
			// A Running -> Running transition leaves the counters alone.
			if event.Dst == Running.String() {
				return
			}
			target := appOf(event)
			target.queue.decRunningApps()
			metrics.GetQueueMetrics(target.queuePath).DecQueueApplicationsRunning()
			metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
		},
		enterKey(Resuming.String()): func(_ context.Context, event *fsm.Event) {
			target := appOf(event)
			metrics.GetQueueMetrics(target.queuePath).IncQueueApplicationsResuming()
			metrics.GetSchedulerMetrics().IncTotalApplicationsResuming()
		},
		leaveKey(Resuming.String()): func(_ context.Context, event *fsm.Event) {
			target := appOf(event)
			metrics.GetQueueMetrics(target.queuePath).DecQueueApplicationsResuming()
			metrics.GetSchedulerMetrics().DecTotalApplicationsResuming()
		},
		enterKey(Failing.String()): func(_ context.Context, event *fsm.Event) {
			target := appOf(event)
			metrics.GetQueueMetrics(target.queuePath).IncQueueApplicationsFailing()
			metrics.GetSchedulerMetrics().IncTotalApplicationsFailing()
		},
		leaveKey(Failing.String()): func(_ context.Context, event *fsm.Event) {
			target := appOf(event)
			metrics.GetQueueMetrics(target.queuePath).DecQueueApplicationsFailing()
			metrics.GetSchedulerMetrics().DecTotalApplicationsFailing()
		},
		enterKey(Completing.String()): func(_ context.Context, event *fsm.Event) {
			target := appOf(event)
			// Arm the timer that fires CompleteApplication after completingTimeout.
			target.setStateTimer(completingTimeout, target.stateMachine.Current(), CompleteApplication)
			metrics.GetQueueMetrics(target.queuePath).IncQueueApplicationsCompleting()
			metrics.GetSchedulerMetrics().IncTotalApplicationsCompleting()
		},
		leaveKey(Completing.String()): func(_ context.Context, event *fsm.Event) {
			target := appOf(event)
			metrics.GetQueueMetrics(target.queuePath).DecQueueApplicationsCompleting()
			metrics.GetSchedulerMetrics().DecTotalApplicationsCompleting()
		},
		enterKey(Completed.String()): func(_ context.Context, event *fsm.Event) {
			target := appOf(event)
			metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted()
			metrics.GetQueueMetrics(target.queuePath).IncQueueApplicationsCompleted()
			// Arm the timer that fires ExpireApplication after terminatedTimeout.
			target.setStateTimer(terminatedTimeout, target.stateMachine.Current(), ExpireApplication)
			target.executeTerminatedCallback()
			target.clearPlaceholderTimer()
			target.cleanupAsks()
		},
		enterKey(Failed.String()): func(_ context.Context, event *fsm.Event) {
			target := appOf(event)
			metrics.GetSchedulerMetrics().IncTotalApplicationsFailed()
			metrics.GetQueueMetrics(target.queuePath).IncQueueApplicationsFailed()
			// Arm the timer that fires ExpireApplication after terminatedTimeout.
			target.setStateTimer(terminatedTimeout, target.stateMachine.Current(), ExpireApplication)
			target.executeTerminatedCallback()
			target.cleanupAsks()
		},
	}
}