in pkg/jobmgr/goalstate/job_runtime_updater.go [104:214]
// JobEvaluateMaxRunningInstancesSLA enforces the job's SLA limit on the
// maximum number of concurrently running instances.
//
// If the cached job config sets a non-zero MaximumRunningInstances and the
// job's runtime goal state is not KILLED, it counts the instances that are in
// any of the taskStatesScheduled states, according to the runtime's task
// stats. If that count is below the limit, it picks up to (limit - count)
// INITIALIZED tasks from the job cache. Tasks the goal state driver already
// reports as scheduled are skipped. The picked tasks are passed to
// sendTasksToResMgr.
//
// It returns nil without starting anything in three cases: the limit is 0,
// the goal state is KILLED, or the limit is already reached. Otherwise it
// returns any error from loading the job config (cache or jobConfigOps), from
// loading the job runtime, or from sendTasksToResMgr.
func JobEvaluateMaxRunningInstancesSLA(ctx context.Context, entity goalstate.Entity) error {
	id := entity.GetID()
	jobID := &peloton.JobID{Value: id}
	goalStateDriver := entity.(*jobEntity).driver
	cachedJob := goalStateDriver.jobFactory.AddJob(jobID)
	cachedConfig, err := cachedJob.GetConfig(ctx)
	if err != nil {
		log.WithError(err).
			WithField("job_id", id).
			Error("Failed to get job config")
		return err
	}

	// Save a read to DB if maxRunningInstances is 0
	maxRunningInstances := cachedConfig.GetSLA().GetMaximumRunningInstances()
	if maxRunningInstances == 0 {
		return nil
	}

	// Read the current full job config from jobConfigOps. Its default and
	// per-instance task configs are merged into each TaskInfo below. The cached
	// config above is used only for the SLA limit.
	jobConfig, _, err :=
		goalStateDriver.jobConfigOps.GetCurrentVersion(ctx, jobID)
	if err != nil {
		log.WithError(err).
			WithField("job_id", id).
			Error("Failed to get job config in start instances")
		return err
	}

	runtime, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		log.WithError(err).
			WithField("job_id", id).
			Error("Failed to get job runtime during start instances")
		goalStateDriver.mtx.jobMetrics.JobRuntimeUpdateFailed.Inc(1)
		return err
	}
	// Never start new instances for a job that is being killed.
	if runtime.GetGoalState() == job.JobState_KILLED {
		return nil
	}

	// Count instances in any "scheduled" state. Task stats are keyed by the
	// task state's string name.
	stateCounts := runtime.GetTaskStats()
	currentScheduledInstances := uint32(0)
	for _, state := range taskStatesScheduled {
		currentScheduledInstances += stateCounts[state.String()]
	}

	if currentScheduledInstances >= maxRunningInstances {
		if currentScheduledInstances > maxRunningInstances {
			log.WithFields(log.Fields{
				"current_scheduled_tasks": currentScheduledInstances,
				"max_running_instances":   maxRunningInstances,
				"job_id":                  id,
			}).Info("scheduled instances exceed max running instances")
			// The subtraction is safe: the enclosing check guarantees
			// current > max.
			goalStateDriver.mtx.jobMetrics.JobMaxRunningInstancesExceeding.Inc(int64(currentScheduledInstances - maxRunningInstances))
		}
		log.WithField("current_scheduled_tasks", currentScheduledInstances).
			WithField("job_id", id).
			Debug("no instances to start")
		return nil
	}

	tasksToStart := maxRunningInstances - currentScheduledInstances

	// Collect the IDs of all INITIALIZED tasks for this job from the cache.
	// NOTE(review): if GetAllTasks returns a map, the iteration order (and so
	// which initialized tasks get picked first) is unspecified. Confirm whether
	// callers expect a deterministic order.
	var initializedTasks []uint32
	for _, taskInCache := range cachedJob.GetAllTasks() {
		if taskInCache.CurrentState().State == task.TaskState_INITIALIZED {
			initializedTasks = append(initializedTasks, taskInCache.ID())
		}
	}

	log.WithFields(log.Fields{
		"job_id":                      id,
		"max_running_instances":       maxRunningInstances,
		"current_scheduled_instances": currentScheduledInstances,
		"length_initialized_tasks":    len(initializedTasks),
		"tasks_to_start":              tasksToStart,
	}).Debug("find tasks to start")

	var tasks []*task.TaskInfo
	for _, instID := range initializedTasks {
		if tasksToStart == 0 {
			break
		}
		// Skip tasks the driver already reports as scheduled. Do this before
		// fetching their runtime or merging their config, so the work (and a
		// possible spurious error log) is avoided.
		if goalStateDriver.IsScheduledTask(jobID, instID) {
			continue
		}
		// The task is looked up again by ID, so guard against it no longer
		// being in the cache.
		// NOTE(review): assumes GetTask returns nil for an unknown instance.
		// Confirm against the cache implementation.
		cachedTask := cachedJob.GetTask(instID)
		if cachedTask == nil {
			continue
		}
		taskRuntime, err := cachedTask.GetRuntime(ctx)
		if err != nil {
			log.WithError(err).
				WithField("job_id", id).
				WithField("instance_id", instID).
				Error("failed to fetch task runtime")
			continue
		}
		tasks = append(tasks, &task.TaskInfo{
			JobId:      jobID,
			InstanceId: instID,
			Runtime:    taskRuntime,
			Config:     taskconfig.Merge(jobConfig.GetDefaultConfig(), jobConfig.GetInstanceConfig()[instID]),
		})
		tasksToStart--
	}

	return sendTasksToResMgr(ctx, jobID, tasks, jobConfig, goalStateDriver)
}