in pkg/jobmgr/task/event/update.go [171:413]
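// ProcessStatusUpdate persists a task status update event to the task
// runtime and enqueues the task (and its job) to the goal state engine.
// Orphan-task events are not persisted; the orphaned mesos task is
// killed instead.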
func (p *statusUpdate) ProcessStatusUpdate(
ctx context.Context,
updateEvent *statusupdate.Event,
) error {
var currTaskResourceUsage map[string]float64
p.logTaskMetrics(updateEvent)
isOrphanTask, taskInfo, err := p.isOrphanTaskEvent(ctx, updateEvent)
if err != nil {
return err
}
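// An orphan event references a mesos task that no longer matches the
// task runtime stored in the DB (e.g. an event from an earlier run of
// the instance). Do not persist it; build a minimal TaskInfo and make
// a best-effort attempt to kill the orphaned mesos task.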
if isOrphanTask {
p.metrics.SkipOrphanTasksTotal.Inc(1)
orphanTaskInfo := &pb_task.TaskInfo{
Runtime: &pb_task.RuntimeInfo{
State: updateEvent.State(),
MesosTaskId: updateEvent.MesosTaskID(),
AgentID: updateEvent.AgentID(),
},
}
// Kill the orphan task
for i := 0; i < _numOrphanTaskKillAttempts; i++ {
err = jobmgr_task.KillOrphanTask(ctx, p.lm, orphanTaskInfo)
if err == nil {
return nil
}
time.Sleep(_waitForRetryOnErrorOrphanTaskKill)
}
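// The kill is best-effort: once the retry budget is exhausted, the
// error is intentionally dropped and the event is skipped.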
return nil
}
// Skip this event if it does not change the instance state (duplicate update).
if isDuplicateStateUpdate(taskInfo, updateEvent) {
return nil
}
if updateEvent.State() == pb_task.TaskState_RUNNING &&
taskInfo.GetConfig().GetVolume() != nil &&
len(taskInfo.GetRuntime().GetVolumeID().GetValue()) != 0 {
// Update volume state to be CREATED upon task RUNNING.
if err := p.updatePersistentVolumeState(ctx, taskInfo); err != nil {
return err
}
}
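// Mutate a copy of the runtime; the cached runtime and the DB are only
// updated atomically via CompareAndSetTask below.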
newRuntime := proto.Clone(taskInfo.GetRuntime()).(*pb_task.RuntimeInfo)
// Persist the message from the event; the reason is cleared here and
// only set for the terminal states handled below.
newRuntime.Message = updateEvent.StatusMsg()
newRuntime.Reason = ""
// Persist healthy field if health check is enabled
if taskInfo.GetConfig().GetHealthCheck() != nil {
reason := updateEvent.Reason()
healthy := updateEvent.Healthy()
p.persistHealthyField(updateEvent.State(), reason, healthy, newRuntime)
}
// Update FailureCount
updateFailureCount(updateEvent.State(), taskInfo.GetRuntime(), newRuntime)
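// Persist fields that depend on the event state: FAILED and LOST carry
// extra reason/message/termination data, while every other state only
// needs the new state recorded.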
switch updateEvent.State() {
case pb_task.TaskState_FAILED:
reason := updateEvent.Reason()
msg := updateEvent.Message()
if reason == mesos.TaskStatus_REASON_TASK_INVALID.String() &&
strings.Contains(msg, _msgMesosDuplicateID) {
log.WithField("task_id", updateEvent.TaskID()).
Info("ignoring duplicate task id failure")
return nil
}
// The task failed: do not place it on the same host for the retry, in
// case the failure was caused by the machine.
newRuntime.DesiredHost = ""
newRuntime.Reason = reason
newRuntime.State = updateEvent.State()
newRuntime.Message = msg
// TODO p2k: can we build TerminationStatus from PodEvent?
termStatus := &pb_task.TerminationStatus{
Reason: pb_task.TerminationStatus_TERMINATION_STATUS_REASON_FAILED,
}
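// Best-effort extraction: the exit code and signal are parsed out of
// the mesos message when present; a not-found error just means the
// message did not contain them.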
if code, err := taskutil.GetExitStatusFromMessage(msg); err == nil {
termStatus.ExitCode = code
} else if !yarpcerrors.IsNotFound(err) {
log.WithField("task_id", updateEvent.TaskID()).
WithField("error", err).
Debug("Failed to extract exit status from message")
}
if sig, err := taskutil.GetSignalFromMessage(msg); err == nil {
termStatus.Signal = sig
} else if !yarpcerrors.IsNotFound(err) {
log.WithField("task_id", updateEvent.TaskID()).
WithField("error", err).
Debug("Failed to extract termination signal from message")
}
newRuntime.TerminationStatus = termStatus
case pb_task.TaskState_LOST:
newRuntime.Reason = updateEvent.Reason()
if util.IsPelotonStateTerminal(taskInfo.GetRuntime().GetState()) {
// Skip the LOST update if the current state is already terminal.
log.WithFields(log.Fields{
"task_id": updateEvent.TaskID(),
"db_task_runtime": taskInfo.GetRuntime(),
"task_status_event": updateEvent.MesosTaskStatus(),
}).Debug("skip reschedule lost task as it is already in terminal state")
return nil
}
if taskInfo.GetRuntime().GetGoalState() == pb_task.TaskState_KILLED {
// Do not take any action for a task whose goal state is KILLED; just
// mark it KILLED. The same message goes to the resource manager, which
// will release the placement.
log.WithFields(log.Fields{
"task_id": updateEvent.TaskID(),
"db_task_runtime": taskInfo.GetRuntime(),
"task_status_event": updateEvent.MesosTaskStatus(),
}).Debug("mark stopped task as killed due to LOST")
newRuntime.State = pb_task.TaskState_KILLED
newRuntime.Message = "Stopped task LOST event: " + updateEvent.StatusMsg()
break
}
if taskInfo.GetConfig().GetVolume() != nil &&
len(taskInfo.GetRuntime().GetVolumeID().GetValue()) != 0 {
// Do not reschedule a stateful task. The storage layer will decide
// whether to start or replace it.
newRuntime.State = pb_task.TaskState_LOST
break
}
log.WithFields(log.Fields{
"task_id": updateEvent.TaskID(),
"db_task_runtime": taskInfo.GetRuntime(),
"task_status_event": updateEvent.MesosTaskStatus(),
}).Info("reschedule lost task if needed")
// The task was lost: do not place it on the same host for the retry.
newRuntime.DesiredHost = ""
newRuntime.State = pb_task.TaskState_LOST
newRuntime.Message = "Task LOST: " + updateEvent.StatusMsg()
newRuntime.Reason = updateEvent.Reason()
// Calculate resource usage for TaskState_LOST using time.Now() as
// completion time
currTaskResourceUsage = getCurrTaskResourceUsage(
updateEvent.TaskID(), updateEvent.State(), taskInfo.GetConfig().GetResource(),
taskInfo.GetRuntime().GetStartTime(),
now().UTC().Format(time.RFC3339Nano))
default:
newRuntime.State = updateEvent.State()
}
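// Fetch the cached job, adding it to the job factory cache if it is
// not tracked yet.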
cachedJob := p.jobFactory.AddJob(taskInfo.GetJobId())
// Update task start and completion timestamps
if newRuntime.GetState() == pb_task.TaskState_RUNNING {
if updateEvent.State() != taskInfo.GetRuntime().GetState() {
// StartTime is set at the time of the first RUNNING event.
// CompletionTime may already be set (e.g. if the task terminated
// earlier and is being restarted), which could make StartTime later
// than CompletionTime. Reset CompletionTime every time a task
// transitions to the RUNNING state.
newRuntime.StartTime = now().UTC().Format(time.RFC3339Nano)
newRuntime.CompletionTime = ""
// When the task is RUNNING, reset the desired host field so the task
// is scheduled onto a different host when it restarts (e.g. due to a
// health check failure or a failure retry).
newRuntime.DesiredHost = ""
if len(taskInfo.GetRuntime().GetDesiredHost()) != 0 {
p.metrics.TasksInPlacePlacementTotal.Inc(1)
if taskInfo.GetRuntime().GetDesiredHost() == taskInfo.GetRuntime().GetHost() {
p.metrics.TasksInPlacePlacementSuccess.Inc(1)
} else {
log.WithField("job_id", taskInfo.GetJobId().GetValue()).
WithField("instance_id", taskInfo.GetInstanceId()).
Info("task fail to place on desired host")
}
}
}
} else if util.IsPelotonStateTerminal(newRuntime.GetState()) &&
cachedJob.GetJobType() == pbjob.JobType_BATCH {
// Only update resource usage when a batch-job task reaches a terminal state.
completionTime := now().UTC().Format(time.RFC3339Nano)
newRuntime.CompletionTime = completionTime
currTaskResourceUsage = getCurrTaskResourceUsage(
updateEvent.TaskID(), updateEvent.State(), taskInfo.GetConfig().GetResource(),
taskInfo.GetRuntime().GetStartTime(), completionTime)
if len(currTaskResourceUsage) > 0 {
// current task resource usage was updated by this event, so we should
// add it to aggregated resource usage for the task and update runtime
aggregateTaskResourceUsage := taskInfo.GetRuntime().GetResourceUsage()
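// NOTE: the usage from this event is folded in only when the runtime
// already carries a resource-usage map; the map is presumably
// initialized when the task is created.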
if len(aggregateTaskResourceUsage) > 0 {
for k, v := range currTaskResourceUsage {
aggregateTaskResourceUsage[k] += v
}
newRuntime.ResourceUsage = aggregateTaskResourceUsage
}
}
} else if cachedJob.GetJobType() == pbjob.JobType_SERVICE {
// for service job, reset resource usage
currTaskResourceUsage = nil
newRuntime.ResourceUsage = nil
}
// Update the task update times in job cache and then update the task runtime in cache and DB
cachedJob.SetTaskUpdateTime(updateEvent.Timestamp())
if _, err = cachedJob.CompareAndSetTask(
ctx,
taskInfo.GetInstanceId(),
newRuntime,
false,
); err != nil {
log.WithError(err).
WithFields(log.Fields{
"task_id": updateEvent.TaskID(),
"state": updateEvent.State().String()}).
Error("Fail to update runtime for taskID")
return err
}
// Enqueue task to goal state
p.goalStateDriver.EnqueueTask(
taskInfo.GetJobId(),
taskInfo.GetInstanceId(),
time.Now())
// Enqueue job to goal state as well
goalstate.EnqueueJobWithDefaultDelay(
taskInfo.GetJobId(), p.goalStateDriver, cachedJob)
// Update job's resource usage with the current task resource usage.
// This is a noop in case currTaskResourceUsage is nil
// This operation is not idempotent. So we will update job resource usage
// in cache only after successfully updating task resource usage in DB
// In case of errors in PatchTasks(), ProcessStatusUpdate will be retried
// indefinitely until errors are resolved.
cachedJob.UpdateResourceUsage(currTaskResourceUsage)
return nil
}
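// For illustration only: a minimal sketch of the kind of computation a
// helper like getCurrTaskResourceUsage performs. The function name, map
// keys, and formula below are assumptions rather than the actual
// implementation: usage is modeled as the configured resource limit
// multiplied by the run duration in seconds.
func sketchResourceUsage(
res *pb_task.ResourceConfig,
startTimeStr, completionTimeStr string,
) map[string]float64 {
start, err := time.Parse(time.RFC3339Nano, startTimeStr)
if err != nil {
return nil
}
end, err := time.Parse(time.RFC3339Nano, completionTimeStr)
if err != nil {
return nil
}
seconds := end.Sub(start).Seconds()
// Keys are hypothetical; the real helper defines its own constants.
return map[string]float64{
"cpu":    res.GetCpuLimit() * seconds,
"gpu":    res.GetGpuLimit() * seconds,
"memory": res.GetMemLimitMb() * seconds,
}
}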