in pkg/resmgr/task/rmtask.go [127:323]
// initStateMachine builds the state machine for this task and stores it in
// rmTask.stateMachine.
//
// The machine is named after the task id, starts in INITIALIZED, and reports
// every transition through rmTask.transitionCallBack. The regular rules below
// list the allowed transitions; the timeout rules list the states that are
// left after the corresponding timeout taken from rmTask.config.
//
// It returns the error reported by the builder's Build call; in that case
// rmTask.stateMachine is not modified.
func (rmTask *RMTask) initStateMachine() error {
	// Resolve the state names once instead of repeating the enum-to-string
	// conversion in every rule.
	var (
		initialized = state.State(task.TaskState_INITIALIZED.String())
		pending     = state.State(task.TaskState_PENDING.String())
		ready       = state.State(task.TaskState_READY.String())
		placing     = state.State(task.TaskState_PLACING.String())
		reserved    = state.State(task.TaskState_RESERVED.String())
		placed      = state.State(task.TaskState_PLACED.String())
		launching   = state.State(task.TaskState_LAUNCHING.String())
		launched    = state.State(task.TaskState_LAUNCHED.String())
		running     = state.State(task.TaskState_RUNNING.String())
		preempting  = state.State(task.TaskState_PREEMPTING.String())
		killing     = state.State(task.TaskState_KILLING.String())
		succeeded   = state.State(task.TaskState_SUCCEEDED.String())
		lost        = state.State(task.TaskState_LOST.String())
		failed      = state.State(task.TaskState_FAILED.String())
		killed      = state.State(task.TaskState_KILLED.String())
	)

	machine, err := state.NewBuilder().
		WithName(rmTask.Task().GetTaskId().GetValue()).
		WithCurrentState(initialized).
		WithTransitionCallback(rmTask.transitionCallBack).
		AddRule(&state.Rule{
			From: initialized,
			To: []state.State{
				pending,
				// RUNNING/LAUNCHING/LAUNCHED are reachable directly so
				// that a task which is already running or launched can
				// be placed back into resmgr in that same state.
				running,
				launching,
				launched,
			},
		}).
		AddRule(&state.Rule{
			From: pending,
			To: []state.State{
				ready,
				killed,
				// The placement engine may return right after resmgr
				// recovery while the task is still PENDING.
				placed,
			},
		}).
		AddRule(&state.Rule{
			From: ready,
			To: []state.State{
				placing,
				reserved,
				// The placement engine may return right after a resmgr
				// timeout while the task is still READY.
				placed,
				killed,
				// Lets an in-transition task that could not reach the
				// ready queue be put back to PENDING.
				pending,
			},
		}).
		AddRule(&state.Rule{
			From: placing,
			To: []state.State{
				ready,
				placed,
				killed,
				// Needed when the task is preempted while it is being
				// placed by the placement engine: it goes back to
				// PENDING and relinquishes its resource allocation
				// from the resource pool.
				pending,
			},
		}).
		AddRule(&state.Rule{
			From: reserved,
			To: []state.State{
				placed,
				killed,
				// Needed when the task is preempted while it is being
				// placed by the placement engine: it goes back to
				// PENDING and relinquishes its resource allocation
				// from the resource pool.
				pending,
			},
		}).
		AddRule(&state.Rule{
			From: placed,
			To:   []state.State{launching, killed},
		}).
		AddRule(&state.Rule{
			From: launching,
			To:   []state.State{running, ready, killed, launched},
		}).
		AddRule(&state.Rule{
			From: launched,
			To: []state.State{
				running,
				// The launch may time out in job manager, which then
				// regenerates the mesos task id and enqueues the task
				// into resource manager again. The task has already
				// passed admission control, so it moves to READY.
				ready,
				killed,
				launched,
			},
		}).
		AddRule(&state.Rule{
			From: running,
			To: []state.State{
				succeeded,
				lost,
				preempting,
				killing,
				failed,
				killed,
			},
		}).
		AddRule(&state.Rule{
			From: failed,
			To:   []state.State{ready},
		}).
		AddRule(&state.Rule{
			From: killed,
			To:   []state.State{pending},
		}).
		AddTimeoutRule(&state.TimeoutRule{
			From:        placing,
			To:          []state.State{ready, pending},
			Timeout:     rmTask.config.PlacingTimeout,
			Callback:    rmTask.timeoutCallbackFromPlacing,
			PreCallback: rmTask.preTimeoutCallback,
		}).
		AddTimeoutRule(&state.TimeoutRule{
			From:     launching,
			To:       []state.State{ready},
			Timeout:  rmTask.config.LaunchingTimeout,
			Callback: rmTask.timeoutCallbackFromLaunching,
		}).
		AddTimeoutRule(&state.TimeoutRule{
			From:     reserved,
			To:       []state.State{pending},
			Timeout:  rmTask.config.ReservingTimeout,
			Callback: rmTask.timeoutCallbackFromReserving,
		}).
		// If the task is dropped by jobmgr, the rmtask should move back to
		// RUNNING on timeout so that it is enqueued in the preemption queue
		// again in the next preemption cycle.
		AddTimeoutRule(&state.TimeoutRule{
			From:    preempting,
			To:      []state.State{running},
			Timeout: rmTask.config.PreemptingTimeout,
		}).
		Build()
	if err != nil {
		return err
	}

	rmTask.stateMachine = machine
	return nil
}