func TaskStart()

in pkg/jobmgr/goalstate/task_start.go [44:139]


func TaskStart(ctx context.Context, entity goalstate.Entity) error {
	taskEnt := entity.(*taskEntity)
	goalStateDriver := taskEnt.driver
	cachedJob := goalStateDriver.jobFactory.GetJob(taskEnt.jobID)
	if cachedJob == nil {
		return nil
	}

	cachedConfig, err := cachedJob.GetConfig(ctx)
	if err != nil {
		log.WithError(err).
			WithField("job_id", taskEnt.jobID).
			WithField("instance_id", taskEnt.instanceID).
			Error("Failed to get job config for task")
		return err
	}

	if cachedConfig.GetSLA().GetMaximumRunningInstances() > 0 {
		// Tasks are enqueued into goal state in INITIALiZED state either
		// during recovery or due to task restart due to failure/task lost
		// or due to launch/starting state timeouts. In all these cases,
		// job is enqueued into goal state as well. So, this is merely a safety
		// check, hence enqueue with a large delay to prevent too many
		// enqueues of the same job during recovery.
		goalStateDriver.EnqueueJob(taskEnt.jobID, time.Now().Add(
			_jobEnqueueMultiplierOnTaskStart*
				goalStateDriver.JobRuntimeDuration(cachedConfig.GetType())))
		return nil
	}

	taskID := taskEnt.GetID()
	taskInfo, err := goalStateDriver.taskStore.GetTaskByID(ctx, taskID)
	if err != nil {
		log.WithError(err).
			WithField("job_id", taskEnt.jobID).
			WithField("instance_id", taskEnt.instanceID).
			Error("failed to fetch task info in task start")
		return err
	}
	if taskInfo == nil {
		return fmt.Errorf("task info not found for %v", taskID)
	}

	// TODO: Investigate how to create proper gangs for scheduling (currently, task are treat independently)
	response, err := jobmgr_task.EnqueueGangs(
		ctx,
		[]*task.TaskInfo{taskInfo},
		cachedConfig,
		goalStateDriver.resmgrClient)

	// Parse the EnqueueGangs response to determine if the task is successfully enqueued
	// or has been previously enqueued, and should transition to PENDING state.
	enqueued := func(res *resmgrsvc.EnqueueGangsResponse, e error) bool {
		if e == nil && res.GetError() == nil {
			return true
		}

		if res.GetError().GetFailure().GetFailed() != nil {
			failed := response.GetError().GetFailure().GetFailed()
			if len(failed) == 1 && failed[0].Errorcode ==
				resmgrsvc.EnqueueGangsFailure_ENQUEUE_GANGS_FAILURE_ERROR_CODE_ALREADY_EXIST {
				jid, instID, err := util.ParseTaskID(failed[0].Task.GetId().GetValue())
				if err == nil || jid == taskEnt.jobID.GetValue() || instID == taskEnt.instanceID {
					return true
				}
			}
		}
		return false
	}(response, err)

	if !enqueued {
		return yarpcerrors.InternalErrorf("failed to enqueue task into resource manager %v", taskID)
	}

	// Update task state to PENDING
	runtime := taskInfo.GetRuntime()
	if runtime.GetState() != task.TaskState_PENDING {
		var instancesToRetry []uint32
		runtimeDiff := jobmgrcommon.RuntimeDiff{
			jobmgrcommon.StateField:   task.TaskState_PENDING,
			jobmgrcommon.MessageField: "Task sent for placement",
		}
		_, instancesToRetry, err = cachedJob.PatchTasks(
			ctx,
			map[uint32]jobmgrcommon.RuntimeDiff{taskEnt.instanceID: runtimeDiff},
			false,
		)

		if err == nil && len(instancesToRetry) != 0 {
			// if the task needs to be reloaded into cache, throw error here
			// so that the action is retried by goalstate engine
			return _errTasksNotInCache
		}
	}
	return err
}