syncTaskState() in pkg/controller/controller.go [1599:2010]

func (c *FrameworkController) syncTaskState(
	f *ci.Framework, cm *core.ConfigMap,
	taskRoleName string, taskIndex int32) (err error) {
	logPfx := fmt.Sprintf("[%v][%v][%v]: syncTaskState: ",
		f.Key(), taskRoleName, taskIndex)
	klog.Infof(logPfx + "Started")
	defer func() { klog.Infof(logPfx + "Completed") }()

	taskRoleSpec := f.GetTaskRoleSpec(taskRoleName)
	taskRoleStatus := f.TaskRoleStatus(taskRoleName)
	taskStatus := f.TaskStatus(taskRoleName, taskIndex)
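	// The rest of this sync walks the Task/TaskAttempt state machine: it first
	// short-circuits a Task that is already TaskCompleted, then reconciles the
	// TaskAttempt against the remote Pod, then handles TaskAttemptCompleted
	// (retry or complete the Task), TaskAttemptCreationPending (create a new Pod),
	// and finally TaskCompleted (evaluate the FrameworkAttemptCompletionPolicy).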

	if taskStatus.State == ci.TaskCompleted {
		// The TaskCompleted state has already been considered by the earlier
		// syncFrameworkAttemptCompletionPolicy, so it is safe to skip the
		// attemptToCompleteFrameworkAttempt below.
		//
		// If the Task is DeletionPending but not yet deleted, it must be because
		// another Task behind it has not yet reached TaskCompleted.
		// When that following Task becomes TaskCompleted later, a sync will be
		// enqueued to trigger the deletion.
		klog.Infof(logPfx + "Skipped: Task is already completed")
		return nil
	}

	var pod *core.Pod
	if taskStatus.State != ci.TaskAttemptCompleted {
		// The Pod creation may have already been requested successfully and the Pod
		// may exist remotely, so we need to sync against it.
		pod, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, false)
		if err != nil {
			return err
		}

		if pod == nil {
			// Avoid syncing with an outdated object:
			// the Pod's remote creation has been requested, but it is not yet found in
			// the local cache.
			if taskStatus.State == ci.TaskAttemptCreationRequested {
				var diag string
				var code ci.CompletionCode
				if taskStatus.DeletionPending {
					diag = "User has requested to delete the Task by Framework ScaleDown"
					code = ci.CompletionCodeDeleteTaskRequested
					klog.Info(logPfx + diag)
				} else {
					if c.enqueueTaskAttemptCreationTimeoutCheck(f, taskRoleName, taskIndex, true) {
						klog.Infof(logPfx +
							"Waiting for Pod to appear in the local cache or to time out")
						return nil
					}

					diag = fmt.Sprintf(
						"Pod did not appear in the local cache within timeout %v, "+
							"so it is considered deleted and will be explicitly deleted",
						common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec))
					code = ci.CompletionCodePodLocalCacheCreationTimeout
					klog.Warning(logPfx + diag)
				}

				// Ensure the Pod is deleted remotely to avoid leaking a managed Pod after
				// TaskAttemptCompleted.
				err := c.deletePod(f, taskRoleName, taskIndex, *taskStatus.PodUID(), true, false)
				if err != nil {
					return err
				}

				c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
					code.NewTaskAttemptCompletionStatus(diag, nil))
				return nil
			}

			if taskStatus.State != ci.TaskAttemptCreationPending {
				if taskStatus.AttemptStatus.CompletionStatus == nil {
					diag := "Pod was deleted by others"
					klog.Warning(logPfx + diag)
					c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
						ci.CompletionCodePodExternalDeleted.
							NewTaskAttemptCompletionStatus(diag, nil))
				} else {
					c.completeTaskAttempt(f, taskRoleName, taskIndex, true, nil)
				}

				return nil
			}
		} else {
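			// The Pod exists in the local cache, so reconcile the TaskAttempt state
			// against the Pod's deletion status and phase.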
			if pod.DeletionTimestamp == nil {
				if taskStatus.State == ci.TaskAttemptDeletionPending {
					// The CompletionStatus has been persisted, so it is safe to delete the
					// pod now.
					err := c.deletePod(f, taskRoleName, taskIndex, *taskStatus.PodUID(), false, false)
					if err != nil {
						return err
					}
					f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptDeletionRequested)
				}

				// Avoid syncing with an outdated object:
				// the Pod's remote deletion has been requested, but it is not yet deleting
				// or deleted in the local cache.
				if taskStatus.State == ci.TaskAttemptDeletionRequested {
					// An object whose deletion has been requested will never appear again
					// with the same UID, so always just wait.
					klog.Infof(logPfx +
						"Waiting for Pod to start disappearing or to disappear in the local cache")
					return nil
				}

				// At this point, taskStatus.State must be in:
				// {TaskAttemptCreationRequested, TaskAttemptPreparing, TaskAttemptRunning}
				if taskStatus.State == ci.TaskAttemptCreationRequested {
					f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptPreparing)
				}

				// The Pod fields below may be available even when PodPending, such as when
				// the Pod has been bound to a Node but one or more Containers have not yet
				// started.
				taskStatus.AttemptStatus.PodNodeName = &pod.Spec.NodeName
				taskStatus.AttemptStatus.PodIP = &pod.Status.PodIP
				taskStatus.AttemptStatus.PodHostIP = &pod.Status.HostIP

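				// Drive the TaskAttempt state from the Pod phase below: PodPending maps to
				// TaskAttemptPreparing, PodRunning to TaskAttemptRunning, PodSucceeded and
				// PodFailed complete the TaskAttempt, and PodUnknown just waits.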
				if pod.Status.Phase == core.PodUnknown {
					// Possibly because the NodeController has not heard from the kubelet that
					// manages the Pod for more than node-monitor-grace-period but less than
					// pod-eviction-timeout.
					// After pod-eviction-timeout, the Pod will be marked as deleting, but it
					// will only be automatically deleted after the kubelet comes back and
					// kills the Pod.
					klog.Infof(logPfx+
						"Waiting for Pod to be deleted, deleting, or transitioned from %v",
						pod.Status.Phase)
				} else if pod.Status.Phase == core.PodPending {
					f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptPreparing)
				} else if pod.Status.Phase == core.PodRunning {
					f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptRunning)
				} else if pod.Status.Phase == core.PodSucceeded {
					diag := "Pod succeeded"
					klog.Info(logPfx + diag)
					c.completeTaskAttempt(f, taskRoleName, taskIndex, false,
						ci.CompletionCodeSucceeded.
							NewTaskAttemptCompletionStatus(
								diag, ci.ExtractPodCompletionStatus(pod)))
					return nil
				} else if pod.Status.Phase == core.PodFailed {
					result := ci.MatchCompletionCodeInfos(pod)
					diag := fmt.Sprintf("Pod failed: %v", result.Diagnostics)
					klog.Info(logPfx + diag)
					c.completeTaskAttempt(f, taskRoleName, taskIndex, false,
						&ci.TaskAttemptCompletionStatus{
							CompletionStatus: &ci.CompletionStatus{
								Code:        *result.CodeInfo.Code,
								Phrase:      result.CodeInfo.Phrase,
								Type:        result.CodeInfo.Type,
								Diagnostics: diag,
							},
							Pod: ci.ExtractPodCompletionStatus(pod),
						},
					)
					return nil
				} else {
					return fmt.Errorf(logPfx+
						"Failed: Got unrecognized Pod Phase: %v", pod.Status.Phase)
				}
			} else {
				if taskStatus.AttemptStatus.CompletionStatus == nil {
					diag := "Pod is being deleted by others"
					klog.Warning(logPfx + diag)
					taskStatus.AttemptStatus.CompletionStatus =
						ci.CompletionCodePodExternalDeleted.
							NewTaskAttemptCompletionStatus(diag, nil)
				}

				f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptDeleting)
				return c.handlePodGracefulDeletion(f, taskRoleName, taskIndex, pod)
			}
		}
	}
	// At this point, taskStatus.State must be in:
	// {TaskAttemptCreationPending, TaskAttemptPreparing,
	// TaskAttemptRunning, TaskAttemptCompleted}

	if taskStatus.State == ci.TaskAttemptPreparing ||
		taskStatus.State == ci.TaskAttemptRunning {
		if taskStatus.DeletionPending {
			diag := "User has requested to delete the Task by Framework ScaleDown"
			klog.Info(logPfx + diag)
			c.completeTaskAttempt(f, taskRoleName, taskIndex, false,
				ci.CompletionCodeDeleteTaskRequested.
					NewTaskAttemptCompletionStatus(diag, nil))
		}
		return nil
	}
	// At this point, taskStatus.State must be in:
	// {TaskAttemptCreationPending, TaskAttemptCompleted}

	if taskStatus.State == ci.TaskAttemptCompleted {
		// attemptToRetryTask
		var retryDecision ci.RetryDecision
		if taskRoleSpec == nil {
			retryDecision = ci.RetryDecision{
				ShouldRetry: false, IsAccountable: true,
				DelaySec: 0, Reason: "TaskRoleSpec is already deleted"}
		} else {
			retryDecision = taskRoleSpec.Task.RetryPolicy.ShouldRetry(
				taskStatus.RetryPolicyStatus,
				taskStatus.AttemptStatus.CompletionStatus.CompletionStatus,
				0, 0)
		}
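		// Per the fields used below, retryDecision carries whether to retry, whether
		// the retry is accountable toward the retry limit, the delay before retrying,
		// and a human-readable reason.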

		if taskStatus.RetryPolicyStatus.RetryDelaySec == nil {
			// RetryTask is not yet scheduled, so it needs to be decided now.
			if retryDecision.ShouldRetry {
				// scheduleToRetryTask
				klog.Infof(logPfx+
					"Will retry Task with new TaskAttempt: RetryDecision: %v",
					retryDecision)

				taskStatus.RetryPolicyStatus.RetryDelaySec = &retryDecision.DelaySec
			} else {
				// completeTask
				klog.Infof(logPfx+
					"Will complete Task: RetryDecision: %v",
					retryDecision)

				f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskCompleted)
			}
		}

		if taskStatus.RetryPolicyStatus.RetryDelaySec != nil {
			// RetryTask is already scheduled, so we just need to check whether it
			// should be executed now.
			if taskStatus.DeletionPending {
				klog.Infof(logPfx +
					"User has requested to delete the Task by Framework ScaleDown, " +
					"so immediately retry without delay")
			} else {
				if c.enqueueTaskRetryDelayTimeoutCheck(f, taskRoleName, taskIndex, true) {
					klog.Infof(logPfx + "Waiting for Task to retry after the delay")
					return nil
				}
			}

			// retryTask
			klog.Info(logPfx + "Task will be retried" +
				c.cConfig.LogObjectSnapshot.Task.GetLogTailOnTaskRetry(
					f.MockTask(taskRoleName, taskIndex, false)))

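			// Retry bookkeeping: every retry increments TotalRetriedCount, while only
			// retries marked accountable by the RetryDecision also increment
			// AccountableRetriedCount.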
			taskStatus.RetryPolicyStatus.TotalRetriedCount++
			if retryDecision.IsAccountable {
				taskStatus.RetryPolicyStatus.AccountableRetriedCount++
			}
			taskStatus.RetryPolicyStatus.RetryDelaySec = nil
			taskStatus.AttemptStatus = f.NewTaskAttemptStatus(
				taskRoleName, taskIndex, taskStatus.RetryPolicyStatus.TotalRetriedCount)
			f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptCreationPending)

			// To ensure TaskAttemptCreationPending is persisted before creating
			// its Pod, we need to wait until the next sync to create the Pod, so
			// manually enqueue a sync.
			c.enqueueFrameworkSync(f, "TaskAttemptCreationPending")
			klog.Infof(logPfx + "Waiting for TaskAttemptCreationPending to be persisted")
			return nil
		}
	}
	// At this point, taskStatus.State must be in:
	// {TaskAttemptCreationPending, TaskCompleted}

	if taskStatus.State == ci.TaskAttemptCreationPending {
		if f.IsCompleting() {
			klog.Infof(logPfx + "Skip createTaskAttempt: " +
				"FrameworkAttempt is completing")
			return nil
		}

		if taskStatus.DeletionPending || taskRoleSpec == nil {
			diag := "User has requested to delete the Task by Framework ScaleDown"
			klog.Info(logPfx + diag)

			// Ensure the Pod is deleted remotely to avoid leaking a managed Pod after
			// TaskAttemptCompleted.
			_, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true)
			if err != nil {
				return err
			}

			c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
				ci.CompletionCodeDeleteTaskRequested.
					NewTaskAttemptCompletionStatus(diag, nil))
			return nil
		}

		// createTaskAttempt
		pod, err = c.createPod(f, cm, taskRoleName, taskIndex)
		if err != nil {
			apiErr := common.GetErrorCause(err)
			result := ci.ClassifyPodCreationError(apiErr)
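			// A transient creation error is surfaced to the caller so the sync is
			// retried and Pod creation reattempted; any other classified error
			// permanently completes this TaskAttempt below.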
			if *result.CodeInfo.Code == ci.CompletionCodePodCreationTransientError {
				// Do not complete the TaskAttempt, as generally the user does not need to
				// be aware of transient errors during Pod creation.
				return err
			}

			klog.Info(logPfx + result.Diagnostics)
			// Ensure the Pod is deleted remotely to avoid leaking a managed Pod after
			// TaskAttemptCompleted.
			_, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true)
			if err != nil {
				return err
			}

			c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
				result.CodeInfo.Code.
					NewTaskAttemptCompletionStatus(result.Diagnostics, nil))
			return nil
		}

		taskStatus.AttemptStatus.PodUID = &pod.UID
		taskStatus.AttemptStatus.InstanceUID = ci.GetTaskAttemptInstanceUID(
			taskStatus.TaskAttemptID(), taskStatus.PodUID())
		f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptCreationRequested)

		// The Informer may not deliver any event if a create is immediately followed
		// by a delete, so manually enqueue a sync to check the Pod's existence after
		// the timeout.
		c.enqueueTaskAttemptCreationTimeoutCheck(f, taskRoleName, taskIndex, false)

		// The ground truth Pod is the locally cached one instead of the remote one,
		// so we need to wait before continuing the sync.
		klog.Infof(logPfx +
			"Waiting for Pod to appear in the local cache or to time out")
		return nil
	}
	// At this point, taskStatus.State must be in:
	// {TaskCompleted}

	if taskStatus.State == ci.TaskCompleted {
		if f.IsCompleting() {
			klog.Infof(logPfx + "Skip attemptToCompleteFrameworkAttempt: " +
				"FrameworkAttempt is completing")
			return nil
		}

		if taskStatus.DeletionPending || taskRoleSpec == nil {
			klog.Infof(logPfx + "Skip attemptToCompleteFrameworkAttempt: " +
				"Task is DeletionPending")

			// To ensure the TaskCompleted[DeletionPending] is persisted before
			// deleting/replacing its Task instance, we need to wait until the next
			// sync to delete/replace the Task instance, so manually enqueue a sync.
			c.enqueueFrameworkSync(f, "TaskCompleted[DeletionPending]")
			klog.Infof(logPfx + "Waiting for TaskCompleted[DeletionPending] to be persisted")
			return nil
		}

		// attemptToCompleteFrameworkAttempt
		failedTaskSelector := ci.BindIDP((*ci.TaskStatus).IsFailed, true)
		succeededTaskSelector := ci.BindIDP((*ci.TaskStatus).IsSucceeded, true)
		completedTaskSelector := ci.BindIDP((*ci.TaskStatus).IsCompleted, true)
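		// These selectors are used with GetTaskCountStatus below to count the Tasks
		// currently failed, succeeded, and completed, against which the
		// FrameworkAttemptCompletionPolicy thresholds are evaluated.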

		completionPolicy := taskRoleSpec.FrameworkAttemptCompletionPolicy
		minFailedTaskCount := completionPolicy.MinFailedTaskCount
		minSucceededTaskCount := completionPolicy.MinSucceededTaskCount
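		// Per the >= 1 checks below, a threshold of less than 1 disables that trigger.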

		var triggerCompletionStatus *ci.FrameworkAttemptCompletionStatus
		if taskStatus.IsFailed(true) && minFailedTaskCount >= 1 {
			failedTaskCount := taskRoleStatus.GetTaskCountStatus(failedTaskSelector)
			if failedTaskCount >= minFailedTaskCount {
				triggerCompletionStatus = ci.NewFailedTaskTriggeredCompletionStatus(
					taskStatus, taskRoleName, failedTaskCount, minFailedTaskCount)
			}
		}

		if taskStatus.IsSucceeded(true) && minSucceededTaskCount >= 1 {
			succeededTaskCount := taskRoleStatus.GetTaskCountStatus(succeededTaskSelector)
			if succeededTaskCount >= minSucceededTaskCount {
				triggerCompletionStatus = ci.NewSucceededTaskTriggeredCompletionStatus(
					taskStatus, taskRoleName, succeededTaskCount, minSucceededTaskCount)
			}
		}

		if triggerCompletionStatus != nil {
			klog.Info(logPfx + triggerCompletionStatus.Trigger.Message)
			c.completeFrameworkAttempt(f, false, triggerCompletionStatus)
			return nil
		}

		totalTaskCount := f.GetTotalTaskCountSpec()
		// At least one completed Task is needed to trigger its
		// FrameworkAttemptCompletionPolicy.
		if taskStatus.IsCompleted(true) && totalTaskCount >= 1 {
			completedTaskCount := f.GetTaskCountStatus(completedTaskSelector)
			// The Framework must not be Completing or Completed, so the TaskRoles/Tasks
			// in f.Spec must fully contain the non-DeletionPending (ScaleDown)
			// TaskRoles/Tasks in f.Status; thus completedTaskCount must be <= totalTaskCount.
			if completedTaskCount >= totalTaskCount {
				triggerCompletionStatus = ci.NewCompletedTaskTriggeredCompletionStatus(
					taskStatus, taskRoleName, completedTaskCount, totalTaskCount)

				klog.Info(logPfx + triggerCompletionStatus.Trigger.Message)
				c.completeFrameworkAttempt(f, false, triggerCompletionStatus)
				return nil
			}
		}

		return nil
	}
	// At this point, taskStatus.State must be in:
	// {}

	// Unreachable
	panic(fmt.Errorf(logPfx+
		"Failed: At this point, TaskState should be in {} instead of %v",
		taskStatus.State))
}
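
For illustration only, the standalone sketch below mirrors the counting decision performed by the attemptToCompleteFrameworkAttempt section above. The types and functions here (CompletionPolicy, taskCounts, shouldCompleteAttempt) are hypothetical names invented for this sketch, not the controller's API, and the tie handling is simplified as noted in the comments.

package main

import "fmt"

// CompletionPolicy mirrors the two thresholds read from the TaskRole spec above.
type CompletionPolicy struct {
	MinFailedTaskCount    int32
	MinSucceededTaskCount int32
}

// taskCounts summarizes the current Task states of one Framework.
type taskCounts struct {
	failed    int32
	succeeded int32
	completed int32
	total     int32
}

// shouldCompleteAttempt reports whether the FrameworkAttempt should complete now
// and why. Tie handling is simplified: in the controller above, a succeeded-task
// trigger computed in the same sync overrides a failed-task trigger because it is
// assigned last, and both take precedence over the all-Tasks-completed trigger.
func shouldCompleteAttempt(p CompletionPolicy, c taskCounts) (bool, string) {
	if p.MinSucceededTaskCount >= 1 && c.succeeded >= p.MinSucceededTaskCount {
		return true, fmt.Sprintf("succeeded Tasks %v reached MinSucceededTaskCount %v",
			c.succeeded, p.MinSucceededTaskCount)
	}
	if p.MinFailedTaskCount >= 1 && c.failed >= p.MinFailedTaskCount {
		return true, fmt.Sprintf("failed Tasks %v reached MinFailedTaskCount %v",
			c.failed, p.MinFailedTaskCount)
	}
	if c.total >= 1 && c.completed >= c.total {
		return true, fmt.Sprintf("all %v Tasks completed", c.total)
	}
	return false, "completion policy not yet triggered"
}

func main() {
	// One failed Task is enough to complete (fail) the FrameworkAttempt here.
	policy := CompletionPolicy{MinFailedTaskCount: 1, MinSucceededTaskCount: 0}
	counts := taskCounts{failed: 1, succeeded: 3, completed: 4, total: 5}
	done, reason := shouldCompleteAttempt(policy, counts)
	fmt.Println(done, reason)
}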