func()

in pkg/resmgr/task/rmtask.go [127:323]


// initStateMachine assembles the state machine that governs this task's
// lifecycle and stores it on rmTask.stateMachine.
//
// The machine is named after the task id, starts in INITIALIZED, and reports
// every transition through rmTask.transitionCallBack. The regular rules list
// the states reachable from each state; the timeout rules additionally
// describe where a task may go when it lingers in PLACING, LAUNCHING, RESERVED
// or PREEMPTING for longer than the corresponding configured timeout.
//
// It returns the error produced by the builder, if any; in that case
// rmTask.stateMachine is left unmodified.
func (rmTask *RMTask) initStateMachine() error {
	// Short local names for every task state referenced by the rules below.
	var (
		initialized = state.State(task.TaskState_INITIALIZED.String())
		pending     = state.State(task.TaskState_PENDING.String())
		ready       = state.State(task.TaskState_READY.String())
		placing     = state.State(task.TaskState_PLACING.String())
		reserved    = state.State(task.TaskState_RESERVED.String())
		placed      = state.State(task.TaskState_PLACED.String())
		launching   = state.State(task.TaskState_LAUNCHING.String())
		launched    = state.State(task.TaskState_LAUNCHED.String())
		running     = state.State(task.TaskState_RUNNING.String())
		succeeded   = state.State(task.TaskState_SUCCEEDED.String())
		lost        = state.State(task.TaskState_LOST.String())
		preempting  = state.State(task.TaskState_PREEMPTING.String())
		killing     = state.State(task.TaskState_KILLING.String())
		failed      = state.State(task.TaskState_FAILED.String())
		killed      = state.State(task.TaskState_KILLED.String())
	)

	sm, err := state.NewBuilder().
		WithName(rmTask.Task().GetTaskId().GetValue()).
		WithCurrentState(initialized).
		WithTransitionCallback(rmTask.transitionCallBack).
		AddRule(&state.Rule{
			From: initialized,
			// Besides PENDING, the RUNNING/LAUNCHING/LAUNCHED targets are
			// needed so that an already running/launched task can be placed
			// back into resmgr as running/launching.
			To: []state.State{pending, running, launching, launched},
		}).
		AddRule(&state.Rule{
			From: pending,
			// PLACED is allowed because the placement engine may return
			// right after a resmgr recovery, while the task is still
			// PENDING.
			To: []state.State{ready, killed, placed},
		}).
		AddRule(&state.Rule{
			From: ready,
			// PLACED is allowed because the placement engine may return
			// right after a resmgr timeout, while the task is still READY.
			// PENDING is allowed so that in-transition tasks which could not
			// reach the ready queue can be put back to PENDING.
			To: []state.State{placing, reserved, placed, killed, pending},
		}).
		AddRule(&state.Rule{
			From: placing,
			// PENDING is required for a task that is preempted while it is
			// being placed by the placement engine: it goes back to PENDING
			// and relinquishes its resource allocation from the resource
			// pool.
			To: []state.State{ready, placed, killed, pending},
		}).
		AddRule(&state.Rule{
			From: reserved,
			// PENDING is required for a task that is preempted while it is
			// being placed by the placement engine: it goes back to PENDING
			// and relinquishes its resource allocation from the resource
			// pool.
			To: []state.State{placed, killed, pending},
		}).
		AddRule(&state.Rule{
			From: placed,
			To:   []state.State{launching, killed},
		}).
		AddRule(&state.Rule{
			From: launching,
			To:   []state.State{running, ready, killed, launched},
		}).
		AddRule(&state.Rule{
			From: launched,
			// READY is allowed because the launch may time out in job
			// manager, which then regenerates the mesos task id and enqueues
			// the task into resource manager again. Since the task has
			// already passed admission control, it is moved to READY.
			To: []state.State{running, ready, killed, launched},
		}).
		AddRule(&state.Rule{
			From: running,
			To: []state.State{
				succeeded,
				lost,
				preempting,
				killing,
				failed,
				killed,
			},
		}).
		AddRule(&state.Rule{
			From: failed,
			To:   []state.State{ready},
		}).
		AddRule(&state.Rule{
			From: killed,
			To:   []state.State{pending},
		}).
		AddTimeoutRule(&state.TimeoutRule{
			From:        placing,
			To:          []state.State{ready, pending},
			Timeout:     rmTask.config.PlacingTimeout,
			Callback:    rmTask.timeoutCallbackFromPlacing,
			PreCallback: rmTask.preTimeoutCallback,
		}).
		AddTimeoutRule(&state.TimeoutRule{
			From:     launching,
			To:       []state.State{ready},
			Timeout:  rmTask.config.LaunchingTimeout,
			Callback: rmTask.timeoutCallbackFromLaunching,
		}).
		AddTimeoutRule(&state.TimeoutRule{
			From:     reserved,
			To:       []state.State{pending},
			Timeout:  rmTask.config.ReservingTimeout,
			Callback: rmTask.timeoutCallbackFromReserving,
		}).
		// If the task is dropped by jobmgr, the rmtask should move back to
		// RUNNING (on timeout) so that it is enqueued in the preemption
		// queue in the next preemption cycle. No callback is attached.
		AddTimeoutRule(&state.TimeoutRule{
			From:    preempting,
			To:      []state.State{running},
			Timeout: rmTask.config.PreemptingTimeout,
		}).
		Build()
	if err != nil {
		return err
	}

	rmTask.stateMachine = sm
	return nil
}