in pkg/jobmgr/goalstate/job_runtime_updater.go [487:626]
// JobRuntimeUpdater recomputes a job's runtime from the current state of its
// tasks in the cache and persists it.
//
// Steps:
//   - Load the job's runtime and config.
//   - Repopulate the instance availability (SLA) info.
//   - Summarize task states, both overall and per configuration version.
//   - Derive the new job state via determineJobRuntimeStateAndCounts.
//   - Write the updated runtime to both the cache and the DB.
//
// The write is skipped when the job state, the task stats and the per-version
// task stats are all unchanged.
//
// When the job transitions from a terminal to an active state, it is added
// back to the active jobs list before the runtime is written.
//
// After a successful write, the job is re-enqueued immediately if:
//   - its new state is terminal, or
//   - it is partially created and updateutil.HasUpdate reports no update.
//
// Errors are returned to the caller.
func JobRuntimeUpdater(ctx context.Context, entity goalstate.Entity) error {
	id := entity.GetID()
	jobID := &peloton.JobID{Value: id}
	goalStateDriver := entity.(*jobEntity).driver
	cachedJob := goalStateDriver.jobFactory.AddJob(jobID)

	log.WithField("job_id", id).
		Info("running job runtime update")

	jobRuntime, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		log.WithError(err).
			WithField("job_id", id).
			Error("failed to get job runtime in runtime updater")
		goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
		return err
	}

	config, err := cachedJob.GetConfig(ctx)
	if err != nil {
		log.WithError(err).
			WithField("job_id", id).
			Error("Failed to get job config")
		goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
		return err
	}

	err = cachedJob.RepopulateInstanceAvailabilityInfo(ctx)
	if err != nil {
		log.WithError(err).
			WithField("job_id", id).
			Error("Failed to repopulate SLA info")
		goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
		return err
	}

	stateCounts, configVersionStateStats, err :=
		getTaskStateSummaryForJobInCache(ctx, cachedJob, config)
	if err != nil {
		// Previously this error was silently overwritten below; proceeding
		// would compute and persist a runtime from an incomplete summary.
		log.WithError(err).
			WithField("job_id", id).
			Error("failed to get task state summary in runtime updater")
		goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
		return err
	}

	// A job whose cache holds fewer tasks than its configured instance count
	// is partially created. If such a job is already killed (state and goal
	// state both KILLED) there is nothing to update. Every other case,
	// including partially created jobs that are not killed, is handled by
	// determineJobRuntimeStateAndCounts below.
	if uint32(len(cachedJob.GetAllTasks())) < config.GetInstanceCount() &&
		jobRuntime.GetState() == job.JobState_KILLED &&
		jobRuntime.GetGoalState() == job.JobState_KILLED {
		return nil
	}

	// determineJobRuntimeStateAndCounts handles both
	// totalInstanceCount > config.GetInstanceCount() and
	// partially created jobs.
	jobState, transition, err := determineJobRuntimeStateAndCounts(
		ctx, jobRuntime, stateCounts, config, goalStateDriver, cachedJob)
	if err != nil {
		return err
	}

	// Skip the write when nothing observable has changed.
	if jobRuntime.GetTaskStats() != nil &&
		jobRuntime.GetTaskStatsByConfigurationVersion() != nil &&
		reflect.DeepEqual(stateCounts, jobRuntime.GetTaskStats()) &&
		reflect.DeepEqual(configVersionStateStats, jobRuntime.GetTaskStatsByConfigurationVersion()) &&
		jobRuntime.GetState() == jobState {
		log.WithField("job_id", id).
			WithField("task_stats", stateCounts).
			WithField("task_stats_by_configurationVersion", configVersionStateStats).
			Debug("Task stats did not change, return")
		return nil
	}

	jobRuntimeUpdate := setStartTime(
		cachedJob,
		jobRuntime,
		stateCounts,
		&job.RuntimeInfo{},
	)
	jobRuntimeUpdate.State = jobState
	jobRuntimeUpdate = setCompletionTime(
		cachedJob,
		jobState,
		jobRuntimeUpdate,
	)
	jobRuntimeUpdate.TaskStats = stateCounts
	jobRuntimeUpdate.ResourceUsage = cachedJob.GetResourceUsage()
	jobRuntimeUpdate.TaskStatsByConfigurationVersion = configVersionStateStats

	// Add to the active jobs list BEFORE writing state to the job runtime
	// table, and only when the job is transitioning from a terminal to an
	// active state. For active-to-active transitions the job is assumed to
	// already be in the list from the time it was first created. Terminal
	// jobs are removed from this list and must be added back when rerun.
	if transition == transitionTypeTerminalActive {
		if err := goalStateDriver.activeJobsOps.Create(ctx, jobID); err != nil {
			return err
		}
	}

	// Update the job runtime in both cache and DB.
	err = cachedJob.Update(ctx, &job.JobInfo{
		Runtime: jobRuntimeUpdate,
	}, nil,
		nil,
		cached.UpdateCacheAndDB)
	if err != nil {
		log.WithError(err).
			WithField("job_id", id).
			Error("failed to update jobRuntime in runtime updater")
		goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
		return err
	}

	// Evaluate this job immediately when
	// 1. the job state is terminal and no more task updates will arrive, or
	// 2. the job is partially created and needs additional tasks created
	//    (no further task updates may arrive for a partially created job).
	if util.IsPelotonJobStateTerminal(jobRuntimeUpdate.GetState()) ||
		(cachedJob.IsPartiallyCreated(config) &&
			!updateutil.HasUpdate(jobRuntime)) {
		goalStateDriver.EnqueueJob(jobID, time.Now())
	}

	log.WithField("job_id", id).
		WithField("updated_state", jobState.String()).
		Info("job runtime updater completed")
	goalStateDriver.mtx.jobMetrics.JobRuntimeUpdated.Inc(1)
	return nil
}