pkg/resmgr/task/rmtask.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package task

import (
"strings"
"sync"
"time"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
"github.com/uber/peloton/.gen/peloton/private/resmgr"
"github.com/uber/peloton/.gen/peloton/private/resmgrsvc"
"github.com/uber/peloton/pkg/common/eventstream"
state "github.com/uber/peloton/pkg/common/statemachine"
"github.com/uber/peloton/pkg/resmgr/respool"
"github.com/uber/peloton/pkg/resmgr/scalar"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/uber-go/tally"
)
var (
errTaskNotPresent = errors.New("task is not present in the tracker")
errUnplacedTaskInWrongState = errors.New("unplaced task should be in state placing")
errTaskNotInCorrectState = errors.New("task is not present in correct state")
errTaskNotTransitioned = errors.New("task is not transitioned to state")
)
var (
reasonPlacementFailed = "Reached placement failure backoff threshold"
reasonPlacementRetry = "Previous placement failed"
)
// RunTimeStats is the container for run time stats of the resmgr task
type RunTimeStats struct {
StartTime time.Time
}
// RMTaskState represents the state of the rm task
type RMTaskState struct {
// The state the task is in
State task.TaskState
// The reason for being in the state
Reason string
// Last time the state was updated
LastUpdateTime time.Time
}
// RMTask is the wrapper around resmgr.task for state machine
type RMTask struct {
mu sync.Mutex // Mutex for synchronization
task *resmgr.Task // resmgr task
stateMachine state.StateMachine // state machine for the task
respool respool.ResPool // ResPool to which this task belongs
statusUpdateHandler *eventstream.Handler // Event handler for updates
config *Config // resmgr config object
policy Policy // placement retry backoff policy
runTimeStats *RunTimeStats // run time stats for resmgr task
// observes the state transitions of the rm task
transitionObserver TransitionObserver
}
// CreateRMTask creates the RM task from resmgr.task
func CreateRMTask(
scope tally.Scope,
t *resmgr.Task,
handler *eventstream.Handler,
respool respool.ResPool,
taskConfig *Config) (*RMTask, error) {
r := &RMTask{
task: t,
statusUpdateHandler: handler,
respool: respool,
config: taskConfig,
runTimeStats: &RunTimeStats{
StartTime: time.Time{},
},
transitionObserver: NewTransitionObserver(
taskConfig.EnableSLATracking,
scope,
respool.GetPath(),
),
}
err := r.initStateMachine()
if err != nil {
return nil, err
}
// Placement timeout should be equal to placing timeout by default
r.Task().PlacementTimeoutSeconds = taskConfig.PlacingTimeout.Seconds()
// Checking if placement backoff is enabled
if !taskConfig.EnablePlacementBackoff {
return r, nil
}
// Create the backoff policy specified in taskConfig; it will be
// used for further backoff calculations.
r.policy, err = GetFactory().CreateBackOffPolicy(taskConfig)
if err != nil {
return nil, err
}
return r, nil
}
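// A minimal usage sketch (illustrative only; the tracker, the resmgr.Task t,
// handler, respool and taskConfig values are assumed to be set up elsewhere):
// create the RMTask and drive it through admission via its state machine.
//
//   rmTask, err := CreateRMTask(tally.NoopScope, t, handler, respool, taskConfig)
//   if err != nil {
//       return err
//   }
//   // Admit the task: INITIALIZED -> PENDING -> READY.
//   if err := rmTask.TransitTo(task.TaskState_PENDING.String()); err != nil {
//       return err
//   }
//   if err := rmTask.TransitTo(task.TaskState_READY.String()); err != nil {
//       return err
//   }
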
// initStateMachine initializes the resource manager task state machine
func (rmTask *RMTask) initStateMachine() error {
stateMachine, err :=
state.NewBuilder().
WithName(rmTask.Task().GetTaskId().GetValue()).
WithCurrentState(state.State(task.TaskState_INITIALIZED.String())).
WithTransitionCallback(rmTask.transitionCallBack).
AddRule(
&state.Rule{
From: state.State(task.TaskState_INITIALIZED.String()),
To: []state.State{
state.State(task.TaskState_PENDING.String()),
// This transition is needed when a running/launched task
// is placed back into resmgr as running/launching.
state.State(task.TaskState_RUNNING.String()),
state.State(task.TaskState_LAUNCHING.String()),
state.State(task.TaskState_LAUNCHED.String()),
},
Callback: nil,
}).
AddRule(
&state.Rule{
From: state.State(task.TaskState_PENDING.String()),
To: []state.State{
state.State(task.TaskState_READY.String()),
state.State(task.TaskState_KILLED.String()),
// The placement engine may return just after resmgr
// recovery while the task is still in PENDING.
state.State(task.TaskState_PLACED.String()),
},
Callback: nil,
}).
AddRule(
&state.Rule{
From: state.State(task.TaskState_READY.String()),
To: []state.State{
state.State(task.TaskState_PLACING.String()),
state.State(task.TaskState_RESERVED.String()),
// The placement engine may return just after the resmgr
// timeout while the task is still in READY.
state.State(task.TaskState_PLACED.String()),
state.State(task.TaskState_KILLED.String()),
// This transition is needed to move in-transition tasks
// that could not reach the ready queue back from READY
// to PENDING.
state.State(task.TaskState_PENDING.String()),
},
Callback: nil,
}).
AddRule(
&state.Rule{
From: state.State(task.TaskState_PLACING.String()),
To: []state.State{
state.State(task.TaskState_READY.String()),
state.State(task.TaskState_PLACED.String()),
state.State(task.TaskState_KILLED.String()),
// This transition is required when the task is
// preempted while it is being placed by the placement
// engine. If preempted, it goes back to the PENDING
// state and relinquishes its resource allocation from
// the resource pool.
state.State(task.TaskState_PENDING.String()),
},
Callback: nil,
}).
AddRule(
&state.Rule{
From: state.State(task.TaskState_RESERVED.String()),
To: []state.State{
state.State(task.TaskState_PLACED.String()),
state.State(task.TaskState_KILLED.String()),
// This transition is required when the task is
// preempted while it is being placed by the placement
// engine. If preempted, it goes back to the PENDING
// state and relinquishes its resource allocation from
// the resource pool.
state.State(task.TaskState_PENDING.String()),
},
Callback: nil,
}).
AddRule(
&state.Rule{
From: state.State(task.TaskState_PLACED.String()),
To: []state.State{
state.State(task.TaskState_LAUNCHING.String()),
state.State(task.TaskState_KILLED.String()),
},
Callback: nil,
}).
AddRule(
&state.Rule{
From: state.State(task.TaskState_LAUNCHING.String()),
To: []state.State{
state.State(task.TaskState_RUNNING.String()),
state.State(task.TaskState_READY.String()),
state.State(task.TaskState_KILLED.String()),
state.State(task.TaskState_LAUNCHED.String()),
},
Callback: nil,
}).
AddRule(
&state.Rule{
From: state.State(task.TaskState_LAUNCHED.String()),
To: []state.State{
state.State(task.TaskState_RUNNING.String()),
// The launch of the task may time out in the job manager,
// which will then regenerate the mesos task id and
// enqueue the task again into the resource manager. Since
// the task has already passed admission control, it will
// be moved to the READY state.
state.State(task.TaskState_READY.String()),
state.State(task.TaskState_KILLED.String()),
state.State(task.TaskState_LAUNCHED.String()),
},
Callback: nil,
}).
AddRule(
&state.Rule{
From: state.State(task.TaskState_RUNNING.String()),
To: []state.State{
state.State(task.TaskState_SUCCEEDED.String()),
state.State(task.TaskState_LOST.String()),
state.State(task.TaskState_PREEMPTING.String()),
state.State(task.TaskState_KILLING.String()),
state.State(task.TaskState_FAILED.String()),
state.State(task.TaskState_KILLED.String()),
},
Callback: nil,
}).
AddRule(
&state.Rule{
From: state.State(task.TaskState_FAILED.String()),
To: []state.State{
state.State(task.TaskState_READY.String()),
},
Callback: nil,
}).
AddRule(
&state.Rule{
From: state.State(task.TaskState_KILLED.String()),
To: []state.State{
state.State(task.TaskState_PENDING.String()),
},
Callback: nil,
}).
AddTimeoutRule(
&state.TimeoutRule{
From: state.State(task.TaskState_PLACING.String()),
To: []state.State{
state.State(task.TaskState_READY.String()),
state.State(task.TaskState_PENDING.String()),
},
Timeout: rmTask.config.PlacingTimeout,
Callback: rmTask.timeoutCallbackFromPlacing,
PreCallback: rmTask.preTimeoutCallback,
}).
AddTimeoutRule(
&state.TimeoutRule{
From: state.State(task.TaskState_LAUNCHING.String()),
To: []state.State{
state.State(task.TaskState_READY.String()),
},
Timeout: rmTask.config.LaunchingTimeout,
Callback: rmTask.timeoutCallbackFromLaunching,
}).
AddTimeoutRule(
&state.TimeoutRule{
From: state.State(task.TaskState_RESERVED.String()),
To: []state.State{
state.State(task.TaskState_PENDING.String()),
},
Timeout: rmTask.config.ReservingTimeout,
Callback: rmTask.timeoutCallbackFromReserving,
}).
AddTimeoutRule(
// If the task is dropped by jobmgr then the rmtask should move
// back to RUNNING state (on timeout) so that it is enqueued in the
// preemption queue in the next preemption cycle.
&state.TimeoutRule{
From: state.State(task.TaskState_PREEMPTING.String()),
To: []state.State{
state.State(task.TaskState_RUNNING.String()),
},
Timeout: rmTask.config.PreemptingTimeout,
Callback: nil,
}).
Build()
if err != nil {
return err
}
rmTask.stateMachine = stateMachine
return nil
}
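// Taken together, the rules above encode (roughly) the following happy-path
// flow, with KILLED reachable from most non-terminal states and timeout
// rules guarding PLACING, LAUNCHING, RESERVED and PREEMPTING:
//
//   INITIALIZED -> PENDING -> READY -> PLACING -> PLACED ->
//   LAUNCHING -> LAUNCHED -> RUNNING -> SUCCEEDED/FAILED/LOST
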
// TransitFromTo transits a task from a source to a target state.
// If the from state doesn't match the current state, an error is returned.
func (rmTask *RMTask) TransitFromTo(
stateFrom string,
stateTo string,
options ...state.Option) error {
rmTask.mu.Lock()
defer rmTask.mu.Unlock()
if rmTask.getCurrentState().State.String() != stateFrom {
return errTaskNotInCorrectState
}
if err := rmTask.TransitTo(
stateTo,
options...); err != nil {
return errTaskNotTransitioned
}
return nil
}
// TransitTo transitions to the target state
func (rmTask *RMTask) TransitTo(stateTo string, options ...state.Option) error {
fromState := rmTask.getCurrentState().State
err := rmTask.stateMachine.TransitTo(state.State(stateTo), options...)
if err != nil {
if err == state.ErrNoOpTransition {
// the task is already in the desired state
return nil
}
return errors.Wrap(err, "failed to transition rmtask")
}
GetTracker().UpdateMetrics(
fromState,
task.TaskState(task.TaskState_value[stateTo]),
scalar.ConvertToResmgrResource(rmTask.task.GetResource()),
)
return nil
}
// Terminate terminates the rm task's state machine.
func (rmTask *RMTask) Terminate() {
rmTask.mu.Lock()
defer rmTask.mu.Unlock()
rmTask.stateMachine.Terminate()
}
// Task returns the task of the RMTask.
func (rmTask *RMTask) Task() *resmgr.Task {
return rmTask.task
}
// GetCurrentState returns the current state
func (rmTask *RMTask) GetCurrentState() RMTaskState {
rmTask.mu.Lock()
defer rmTask.mu.Unlock()
return rmTask.getCurrentState()
}
func (rmTask *RMTask) getCurrentState() RMTaskState {
return RMTaskState{
State: task.TaskState(
task.TaskState_value[string(
rmTask.stateMachine.GetCurrentState())]),
Reason: rmTask.stateMachine.GetReason(),
LastUpdateTime: rmTask.stateMachine.GetLastUpdateTime(),
}
}
// Respool returns the respool of the RMTask.
func (rmTask *RMTask) Respool() respool.ResPool {
return rmTask.respool
}
// RunTimeStats returns the runtime stats of the RMTask
func (rmTask *RMTask) RunTimeStats() *RunTimeStats {
return rmTask.runTimeStats
}
// UpdateStartTime updates the start time of the RMTask
func (rmTask *RMTask) UpdateStartTime(startTime time.Time) {
rmTask.runTimeStats.StartTime = startTime
}
// AddBackoff adds a backoff to the RMTask based on the backoff policy
func (rmTask *RMTask) AddBackoff() error {
rmTask.mu.Lock()
defer rmTask.mu.Unlock()
// If the policy is nil, backoff is disabled; return an error.
if rmTask.policy == nil {
return errors.Errorf("backoff policy is disabled %s", rmTask.Task().Id.GetValue())
}
// Add the placement timeout based on the backoff policy
rmTask.Task().PlacementTimeoutSeconds = rmTask.config.PlacingTimeout.Seconds() +
rmTask.policy.GetNextBackoffDuration(rmTask.Task(), rmTask.config)
// Update the placement attempt/retry count based on the backoff policy
if rmTask.Task().PlacementAttemptCount < rmTask.config.PlacementAttemptsPerCycle {
rmTask.Task().PlacementAttemptCount++
} else {
rmTask.Task().PlacementAttemptCount = 1
rmTask.Task().PlacementRetryCount++
}
// If there is no Timeout rule for PLACING state we should error out
rule, ok := rmTask.stateMachine.GetTimeOutRules()[state.State(task.TaskState_PLACING.String())]
if !ok {
return errors.Errorf("could not add backoff to task %s", rmTask.Task().Id.GetValue())
}
// Update the timeout rule so that the next timer starts with the new timeout value.
rule.Timeout = time.Duration(rmTask.Task().PlacementTimeoutSeconds) * time.Second
log.WithFields(log.Fields{
"task_id": rmTask.Task().Id.Value,
"retry_count": rmTask.Task().PlacementRetryCount,
"attempt_count": rmTask.Task().PlacementAttemptCount,
"placement_timeout": rmTask.Task().PlacementTimeoutSeconds,
}).Info("Adding backoff to task")
return nil
}
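// For illustration, with hypothetical config values: if PlacingTimeout is
// 10m and the policy's next backoff is 5m (300s), PlacementTimeoutSeconds
// becomes 600 + 300 = 900, and the PLACING timeout rule will fire after 15
// minutes on the next placement attempt.
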
// RequeueUnPlaced requeues a task which couldn't be placed.
func (rmTask *RMTask) RequeueUnPlaced(reason string) error {
rmTask.mu.Lock()
defer rmTask.mu.Unlock()
cState := rmTask.getCurrentState().State
// If the task is in READY/PENDING state we don't need to do anything.
// This can happen if the state machine recovered from PLACING state
if cState == task.TaskState_READY || cState == task.TaskState_PENDING {
return nil
}
// If the task is not in PLACING state, return an error
if cState != task.TaskState_PLACING {
return errUnplacedTaskInWrongState
}
// If the task is in PLACING state, determine which state it
// will transition to based on its retry attempts
if rmTask.hasFinishedPlacementCycle() {
// If this task has failed placement enough times,
// put it back on the PENDING queue.
return rmTask.requeueToPendingQueue(reason)
}
// requeue to ready queue
return rmTask.requeueToReadyQueue(reason)
}
// requeueToReadyQueue requeues a placing task to the ready queue.
// NB: Acquire lock on rm task before calling
func (rmTask *RMTask) requeueToReadyQueue(reason string) error {
// move from PLACING to READY with the reason
if err := rmTask.TransitTo(task.TaskState_READY.String(),
state.WithReason(strings.Join(
[]string{
reasonPlacementRetry, reason,
}, ":"))); err != nil {
return err
}
// Enqueue back to ready Queue
if err := rmTask.pushTaskForPlacementAgain(); err != nil {
return err
}
log.WithFields(log.Fields{
"task_id": rmTask.Task().Id.Value,
"from_state": task.TaskState_PLACING.String(),
"to_state": task.TaskState_READY.String(),
}).Info("Task moved back to ready queue")
return nil
}
// requeueToPendingQueue requeues a placing task to the pending queue.
// NB: Acquire lock on rm task before calling
func (rmTask *RMTask) requeueToPendingQueue(reason string) error {
// Transitioning task state to PENDING with the reason
if err := rmTask.TransitTo(
task.TaskState_PENDING.String(),
state.WithReason(strings.Join(
[]string{
reasonPlacementFailed, reason,
}, ":"))); err != nil {
return err
}
// Pushing task to PENDING queue
if err := rmTask.pushTaskForReadmission(); err != nil {
return err
}
log.WithFields(log.Fields{
"task_id": rmTask.Task().Id.Value,
"from_state": task.TaskState_PLACING.String(),
"to_state": task.TaskState_PENDING.String(),
}).Info("Task is pushed back to pending queue from placement engine requeue")
return nil
}
// hasFinishedPlacementCycle returns true if one placement cycle is
// completed, otherwise false.
// NB: Acquire lock before calling
func (rmTask *RMTask) hasFinishedPlacementCycle() bool {
// Checking if placement backoff is enabled
if !rmTask.config.EnablePlacementBackoff {
return false
}
return rmTask.policy.IsCycleCompleted(rmTask.Task(), rmTask.config)
}
// hasFinishedAllPlacementCycles returns true if all the placement cycles
// are completed, otherwise false. The task is ready for host reservation
// when all the placement cycles have been exhausted.
// NB: Acquire lock before calling
func (rmTask *RMTask) hasFinishedAllPlacementCycles() bool {
// Checking if placement backoff is enabled
if !rmTask.config.EnablePlacementBackoff {
return false
}
return rmTask.policy.allCyclesCompleted(rmTask.Task(), rmTask.config)
}
// pushTaskForReadmission pushes the pending task for readmission to pending
// queue
// NB: Acquire lock on rm task before calling
func (rmTask *RMTask) pushTaskForReadmission() error {
var tasks []*resmgr.Task
gang := &resmgrsvc.Gang{
Tasks: append(tasks, rmTask.task),
}
// push to pending queue and add demand
if err := rmTask.Respool().EnqueueGang(gang); err != nil {
return errors.Wrapf(err, "failed to enqueue gang")
}
// remove allocation
if err := rmTask.Respool().SubtractFromAllocation(
scalar.GetGangAllocation(gang)); err != nil {
return errors.Wrapf(err, "failed to remove allocation from respool")
}
return nil
}
// pushTaskForPlacementAgain pushes the task to ready queue as the
// placement cycle is not completed for this task or task is ready
// for host reservation
// NB: Acquire lock on rm task before calling
func (rmTask *RMTask) pushTaskForPlacementAgain() error {
var tasks []*resmgr.Task
gang := &resmgrsvc.Gang{
Tasks: append(tasks, rmTask.task),
}
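// Reset the hostname before re-enqueueing the task for placement.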
rmTask.task.Hostname = ""
err := GetScheduler().EnqueueGang(gang)
if err != nil {
return errors.Wrapf(err, "failed to enqueue gang")
}
return nil
}
// transitionCallBack is the global callback for the resource manager task
func (rmTask *RMTask) transitionCallBack(t *state.Transition) error {
if rmTask == nil {
return errTaskNotPresent
}
tState := task.TaskState(task.TaskState_value[string(t.To)])
rmTask.transitionObserver.Observe(
rmTask.Task().GetTaskId().GetValue(),
tState)
// we only care about running state here
if tState == task.TaskState_RUNNING {
// update the start time
rmTask.UpdateStartTime(time.Now().UTC())
}
return nil
}
// timeoutCallbackFromPlacing is the timeout callback for the resource
// manager task, which moves the task from PLACING to READY or PENDING
// state on timeout.
func (rmTask *RMTask) timeoutCallbackFromPlacing(t *state.Transition) error {
if rmTask == nil {
return errTaskNotPresent
}
if t.To == state.State(task.TaskState_PENDING.String()) {
log.WithFields(log.Fields{
"task_id": rmTask.Task().GetTaskId().Value,
"from_state": t.From,
"to_state": t.To,
}).Info("Task is pushed back to pending queue")
// the task timed out into PENDING, so push it back for readmission
err := rmTask.pushTaskForReadmission()
if err != nil {
return err
}
return nil
}
// the task is ready for host reservation
if rmTask.task.ReadyForHostReservation {
log.WithFields(log.Fields{
"task_id": rmTask.Task().GetTaskId().GetValue(),
"from_state": t.From,
"to_state": t.To,
}).Info("Task is pushed back to ready queue for host reservation")
// Push it to the ready queue so the placement engine can reserve a host for it
err := rmTask.pushTaskForPlacementAgain()
if err != nil {
return err
}
return nil
}
log.WithFields(log.Fields{
"task_id": rmTask.Task().GetTaskId().GetValue(),
"from_state": t.From,
"to_state": t.To,
}).Info("Task is pushed back to ready queue")
err := rmTask.pushTaskForPlacementAgain()
if err != nil {
return err
}
log.WithField("task_id", rmTask.Task().GetTaskId().GetValue()).
Debug("Enqueue again due to timeout")
return nil
}
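// resetHostReservation resets the placement retry/attempt counts and
// marks the task as not ready for host reservation.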
func (rmTask *RMTask) resetHostReservation() {
rmTask.task.PlacementRetryCount = 0
rmTask.task.PlacementAttemptCount = 0
rmTask.task.ReadyForHostReservation = false
}
// timeoutCallbackFromReserving is the timeout callback for the resource
// manager task, which moves the task from RESERVED to PENDING state on
// timeout.
func (rmTask *RMTask) timeoutCallbackFromReserving(t *state.Transition) error {
if rmTask == nil {
return errTaskNotPresent
}
rmTask.resetHostReservation()
err := rmTask.pushTaskForReadmission()
if err != nil {
return err
}
log.WithFields(log.Fields{
"task_id": rmTask.Task().GetTaskId().Value,
"from_state": t.From,
"to_state": t.To,
}).Info("Task is pushed back to pending queue")
return nil
}
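// timeoutCallbackFromLaunching is the timeout callback for the resource
// manager task, which moves the task from LAUNCHING back to READY state
// by re-enqueueing it for placement.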
func (rmTask *RMTask) timeoutCallbackFromLaunching(t *state.Transition) error {
if rmTask == nil {
return errTaskNotPresent
}
err := rmTask.pushTaskForPlacementAgain()
if err != nil {
return err
}
log.WithField("task_id", rmTask.Task().GetTaskId().GetValue()).
Debug("Enqueue again due to timeout")
return nil
}
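// preTimeoutCallback picks the target state for the PLACING timeout:
// READY (for host reservation) when host reservation is enabled and all
// placement cycles are exhausted, PENDING when the current placement cycle
// is complete, and READY otherwise.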
func (rmTask *RMTask) preTimeoutCallback(t *state.Transition) error {
if rmTask == nil {
return errTaskNotPresent
}
if rmTask.config.EnableHostReservation && rmTask.hasFinishedAllPlacementCycles() {
rmTask.task.ReadyForHostReservation = true
t.To = state.State(task.TaskState_READY.String())
return nil
}
if rmTask.hasFinishedPlacementCycle() {
t.To = state.State(task.TaskState_PENDING.String())
return nil
}
t.To = state.State(task.TaskState_READY.String())
return nil
}
// TODO: Commented out for now so it is not published yet, until we have a
// solution for the event race: T936171
// updateStatus creates and send the task event to event stream
//func (rmTask *RMTask) updateStatus(status string) {
//
//
//
//
// //t := time.Now()
// //// Create Peloton task event
// //taskEvent := &task.TaskEvent{
// // Source: task.TaskEvent_SOURCE_RESMGR,
// // State: task.TaskState(task.TaskState_value[status]),
// // TaskId: rmTask.task.Id,
// // Timestamp: t.Format(time.RFC3339),
// //}
// //
// //event := &pb_eventstream.Event{
// // PelotonTaskEvent: taskEvent,
// // Type: pb_eventstream.Event_PELOTON_TASK_EVENT,
// //}
// //
// //err := rmTask.statusUpdateHandler.AddEvent(event)
// //if err != nil {
// // log.WithError(err).WithField("Event", event).
// // Error("Cannot add status update")
// //}
//}