func (p *statusUpdate) ProcessStatusUpdate()

in pkg/jobmgr/task/event/update.go [171:413]


func (p *statusUpdate) ProcessStatusUpdate(
	ctx context.Context,
	updateEvent *statusupdate.Event,
) error {
	var currTaskResourceUsage map[string]float64
	p.logTaskMetrics(updateEvent)

	isOrphanTask, taskInfo, err := p.isOrphanTaskEvent(ctx, updateEvent)
	if err != nil {
		return err
	}

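	// Orphan events have no live task tracked by this job manager (as
	// determined by isOrphanTaskEvent above); they are counted and the
	// corresponding Mesos task is killed best-effort below.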
	if isOrphanTask {
		p.metrics.SkipOrphanTasksTotal.Inc(1)
		taskInfo := &pb_task.TaskInfo{
			Runtime: &pb_task.RuntimeInfo{
				State:       updateEvent.State(),
				MesosTaskId: updateEvent.MesosTaskID(),
				AgentID:     updateEvent.AgentID(),
			},
		}

		// Kill the orphan task
		for i := 0; i < _numOrphanTaskKillAttempts; i++ {
			err = jobmgr_task.KillOrphanTask(ctx, p.lm, taskInfo)
			if err == nil {
				return nil
			}
			time.Sleep(_waitForRetryOnErrorOrphanTaskKill)
		}
		return nil
	}

	// Skip the update if the task state is unchanged before and after this
	// event (duplicate update).
	if isDuplicateStateUpdate(taskInfo, updateEvent) {
		return nil
	}

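	// Stateful tasks are configured with a persistent volume; once such a
	// task is RUNNING, record that its volume has been created.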
	if updateEvent.State() == pb_task.TaskState_RUNNING &&
		taskInfo.GetConfig().GetVolume() != nil &&
		len(taskInfo.GetRuntime().GetVolumeID().GetValue()) != 0 {
		// Update volume state to be CREATED upon task RUNNING.
		if err := p.updatePersistentVolumeState(ctx, taskInfo); err != nil {
			return err
		}
	}

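	// Work on a copy of the runtime; the cached/persisted runtime is only
	// mutated through CompareAndSetTask further below.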
	newRuntime := proto.Clone(taskInfo.GetRuntime()).(*pb_task.RuntimeInfo)

	// Persist the reason and message for mesos updates
	newRuntime.Message = updateEvent.StatusMsg()
	newRuntime.Reason = ""

	// Persist healthy field if health check is enabled
	if taskInfo.GetConfig().GetHealthCheck() != nil {
		reason := updateEvent.Reason()
		healthy := updateEvent.Healthy()
		p.persistHealthyField(updateEvent.State(), reason, healthy, newRuntime)
	}

	// Update FailureCount
	updateFailureCount(updateEvent.State(), taskInfo.GetRuntime(), newRuntime)

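	// Translate the event state into the new runtime state, with special
	// handling for FAILED and LOST events.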
	switch updateEvent.State() {
	case pb_task.TaskState_FAILED:
		reason := updateEvent.Reason()
		msg := updateEvent.Message()
		if reason == mesos.TaskStatus_REASON_TASK_INVALID.String() &&
			strings.Contains(msg, _msgMesosDuplicateID) {
			log.WithField("task_id", updateEvent.TaskID()).
				Info("ignoring duplicate task id failure")
			return nil
		}

		// The task failed; do not place it on the same host for retry,
		// in case the failure was caused by the machine.
		newRuntime.DesiredHost = ""
		newRuntime.Reason = reason
		newRuntime.State = updateEvent.State()
		newRuntime.Message = msg
		// TODO p2k: can we build TerminationStatus from PodEvent?
		termStatus := &pb_task.TerminationStatus{
			Reason: pb_task.TerminationStatus_TERMINATION_STATUS_REASON_FAILED,
		}
		if code, err := taskutil.GetExitStatusFromMessage(msg); err == nil {
			termStatus.ExitCode = code
		} else if !yarpcerrors.IsNotFound(err) {
			log.WithField("task_id", updateEvent.TaskID()).
				WithField("error", err).
				Debug("Failed to extract exit status from message")
		}
		if sig, err := taskutil.GetSignalFromMessage(msg); err == nil {
			termStatus.Signal = sig
		} else if !yarpcerrors.IsNotFound(err) {
			log.WithField("task_id", updateEvent.TaskID()).
				WithField("error", err).
				Debug("Failed to extract termination signal from message")
		}
		newRuntime.TerminationStatus = termStatus

	case pb_task.TaskState_LOST:
		newRuntime.Reason = updateEvent.Reason()
		if util.IsPelotonStateTerminal(taskInfo.GetRuntime().GetState()) {
			// Skip LOST status update if current state is terminal state.
			log.WithFields(log.Fields{
				"task_id":           updateEvent.TaskID(),
				"db_task_runtime":   taskInfo.GetRuntime(),
				"task_status_event": updateEvent.MesosTaskStatus(),
			}).Debug("skip reschedule lost task as it is already in terminal state")
			return nil
		}
		if taskInfo.GetRuntime().GetGoalState() == pb_task.TaskState_KILLED {
			// Do not take any action for a killed task; just mark it KILLED.
			// The same message goes to the resource manager, which releases the placement.
			log.WithFields(log.Fields{
				"task_id":           updateEvent.TaskID(),
				"db_task_runtime":   taskInfo.GetRuntime(),
				"task_status_event": updateEvent.MesosTaskStatus(),
			}).Debug("mark stopped task as killed due to LOST")
			newRuntime.State = pb_task.TaskState_KILLED
			newRuntime.Message = "Stopped task LOST event: " + updateEvent.StatusMsg()
			break
		}

		if taskInfo.GetConfig().GetVolume() != nil &&
			len(taskInfo.GetRuntime().GetVolumeID().GetValue()) != 0 {
			// Do not reschedule stateful task. Storage layer will decide
			// whether to start or replace this task.
			newRuntime.State = pb_task.TaskState_LOST
			break
		}

		log.WithFields(log.Fields{
			"task_id":           updateEvent.TaskID(),
			"db_task_runtime":   taskInfo.GetRuntime(),
			"task_status_event": updateEvent.MesosTaskStatus(),
		}).Info("reschedule lost task if needed")

		// The task was lost; do not place it on the same host for retry.
		newRuntime.DesiredHost = ""
		newRuntime.State = pb_task.TaskState_LOST
		newRuntime.Message = "Task LOST: " + updateEvent.StatusMsg()
		newRuntime.Reason = updateEvent.Reason()

		// Calculate resource usage for TaskState_LOST using time.Now() as
		// completion time
		currTaskResourceUsage = getCurrTaskResourceUsage(
			updateEvent.TaskID(), updateEvent.State(), taskInfo.GetConfig().GetResource(),
			taskInfo.GetRuntime().GetStartTime(),
			now().UTC().Format(time.RFC3339Nano))

	default:
		newRuntime.State = updateEvent.State()
	}

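	// Maintain timestamps and resource-usage bookkeeping based on the new
	// state and the job type.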
	cachedJob := p.jobFactory.AddJob(taskInfo.GetJobId())
	// Update task start and completion timestamps
	if newRuntime.GetState() == pb_task.TaskState_RUNNING {
		if updateEvent.State() != taskInfo.GetRuntime().GetState() {
			// StartTime is set at the time of the first RUNNING event.
			// CompletionTime may already have been set (e.g. the task was
			// previously in a terminal state), which could make StartTime
			// larger than CompletionTime. Reset CompletionTime every time a
			// task transitions to the RUNNING state.
			newRuntime.StartTime = now().UTC().Format(time.RFC3339Nano)
			newRuntime.CompletionTime = ""
			// When the task is RUNNING, reset the desired host field so that
			// the task can be scheduled onto a different host if it restarts
			// (e.g. due to a health check failure or a failure retry).
			newRuntime.DesiredHost = ""

			if len(taskInfo.GetRuntime().GetDesiredHost()) != 0 {
				p.metrics.TasksInPlacePlacementTotal.Inc(1)
				if taskInfo.GetRuntime().GetDesiredHost() == taskInfo.GetRuntime().GetHost() {
					p.metrics.TasksInPlacePlacementSuccess.Inc(1)
				} else {
					log.WithField("job_id", taskInfo.GetJobId().GetValue()).
						WithField("instance_id", taskInfo.GetInstanceId()).
						Info("task fail to place on desired host")
				}
			}
		}

	} else if util.IsPelotonStateTerminal(newRuntime.GetState()) &&
		cachedJob.GetJobType() == pbjob.JobType_BATCH {
		// only update resource count when a batch job is in terminal state
		completionTime := now().UTC().Format(time.RFC3339Nano)
		newRuntime.CompletionTime = completionTime

		currTaskResourceUsage = getCurrTaskResourceUsage(
			updateEvent.TaskID(), updateEvent.State(), taskInfo.GetConfig().GetResource(),
			taskInfo.GetRuntime().GetStartTime(), completionTime)

		if len(currTaskResourceUsage) > 0 {
			// The current task resource usage was updated by this event, so
			// add it to the task's aggregated resource usage and update the runtime.
			aggregateTaskResourceUsage := taskInfo.GetRuntime().GetResourceUsage()
			if len(aggregateTaskResourceUsage) > 0 {
				for k, v := range currTaskResourceUsage {
					aggregateTaskResourceUsage[k] += v
				}
				newRuntime.ResourceUsage = aggregateTaskResourceUsage
			}
		}
	} else if cachedJob.GetJobType() == pbjob.JobType_SERVICE {
		// for service job, reset resource usage
		currTaskResourceUsage = nil
		newRuntime.ResourceUsage = nil
	}

	// Update the task update times in job cache and then update the task runtime in cache and DB
	cachedJob.SetTaskUpdateTime(updateEvent.Timestamp())
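	// CompareAndSetTask is expected to write the new runtime to cache and DB
	// only if the runtime has not changed concurrently (compare-and-set
	// semantics); on conflict or error the event is retried.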
	if _, err = cachedJob.CompareAndSetTask(
		ctx,
		taskInfo.GetInstanceId(),
		newRuntime,
		false,
	); err != nil {
		log.WithError(err).
			WithFields(log.Fields{
				"task_id": updateEvent.TaskID(),
				"state":   updateEvent.State().String()}).
			Error("Fail to update runtime for taskID")
		return err
	}

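	// Hand the task and its job off to the goal state engine, which
	// asynchronously reconciles actual state against goal state.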
	// Enqueue task to goal state
	p.goalStateDriver.EnqueueTask(
		taskInfo.GetJobId(),
		taskInfo.GetInstanceId(),
		time.Now())
	// Enqueue job to goal state as well
	goalstate.EnqueueJobWithDefaultDelay(
		taskInfo.GetJobId(), p.goalStateDriver, cachedJob)

	// Update the job's resource usage with the current task resource usage.
	// This is a no-op when currTaskResourceUsage is nil.
	// The operation is not idempotent, so job resource usage is updated in
	// cache only after the task resource usage has been successfully written
	// to the DB. If that write fails, ProcessStatusUpdate is retried
	// indefinitely until the error is resolved.
	cachedJob.UpdateResourceUsage(currTaskResourceUsage)
	return nil
}
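
The getCurrTaskResourceUsage helper used above is defined elsewhere in this
package. As a rough illustration only (not the actual implementation), the
sketch below shows how per-task resource usage could be derived from the
task's resource limits and its start/completion timestamps, producing the
map[string]float64 shape that ProcessStatusUpdate aggregates into
RuntimeInfo.ResourceUsage. The resource keys ("cpu", "memory", "gpu") and the
limit parameters are assumptions for the example; it relies only on the
standard library time package.

// sketchTaskResourceUsage is a hypothetical helper, not part of update.go.
// It multiplies each resource limit by the task's runtime in seconds, using
// the same RFC3339Nano timestamp format seen in ProcessStatusUpdate.
func sketchTaskResourceUsage(
	cpuLimit, memLimitMB, gpuLimit float64,
	startTime, completionTime string,
) map[string]float64 {
	start, err := time.Parse(time.RFC3339Nano, startTime)
	if err != nil {
		return nil
	}
	end, err := time.Parse(time.RFC3339Nano, completionTime)
	if err != nil {
		return nil
	}
	seconds := end.Sub(start).Seconds()
	if seconds < 0 {
		// Guard against clock skew or a missing start time.
		return nil
	}
	return map[string]float64{
		"cpu":    cpuLimit * seconds,
		"memory": memLimitMB * seconds,
		"gpu":    gpuLimit * seconds,
	}
}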