pkg/jobmgr/goalstate/job_runtime_updater.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package goalstate
import (
"context"
"reflect"
"time"
"github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
"github.com/uber/peloton/pkg/common/goalstate"
"github.com/uber/peloton/pkg/common/taskconfig"
"github.com/uber/peloton/pkg/common/util"
"github.com/uber/peloton/pkg/jobmgr/cached"
jobmgrcommon "github.com/uber/peloton/pkg/jobmgr/common"
updateutil "github.com/uber/peloton/pkg/jobmgr/util/update"
log "github.com/sirupsen/logrus"
"go.uber.org/yarpc/yarpcerrors"
)
// transitionType describes the kind of state transition a job runtime
// update represents
type transitionType int32
const (
// When job state is unknown, transition type is also unknown
transitionTypeUnknown transitionType = 0
// When job transitions from one active state to another
transitionTypeActiveActive transitionType = 1
// When job transitions from active state to terminal state
transitionTypeActiveTerminal transitionType = 2
// When job transitions from terminal state to active state
transitionTypeTerminalActive transitionType = 3
)
// taskStatesAfterStart is the set of Peloton task states which
// indicate a task is being or has already been started.
var taskStatesAfterStart = []task.TaskState{
task.TaskState_STARTING,
task.TaskState_RUNNING,
task.TaskState_SUCCEEDED,
task.TaskState_FAILED,
task.TaskState_LOST,
task.TaskState_PREEMPTING,
task.TaskState_KILLING,
task.TaskState_KILLED,
}
// taskStatesScheduled is the set of Peloton task states which
// indicate a task has been sent to the resource manager, or has been
// placed by the resource manager, and has not yet reached a terminal state.
// It is used to determine which tasks in the DB (or cache) have not yet
// been sent to the resource manager for placement.
var taskStatesScheduled = []task.TaskState{
task.TaskState_RUNNING,
task.TaskState_PENDING,
task.TaskState_LAUNCHED,
task.TaskState_READY,
task.TaskState_PLACING,
task.TaskState_PLACED,
task.TaskState_LAUNCHING,
task.TaskState_STARTING,
task.TaskState_PREEMPTING,
task.TaskState_KILLING,
}
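// allTaskStates is the set of all Peloton task states.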
var allTaskStates = []task.TaskState{
task.TaskState_UNKNOWN,
task.TaskState_INITIALIZED,
task.TaskState_PENDING,
task.TaskState_READY,
task.TaskState_PLACING,
task.TaskState_PLACED,
task.TaskState_LAUNCHING,
task.TaskState_LAUNCHED,
task.TaskState_STARTING,
task.TaskState_RUNNING,
task.TaskState_SUCCEEDED,
task.TaskState_FAILED,
task.TaskState_LOST,
task.TaskState_PREEMPTING,
task.TaskState_KILLING,
task.TaskState_KILLED,
task.TaskState_DELETED,
}
// JobEvaluateMaxRunningInstancesSLA evaluates the maximum-running-instances
// job SLA and determines which instances to start, if any.
func JobEvaluateMaxRunningInstancesSLA(ctx context.Context, entity goalstate.Entity) error {
id := entity.GetID()
jobID := &peloton.JobID{Value: id}
goalStateDriver := entity.(*jobEntity).driver
cachedJob := goalStateDriver.jobFactory.AddJob(jobID)
cachedConfig, err := cachedJob.GetConfig(ctx)
if err != nil {
log.WithError(err).
WithField("job_id", id).
Error("Failed to get job config")
return err
}
// Save a read to DB if maxRunningInstances is 0
maxRunningInstances := cachedConfig.GetSLA().GetMaximumRunningInstances()
if maxRunningInstances == 0 {
return nil
}
jobConfig, _, err :=
goalStateDriver.jobConfigOps.GetCurrentVersion(ctx, jobID)
if err != nil {
log.WithError(err).
WithField("job_id", id).
Error("Failed to get job config in start instances")
return err
}
runtime, err := cachedJob.GetRuntime(ctx)
if err != nil {
log.WithError(err).
WithField("job_id", id).
Error("Failed to get job runtime during start instances")
goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
return err
}
if runtime.GetGoalState() == job.JobState_KILLED {
return nil
}
stateCounts := runtime.GetTaskStats()
currentScheduledInstances := uint32(0)
for _, state := range taskStatesScheduled {
currentScheduledInstances += stateCounts[state.String()]
}
if currentScheduledInstances >= maxRunningInstances {
if currentScheduledInstances > maxRunningInstances {
log.WithFields(log.Fields{
"current_scheduled_tasks": currentScheduledInstances,
"max_running_instances": maxRunningInstances,
"job_id": id,
}).Info("scheduled instances exceed max running instances")
goalStateDriver.mtx.jobMetrics.JobMaxRunningInstancesExceeding.Inc(int64(currentScheduledInstances - maxRunningInstances))
}
log.WithField("current_scheduled_tasks", currentScheduledInstances).
WithField("job_id", id).
Debug("no instances to start")
return nil
}
tasksToStart := maxRunningInstances - currentScheduledInstances
var initializedTasks []uint32
// Collect all the initialized tasks for this job from the cache
for _, taskInCache := range cachedJob.GetAllTasks() {
if taskInCache.CurrentState().State == task.TaskState_INITIALIZED {
initializedTasks = append(initializedTasks, taskInCache.ID())
}
}
log.WithFields(log.Fields{
"job_id": id,
"max_running_instances": maxRunningInstances,
"current_scheduled_instances": currentScheduledInstances,
"length_initialized_tasks": len(initializedTasks),
"tasks_to_start": tasksToStart,
}).Debug("find tasks to start")
var tasks []*task.TaskInfo
for _, instID := range initializedTasks {
if tasksToStart <= 0 {
break
}
taskRuntime, err := cachedJob.GetTask(instID).GetRuntime(ctx)
if err != nil {
log.WithError(err).
WithField("job_id", id).
WithField("instance_id", instID).
Error("failed to fetch task runtimeme")
continue
}
taskinfo := &task.TaskInfo{
JobId: jobID,
InstanceId: instID,
Runtime: taskRuntime,
Config: taskconfig.Merge(jobConfig.GetDefaultConfig(), jobConfig.GetInstanceConfig()[instID]),
}
if goalStateDriver.IsScheduledTask(jobID, instID) {
continue
}
tasks = append(tasks, taskinfo)
tasksToStart--
}
return sendTasksToResMgr(ctx, jobID, tasks, jobConfig, goalStateDriver)
}
// stateDeterminer determines job state given the current job runtime
type stateDeterminer interface {
getState(ctx context.Context, jobRuntime *job.RuntimeInfo) (job.JobState, error)
}
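// jobStateDeterminerFactory returns the stateDeterminer to use for the job:
// a partiallyCreatedJobStateDeterminer when not all instances have been
// created yet, a controllerTaskJobStateDeterminer when the job config has a
// controller task, and the default jobStateDeterminer otherwise.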
func jobStateDeterminerFactory(
jobRuntime *job.RuntimeInfo,
stateCounts map[string]uint32,
cachedJob cached.Job,
config jobmgrcommon.JobConfig) stateDeterminer {
totalInstanceCount := getTotalInstanceCount(stateCounts)
// a batch/service job is partially created if the
// total instance count is smaller than the configured instance count
if totalInstanceCount < config.GetInstanceCount() &&
cachedJob.IsPartiallyCreated(config) {
return newPartiallyCreatedJobStateDeterminer(cachedJob, stateCounts)
}
if cached.HasControllerTask(config) {
return newControllerTaskJobStateDeterminer(cachedJob, stateCounts, config)
}
return newJobStateDeterminer(stateCounts, config)
}
func newJobStateDeterminer(
stateCounts map[string]uint32,
config jobmgrcommon.JobConfig,
) *jobStateDeterminer {
return &jobStateDeterminer{
stateCounts: stateCounts,
config: config,
}
}
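// jobStateDeterminer derives the job state for a fully created job from its
// per-state task counts and the job config.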
type jobStateDeterminer struct {
stateCounts map[string]uint32
config jobmgrcommon.JobConfig
}
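// getState derives the job state from the task state counts: SUCCEEDED when
// all tasks succeeded, FAILED or KILLED when all tasks have reached a
// terminal state, KILLING while a kill is in progress, RUNNING when any task
// is running, and PENDING otherwise.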
func (d *jobStateDeterminer) getState(
ctx context.Context,
jobRuntime *job.RuntimeInfo,
) (job.JobState, error) {
totalInstanceCount := d.config.GetInstanceCount()
// State counts can exceed the configured instance count in one case:
// a workflow that reduces the instance count and changes the spec
// failed or was aborted.
// For batch jobs, and for service jobs whose goal state is non-terminal,
// return the default active state PENDING.
// Otherwise continue evaluating the state counts to derive the job
// runtime state.
if getTotalInstanceCount(d.stateCounts) > totalInstanceCount {
if d.config.GetType() == job.JobType_BATCH {
return job.JobState_PENDING, nil
}
if d.config.GetType() == job.JobType_SERVICE &&
!util.IsPelotonJobStateTerminal(jobRuntime.GetGoalState()) {
return job.JobState_PENDING, nil
}
}
// all succeeded -> succeeded
if d.stateCounts[task.TaskState_SUCCEEDED.String()] >= totalInstanceCount {
return job.JobState_SUCCEEDED, nil
}
// some succeeded, some failed, some lost -> failed
if d.stateCounts[task.TaskState_SUCCEEDED.String()]+
d.stateCounts[task.TaskState_FAILED.String()]+
d.stateCounts[task.TaskState_LOST.String()] >= totalInstanceCount {
return job.JobState_FAILED, nil
}
// some killed, some succeeded, some failed, some lost -> killed
if d.stateCounts[task.TaskState_KILLED.String()] > 0 &&
(d.stateCounts[task.TaskState_SUCCEEDED.String()]+
d.stateCounts[task.TaskState_FAILED.String()]+
d.stateCounts[task.TaskState_KILLED.String()]+
d.stateCounts[task.TaskState_LOST.String()] >= totalInstanceCount) {
return job.JobState_KILLED, nil
}
if jobRuntime.State == job.JobState_KILLING {
// jobState is set to KILLING in JobKill to avoid materialized view delay;
// keep the state as KILLING until the job transitions to a terminal state
return job.JobState_KILLING, nil
}
if d.stateCounts[task.TaskState_RUNNING.String()] > 0 {
return job.JobState_RUNNING, nil
}
return job.JobState_PENDING, nil
}
func newPartiallyCreatedJobStateDeterminer(
cachedJob cached.Job,
stateCounts map[string]uint32,
) *partiallyCreatedJobStateDeterminer {
return &partiallyCreatedJobStateDeterminer{
cachedJob: cachedJob,
stateCounts: stateCounts,
}
}
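// partiallyCreatedJobStateDeterminer derives the job state for a job whose
// instances have not all been created yet.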
type partiallyCreatedJobStateDeterminer struct {
cachedJob cached.Job
stateCounts map[string]uint32
}
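// getState derives the job state for a partially created job: batch jobs
// stay INITIALIZED until all instances are created; service jobs are KILLED
// or FAILED when the goal state is terminal and all created instances have
// terminated (or no instance has been created), and PENDING otherwise.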
func (d *partiallyCreatedJobStateDeterminer) getState(
ctx context.Context,
jobRuntime *job.RuntimeInfo,
) (job.JobState, error) {
// number of instances created so far for the partially created job
instanceCount := getTotalInstanceCount(d.stateCounts)
switch d.cachedJob.GetJobType() {
case job.JobType_BATCH:
return job.JobState_INITIALIZED, nil
case job.JobType_SERVICE:
// job goal state is terminal &&
// some killed + some succeeded + some failed + some lost -> killed
if util.IsPelotonJobStateTerminal(jobRuntime.GetGoalState()) &&
d.stateCounts[task.TaskState_KILLED.String()] > 0 &&
(d.stateCounts[task.TaskState_KILLED.String()]+
d.stateCounts[task.TaskState_SUCCEEDED.String()]+
d.stateCounts[task.TaskState_FAILED.String()]+
d.stateCounts[task.TaskState_LOST.String()] == instanceCount) {
return job.JobState_KILLED, nil
}
// job goal state is terminal && no instance created -> killed
if util.IsPelotonJobStateTerminal(jobRuntime.GetGoalState()) &&
instanceCount == 0 {
return job.JobState_KILLED, nil
}
// job goal state is terminal &&
// some failed + some succeeded + some lost -> failed
if util.IsPelotonJobStateTerminal(jobRuntime.GetGoalState()) &&
(d.stateCounts[task.TaskState_FAILED.String()]+
d.stateCounts[task.TaskState_SUCCEEDED.String()]+
d.stateCounts[task.TaskState_LOST.String()] == instanceCount) {
return job.JobState_FAILED, nil
}
return job.JobState_PENDING, nil
}
return job.JobState_UNKNOWN, yarpcerrors.InternalErrorf("unknown job type")
}
func newControllerTaskJobStateDeterminer(
cachedJob cached.Job,
stateCounts map[string]uint32,
config jobmgrcommon.JobConfig,
) *controllerTaskJobStateDeterminer {
return &controllerTaskJobStateDeterminer{
cachedJob: cachedJob,
batchDeterminer: newJobStateDeterminer(stateCounts, config),
}
}
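// controllerTaskJobStateDeterminer derives the job state for a job that
// contains a controller task; when the job reaches a terminal state, the
// controller task's terminal state decides whether the job is SUCCEEDED,
// FAILED, or KILLED.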
type controllerTaskJobStateDeterminer struct {
cachedJob cached.Job
batchDeterminer *jobStateDeterminer
}
// If the job would be in a terminal state, the job state is determined by
// the controller task. Otherwise it is determined by the batch state
// determiner.
func (d *controllerTaskJobStateDeterminer) getState(
ctx context.Context,
jobRuntime *job.RuntimeInfo,
) (job.JobState, error) {
jobState, err := d.batchDeterminer.getState(ctx, jobRuntime)
if err != nil {
return job.JobState_UNKNOWN, err
}
if !util.IsPelotonJobStateTerminal(jobState) {
return jobState, nil
}
// Job config validation ensures that the controller
// task is always the first task (instance 0)
controllerTask, err := d.cachedJob.AddTask(ctx, 0)
if err != nil {
return job.JobState_UNKNOWN, err
}
controllerTaskRuntime, err := controllerTask.GetRuntime(ctx)
if err != nil {
return job.JobState_UNKNOWN, err
}
switch controllerTaskRuntime.GetState() {
case task.TaskState_SUCCEEDED:
return job.JobState_SUCCEEDED, nil
case task.TaskState_LOST:
fallthrough
case task.TaskState_FAILED:
return job.JobState_FAILED, nil
default:
// only terminal states reach this switch statement,
// so the remaining state must be KILLED
return job.JobState_KILLED, nil
}
}
// getTransitionType returns the type of state transition for this job.
// for example: a job being restarted would move from a terminal to active
// state and the state transition returned is transitionTypeTerminalActive
func getTransitionType(newState, oldState job.JobState) transitionType {
newStateIsTerminal := util.IsPelotonJobStateTerminal(newState)
oldStateIsTerminal := util.IsPelotonJobStateTerminal(oldState)
if newStateIsTerminal && !oldStateIsTerminal {
return transitionTypeActiveTerminal
} else if !newStateIsTerminal && oldStateIsTerminal {
return transitionTypeTerminalActive
}
return transitionTypeActiveActive
}
// determineJobRuntimeStateAndCounts determines the job state based on the
// current job runtime state and the task state counts, and returns the
// type of transition from the previous job state. It also increments the
// job termination metrics when the job reaches a terminal state.
// The UNKNOWN state is returned if not enough information is present in
// the cache; the caller should retry after the cache has been filled in.
func determineJobRuntimeStateAndCounts(
ctx context.Context,
jobRuntime *job.RuntimeInfo,
stateCounts map[string]uint32,
config jobmgrcommon.JobConfig,
goalStateDriver *driver,
cachedJob cached.Job,
) (job.JobState, transitionType,
error) {
prevState := jobRuntime.GetState()
jobStateDeterminer := jobStateDeterminerFactory(
jobRuntime, stateCounts, cachedJob, config)
jobState, err := jobStateDeterminer.getState(ctx, jobRuntime)
if err != nil {
return job.JobState_UNKNOWN, transitionTypeUnknown, err
}
switch jobState {
case job.JobState_SUCCEEDED:
goalStateDriver.mtx.jobMetrics.JobSucceeded.Inc(1)
case job.JobState_FAILED:
goalStateDriver.mtx.jobMetrics.JobFailed.Inc(1)
case job.JobState_KILLED:
goalStateDriver.mtx.jobMetrics.JobKilled.Inc(1)
case job.JobState_DELETED:
goalStateDriver.mtx.jobMetrics.JobDeleted.Inc(1)
}
return jobState, getTransitionType(jobState,
prevState), nil
}
// JobRuntimeUpdater updates the job runtime.
// When the jobmgr leader fails over, the goal state driver runs syncFromDB,
// which enqueues all recovered jobs into the goal state engine; the job
// runtime updater then runs and refreshes the out-of-date runtime info.
func JobRuntimeUpdater(ctx context.Context, entity goalstate.Entity) error {
id := entity.GetID()
jobID := &peloton.JobID{Value: id}
goalStateDriver := entity.(*jobEntity).driver
cachedJob := goalStateDriver.jobFactory.AddJob(jobID)
log.WithField("job_id", id).
Info("running job runtime update")
jobRuntime, err := cachedJob.GetRuntime(ctx)
if err != nil {
log.WithError(err).
WithField("job_id", id).
Error("failed to get job runtime in runtime updater")
goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
return err
}
config, err := cachedJob.GetConfig(ctx)
if err != nil {
log.WithError(err).
WithField("job_id", id).
Error("Failed to get job config")
goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
return err
}
err = cachedJob.RepopulateInstanceAvailabilityInfo(ctx)
if err != nil {
log.WithError(err).
WithField("job_id", id).
Error("Failed to repopulate SLA info")
goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
return err
}
stateCounts, configVersionStateStats,
err := getTaskStateSummaryForJobInCache(ctx, cachedJob, config)
if err != nil {
log.WithError(err).
WithField("job_id", id).
Error("failed to get task state summary for job")
goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
return err
}
var jobState job.JobState
jobRuntimeUpdate := &job.RuntimeInfo{}
// if the job is already KILLED: do nothing
// if the job is partially created: the state determiner below sets the job
// state (INITIALIZED for batch jobs) and the job is re-enqueued later to
// create the remaining instances
if uint32(len(cachedJob.GetAllTasks())) < config.GetInstanceCount() {
if jobRuntime.GetState() == job.JobState_KILLED &&
jobRuntime.GetGoalState() == job.JobState_KILLED {
// Job already killed, do not do anything
return nil
}
}
// determineJobRuntimeStateAndCounts would handle both
// totalInstanceCount > config.GetInstanceCount() and
// partially created job
jobState, transition, err :=
determineJobRuntimeStateAndCounts(
ctx, jobRuntime, stateCounts, config, goalStateDriver, cachedJob)
if err != nil {
return err
}
if jobRuntime.GetTaskStats() != nil &&
jobRuntime.GetTaskStatsByConfigurationVersion() != nil &&
reflect.DeepEqual(stateCounts, jobRuntime.GetTaskStats()) &&
reflect.DeepEqual(configVersionStateStats, jobRuntime.GetTaskStatsByConfigurationVersion()) &&
jobRuntime.GetState() == jobState {
log.WithField("job_id", id).
WithField("task_stats", stateCounts).
WithField("task_stats_by_configurationVersion", configVersionStateStats).
Debug("Task stats did not change, return")
return nil
}
jobRuntimeUpdate = setStartTime(
cachedJob,
jobRuntime,
stateCounts,
jobRuntimeUpdate,
)
jobRuntimeUpdate.State = jobState
jobRuntimeUpdate = setCompletionTime(
cachedJob,
jobState,
jobRuntimeUpdate,
)
jobRuntimeUpdate.TaskStats = stateCounts
jobRuntimeUpdate.ResourceUsage = cachedJob.GetResourceUsage()
jobRuntimeUpdate.TaskStatsByConfigurationVersion = configVersionStateStats
// add to active jobs list BEFORE writing state to job runtime table.
// Also write to active jobs list only when the job is being transitioned
// from a terminal to active state. For active to active transitions, we
// can assume that the job is already in this list from the time it was
// first created. Terminal jobs will be removed from this list and must
// be added back when they are rerun.
if transition == transitionTypeTerminalActive {
if err := goalStateDriver.activeJobsOps.Create(
ctx, jobID); err != nil {
return err
}
}
// Update the job runtime
err = cachedJob.Update(ctx, &job.JobInfo{
Runtime: jobRuntimeUpdate,
}, nil,
nil,
cached.UpdateCacheAndDB)
if err != nil {
log.WithError(err).
WithField("job_id", id).
Error("failed to update jobRuntime in runtime updater")
goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
return err
}
// Evaluate this job immediately when
// 1. job state is terminal and no more task updates will arrive, or
// 2. job is partially created and needs to create additional tasks
// (we may have no additional tasks coming in when job is
// partially created)
if util.IsPelotonJobStateTerminal(jobRuntimeUpdate.GetState()) ||
(cachedJob.IsPartiallyCreated(config) &&
!updateutil.HasUpdate(jobRuntime)) {
goalStateDriver.EnqueueJob(jobID, time.Now())
}
log.WithField("job_id", id).
WithField("updated_state", jobState.String()).
Info("job runtime updater completed")
goalStateDriver.mtx.jobMetrics.JobRuntimeUpdated.Inc(1)
return nil
}
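// getTotalInstanceCount sums the task counts over all task states.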
func getTotalInstanceCount(stateCounts map[string]uint32) uint32 {
totalInstanceCount := uint32(0)
for _, state := range task.TaskState_name {
totalInstanceCount += stateCounts[state]
}
return totalInstanceCount
}
// setStartTime adds the start time to jobRuntimeUpdate when the job has
// started for the first time. It returns the updated jobRuntimeUpdate.
func setStartTime(
cachedJob cached.Job,
jobRuntime *job.RuntimeInfo,
stateCounts map[string]uint32,
jobRuntimeUpdate *job.RuntimeInfo) *job.RuntimeInfo {
getFirstTaskUpdateTime := cachedJob.GetFirstTaskUpdateTime()
if getFirstTaskUpdateTime != 0 && jobRuntime.StartTime == "" {
count := uint32(0)
for _, state := range taskStatesAfterStart {
count += stateCounts[state.String()]
}
if count > 0 {
jobRuntimeUpdate.StartTime = util.FormatTime(getFirstTaskUpdateTime, time.RFC3339Nano)
}
}
return jobRuntimeUpdate
}
// setCompletionTime adds completion time to jobRuntimeUpdate, if the job
// completes. It returns the updated jobRuntimeUpdate.
func setCompletionTime(
cachedJob cached.Job,
jobState job.JobState,
jobRuntimeUpdate *job.RuntimeInfo) *job.RuntimeInfo {
if util.IsPelotonJobStateTerminal(jobState) {
// In case a job moved from PENDING/INITIALIZED to KILLED state,
// the lastTaskUpdateTime will be 0. In this case, we will use
// time.Now() as default completion time since a job in terminal
// state should always have a completion time
completionTime := time.Now().UTC().Format(time.RFC3339Nano)
lastTaskUpdateTime := cachedJob.GetLastTaskUpdateTime()
if lastTaskUpdateTime != 0 {
completionTime = util.FormatTime(lastTaskUpdateTime, time.RFC3339Nano)
}
jobRuntimeUpdate.CompletionTime = completionTime
} else {
// in case job moves from terminal state to non-terminal state
jobRuntimeUpdate.CompletionTime = ""
}
return jobRuntimeUpdate
}
// getTaskStateSummaryForJobInCache loops through the tasks in the cache
// one by one to calculate the task state summary, and updates the
// configuration version state map for stateless jobs.
func getTaskStateSummaryForJobInCache(ctx context.Context,
cachedJob cached.Job,
config jobmgrcommon.JobConfig,
) (map[string]uint32, map[uint64]*job.RuntimeInfo_TaskStateStats, error) {
stateCounts := make(map[string]uint32)
configVersionStateStats := make(map[uint64]*job.RuntimeInfo_TaskStateStats)
for _, taskStatus := range task.TaskState_name {
stateCounts[taskStatus] = 0
}
for _, taskinCache := range cachedJob.GetAllTasks() {
stateCounts[taskinCache.CurrentState().State.String()]++
// update the configuration version state map for stateless jobs
if config.GetType() == job.JobType_SERVICE {
runtime, err := taskinCache.GetRuntime(ctx)
if err != nil {
return nil, nil, err
}
if _, ok := configVersionStateStats[runtime.GetConfigVersion()]; !ok {
configVersionStateStats[runtime.GetConfigVersion()] = &job.RuntimeInfo_TaskStateStats{
StateStats: make(map[string]uint32),
}
}
configVersionStateStats[runtime.GetConfigVersion()].StateStats[runtime.GetState().String()]++
}
}
return stateCounts, configVersionStateStats, nil
}