func JobRuntimeUpdater()

in pkg/jobmgr/goalstate/job_runtime_updater.go [487:626]


// JobRuntimeUpdater recomputes the runtime of the job identified by entity
// from the task state summary of the cached job and writes the result back
// to cache and DB.
//
// The steps are:
//  1. load the job runtime and config from the cached job and repopulate
//     its instance availability info;
//  2. summarize task states (overall and per configuration version);
//  3. derive the new job state via determineJobRuntimeStateAndCounts;
//  4. return early if neither the stats nor the job state changed;
//  5. otherwise persist state, task stats, resource usage and the
//     start/completion times, registering the job in the active jobs list
//     first when it transitions from a terminal to an active state;
//  6. re-enqueue the job right away when it is terminal, or partially
//     created with no update attached to the runtime.
//
// entity must be a *jobEntity; any other concrete type makes the type
// assertion below panic. Every failure is returned to the caller.
func JobRuntimeUpdater(ctx context.Context, entity goalstate.Entity) error {
	id := entity.GetID()
	jobID := &peloton.JobID{Value: id}
	goalStateDriver := entity.(*jobEntity).driver
	cachedJob := goalStateDriver.jobFactory.AddJob(jobID)

	log.WithField("job_id", id).
		Info("running job runtime update")

	jobRuntime, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		log.WithError(err).
			WithField("job_id", id).
			Error("failed to get job runtime in runtime updater")
		goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
		return err
	}

	config, err := cachedJob.GetConfig(ctx)
	if err != nil {
		log.WithError(err).
			WithField("job_id", id).
			Error("Failed to get job config")
		goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
		return err
	}

	err = cachedJob.RepopulateInstanceAvailabilityInfo(ctx)
	if err != nil {
		log.WithError(err).
			WithField("job_id", id).
			Error("Failed to repopulate SLA info")
		goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
		return err
	}

	stateCounts, configVersionStateStats, err :=
		getTaskStateSummaryForJobInCache(ctx, cachedJob, config)
	if err != nil {
		// This error used to be dropped: it was overwritten by the
		// determineJobRuntimeStateAndCounts call below, so a failed summary
		// would have been used to compute and persist the job state.
		log.WithError(err).
			WithField("job_id", id).
			Error("failed to get task state summary in runtime updater")
		goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
		return err
	}

	var jobState job.JobState
	jobRuntimeUpdate := &job.RuntimeInfo{}

	// When the cache holds fewer tasks than the configured instance count
	// and the job's state and goal state are both KILLED, there is nothing
	// left to do. Every other case falls through to the state computation.
	if uint32(len(cachedJob.GetAllTasks())) < config.GetInstanceCount() {
		if jobRuntime.GetState() == job.JobState_KILLED &&
			jobRuntime.GetGoalState() == job.JobState_KILLED {
			// Job already killed, do not do anything
			return nil
		}
	}

	// determineJobRuntimeStateAndCounts would handle both
	// totalInstanceCount > config.GetInstanceCount() and
	// partially created job
	jobState, transition, err :=
		determineJobRuntimeStateAndCounts(
			ctx, jobRuntime, stateCounts, config, goalStateDriver, cachedJob)
	if err != nil {
		return err
	}

	// Skip the write when the stored runtime already carries exactly the
	// stats and state that were just computed.
	if jobRuntime.GetTaskStats() != nil &&
		jobRuntime.GetTaskStatsByConfigurationVersion() != nil &&
		reflect.DeepEqual(stateCounts, jobRuntime.GetTaskStats()) &&
		reflect.DeepEqual(configVersionStateStats, jobRuntime.GetTaskStatsByConfigurationVersion()) &&
		jobRuntime.GetState() == jobState {
		log.WithField("job_id", id).
			WithField("task_stats", stateCounts).
			WithField("task_stats_by_configurationVersion", configVersionStateStats).
			Debug("Task stats did not change, return")

		return nil
	}

	// Assemble the runtime diff. setCompletionTime is called after State is
	// set, preserving the original ordering of these steps.
	jobRuntimeUpdate = setStartTime(
		cachedJob,
		jobRuntime,
		stateCounts,
		jobRuntimeUpdate,
	)

	jobRuntimeUpdate.State = jobState

	jobRuntimeUpdate = setCompletionTime(
		cachedJob,
		jobState,
		jobRuntimeUpdate,
	)

	jobRuntimeUpdate.TaskStats = stateCounts
	jobRuntimeUpdate.ResourceUsage = cachedJob.GetResourceUsage()
	jobRuntimeUpdate.TaskStatsByConfigurationVersion = configVersionStateStats

	// add to active jobs list BEFORE writing state to job runtime table.
	// Also write to active jobs list only when the job is being transitioned
	// from a terminal to active state. For active to active transitions, we
	// can assume that the job is already in this list from the time it was
	// first created. Terminal jobs will be removed from this list and must
	// be added back when they are rerun.
	if transition == transitionTypeTerminalActive {
		if err := goalStateDriver.activeJobsOps.Create(
			ctx, jobID); err != nil {
			return err
		}
	}

	// Update the job runtime
	err = cachedJob.Update(ctx, &job.JobInfo{
		Runtime: jobRuntimeUpdate,
	}, nil,
		nil,
		cached.UpdateCacheAndDB)
	if err != nil {
		log.WithError(err).
			WithField("job_id", id).
			Error("failed to update jobRuntime in runtime updater")
		goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
		return err
	}

	// Evaluate this job immediately when
	// 1. job state is terminal and no more task updates will arrive, or
	// 2. job is partially created and need to create additional tasks
	// (we may have no additional tasks coming in when job is
	// partially created)
	if util.IsPelotonJobStateTerminal(jobRuntimeUpdate.GetState()) ||
		(cachedJob.IsPartiallyCreated(config) &&
			!updateutil.HasUpdate(jobRuntime)) {
		goalStateDriver.EnqueueJob(jobID, time.Now())
	}

	log.WithField("job_id", id).
		WithField("updated_state", jobState.String()).
		Info("job runtime updater completed")

	goalStateDriver.mtx.jobMetrics.JobRuntimeUpdated.Inc(1)
	return nil
}