in pkg/controller/controller.go [1599:2010]
func (c *FrameworkController) syncTaskState(
f *ci.Framework, cm *core.ConfigMap,
taskRoleName string, taskIndex int32) (err error) {
logPfx := fmt.Sprintf("[%v][%v][%v]: syncTaskState: ",
f.Key(), taskRoleName, taskIndex)
klog.Infof(logPfx + "Started")
defer func() { klog.Infof(logPfx + "Completed") }()
taskRoleSpec := f.GetTaskRoleSpec(taskRoleName)
taskRoleStatus := f.TaskRoleStatus(taskRoleName)
taskStatus := f.TaskStatus(taskRoleName, taskIndex)
if taskStatus.State == ci.TaskCompleted {
// The TaskCompleted state has already been considered during the above
// syncFrameworkAttemptCompletionPolicy, so it is safe to skip the below
// attemptToCompleteFrameworkAttempt.
//
// If the Task is DeletionPending but has not been deleted yet, it must be
// because some Task behind it has not yet become TaskCompleted. Once that
// following Task becomes TaskCompleted later, a sync will be enqueued to
// trigger the deletion.
klog.Infof(logPfx + "Skipped: Task is already completed")
return nil
}
var pod *core.Pod
if taskStatus.State != ci.TaskAttemptCompleted {
// The Pod creation may have already been requested successfully and the Pod
// may exist in the remote, so we need to sync against it.
pod, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, false)
if err != nil {
return err
}
if pod == nil {
// Avoid sync with outdated object:
// the Pod creation has been requested in the remote, but the Pod is not yet
// found in the local cache.
if taskStatus.State == ci.TaskAttemptCreationRequested {
var diag string
var code ci.CompletionCode
if taskStatus.DeletionPending {
diag = "User has requested to delete the Task by Framework ScaleDown"
code = ci.CompletionCodeDeleteTaskRequested
klog.Info(logPfx + diag)
} else {
if c.enqueueTaskAttemptCreationTimeoutCheck(f, taskRoleName, taskIndex, true) {
klog.Infof(logPfx +
"Waiting Pod to appear in the local cache or timeout")
return nil
}
diag = fmt.Sprintf(
"Pod does not appear in the local cache within timeout %v, "+
"so consider it was deleted and explicitly delete it",
common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec))
code = ci.CompletionCodePodLocalCacheCreationTimeout
klog.Warning(logPfx + diag)
}
// Ensure the Pod is deleted in the remote to avoid a managed Pod leak after
// TaskAttemptCompleted.
err := c.deletePod(f, taskRoleName, taskIndex, *taskStatus.PodUID(), true, false)
if err != nil {
return err
}
c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
code.NewTaskAttemptCompletionStatus(diag, nil))
return nil
}
if taskStatus.State != ci.TaskAttemptCreationPending {
if taskStatus.AttemptStatus.CompletionStatus == nil {
diag := fmt.Sprintf("Pod was deleted by others")
klog.Warning(logPfx + diag)
c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
ci.CompletionCodePodExternalDeleted.
NewTaskAttemptCompletionStatus(diag, nil))
} else {
c.completeTaskAttempt(f, taskRoleName, taskIndex, true, nil)
}
return nil
}
} else {
if pod.DeletionTimestamp == nil {
if taskStatus.State == ci.TaskAttemptDeletionPending {
// The CompletionStatus has been persisted, so it is safe to delete the
// pod now.
err := c.deletePod(f, taskRoleName, taskIndex, *taskStatus.PodUID(), false, false)
if err != nil {
return err
}
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptDeletionRequested)
}
// Avoid sync with outdated object:
// the Pod deletion has been requested in the remote, but the Pod is not yet
// deleting or deleted in the local cache.
if taskStatus.State == ci.TaskAttemptDeletionRequested {
// The deletion requested object will never appear again with the same UID,
// so always just wait.
klog.Infof(logPfx +
"Waiting Pod to disappearing or disappear in the local cache")
return nil
}
// At this point, taskStatus.State must be in:
// {TaskAttemptCreationRequested, TaskAttemptPreparing, TaskAttemptRunning}
if taskStatus.State == ci.TaskAttemptCreationRequested {
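// The Pod has appeared in the local cache, so the creation request is
// confirmed; advance to Preparing before mapping the Pod Phase below.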
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptPreparing)
}
// The below Pod fields may be available even when PodPending, such as when
// the Pod has been bound to a Node but one or more Containers have not been
// started yet.
taskStatus.AttemptStatus.PodNodeName = &pod.Spec.NodeName
taskStatus.AttemptStatus.PodIP = &pod.Status.PodIP
taskStatus.AttemptStatus.PodHostIP = &pod.Status.HostIP
if pod.Status.Phase == core.PodUnknown {
// Possibly because the NodeController has not heard from the kubelet that
// manages the Pod for more than node-monitor-grace-period but less than
// pod-eviction-timeout.
// After pod-eviction-timeout, the Pod will be marked as deleting, but it
// will only be deleted automatically after the kubelet comes back and
// kills the Pod.
klog.Infof(logPfx+
"Waiting Pod to be deleted or deleting or transitioned from %v",
pod.Status.Phase)
} else if pod.Status.Phase == core.PodPending {
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptPreparing)
} else if pod.Status.Phase == core.PodRunning {
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptRunning)
} else if pod.Status.Phase == core.PodSucceeded {
diag := fmt.Sprintf("Pod succeeded")
klog.Info(logPfx + diag)
c.completeTaskAttempt(f, taskRoleName, taskIndex, false,
ci.CompletionCodeSucceeded.
NewTaskAttemptCompletionStatus(
diag, ci.ExtractPodCompletionStatus(pod)))
return nil
} else if pod.Status.Phase == core.PodFailed {
result := ci.MatchCompletionCodeInfos(pod)
diag := fmt.Sprintf("Pod failed: %v", result.Diagnostics)
klog.Info(logPfx + diag)
c.completeTaskAttempt(f, taskRoleName, taskIndex, false,
&ci.TaskAttemptCompletionStatus{
CompletionStatus: &ci.CompletionStatus{
Code: *result.CodeInfo.Code,
Phrase: result.CodeInfo.Phrase,
Type: result.CodeInfo.Type,
Diagnostics: diag,
},
Pod: ci.ExtractPodCompletionStatus(pod),
},
)
return nil
} else {
return fmt.Errorf(logPfx+
"Failed: Got unrecognized Pod Phase: %v", pod.Status.Phase)
}
} else {
if taskStatus.AttemptStatus.CompletionStatus == nil {
diag := fmt.Sprintf("Pod is being deleted by others")
klog.Warning(logPfx + diag)
taskStatus.AttemptStatus.CompletionStatus =
ci.CompletionCodePodExternalDeleted.
NewTaskAttemptCompletionStatus(diag, nil)
}
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptDeleting)
return c.handlePodGracefulDeletion(f, taskRoleName, taskIndex, pod)
}
}
}
// At this point, taskStatus.State must be in:
// {TaskAttemptCreationPending, TaskAttemptPreparing,
// TaskAttemptRunning, TaskAttemptCompleted}
if taskStatus.State == ci.TaskAttemptPreparing ||
taskStatus.State == ci.TaskAttemptRunning {
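// The TaskAttempt is still active. Complete it only if the Task has been
// requested to be deleted by Framework ScaleDown; otherwise keep waiting
// for the Pod to make progress.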
if taskStatus.DeletionPending {
diag := "User has requested to delete the Task by Framework ScaleDown"
klog.Info(logPfx + diag)
c.completeTaskAttempt(f, taskRoleName, taskIndex, false,
ci.CompletionCodeDeleteTaskRequested.
NewTaskAttemptCompletionStatus(diag, nil))
}
return nil
}
// At this point, taskStatus.State must be in:
// {TaskAttemptCreationPending, TaskAttemptCompleted}
if taskStatus.State == ci.TaskAttemptCompleted {
// attemptToRetryTask
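// Decide whether the completed TaskAttempt should be retried with a new
// TaskAttempt according to the Task RetryPolicy, or whether the whole Task
// should be completed. If the TaskRoleSpec has already been deleted
// (ScaleDown), never retry.
//
// For illustration only (hypothetical values; the exact semantics are
// defined by the RetryPolicy spec), a TaskRole with
//   task: {retryPolicy: {fancyRetryPolicy: true, maxRetryCount: 2}}
// may retry transient failures without consuming the maxRetryCount budget.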
var retryDecision ci.RetryDecision
if taskRoleSpec == nil {
retryDecision = ci.RetryDecision{
ShouldRetry: false, IsAccountable: true,
DelaySec: 0, Reason: "TaskRoleSpec is already deleted"}
} else {
retryDecision = taskRoleSpec.Task.RetryPolicy.ShouldRetry(
taskStatus.RetryPolicyStatus,
taskStatus.AttemptStatus.CompletionStatus.CompletionStatus,
0, 0)
}
if taskStatus.RetryPolicyStatus.RetryDelaySec == nil {
// RetryTask is not yet scheduled, so the retry decision needs to be made.
if retryDecision.ShouldRetry {
// scheduleToRetryTask
klog.Infof(logPfx+
"Will retry Task with new TaskAttempt: RetryDecision: %v",
retryDecision)
taskStatus.RetryPolicyStatus.RetryDelaySec = &retryDecision.DelaySec
} else {
// completeTask
klog.Infof(logPfx+
"Will complete Task: RetryDecision: %v",
retryDecision)
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskCompleted)
}
}
if taskStatus.RetryPolicyStatus.RetryDelaySec != nil {
// RetryTask is already scheduled, so we just need to check whether it
// should be executed now.
if taskStatus.DeletionPending {
klog.Infof(logPfx +
"User has requested to delete the Task by Framework ScaleDown, " +
"so immediately retry without delay")
} else {
if c.enqueueTaskRetryDelayTimeoutCheck(f, taskRoleName, taskIndex, true) {
klog.Infof(logPfx + "Waiting Task to retry after delay")
return nil
}
}
// retryTask
klog.Info(logPfx + "Task will be retried" +
c.cConfig.LogObjectSnapshot.Task.GetLogTailOnTaskRetry(
f.MockTask(taskRoleName, taskIndex, false)))
taskStatus.RetryPolicyStatus.TotalRetriedCount++
if retryDecision.IsAccountable {
taskStatus.RetryPolicyStatus.AccountableRetriedCount++
}
taskStatus.RetryPolicyStatus.RetryDelaySec = nil
taskStatus.AttemptStatus = f.NewTaskAttemptStatus(
taskRoleName, taskIndex, taskStatus.RetryPolicyStatus.TotalRetriedCount)
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptCreationPending)
// To ensure TaskAttemptCreationPending is persisted before creating
// its Pod, we need to wait until the next sync to create the Pod, so
// manually enqueue a sync.
c.enqueueFrameworkSync(f, "TaskAttemptCreationPending")
klog.Infof(logPfx + "Waiting TaskAttemptCreationPending to be persisted")
return nil
}
}
// At this point, taskStatus.State must be in:
// {TaskAttemptCreationPending, TaskCompleted}
if taskStatus.State == ci.TaskAttemptCreationPending {
if f.IsCompleting() {
klog.Infof(logPfx + "Skip to createTaskAttempt: " +
"FrameworkAttempt is completing")
return nil
}
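// Before creating a new Pod, honor a pending Task deletion (ScaleDown): in
// that case, complete the TaskAttempt directly without ever creating its Pod.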
if taskStatus.DeletionPending || taskRoleSpec == nil {
diag := "User has requested to delete the Task by Framework ScaleDown"
klog.Info(logPfx + diag)
// Ensure the Pod is deleted in the remote to avoid a managed Pod leak after
// TaskAttemptCompleted.
_, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true)
if err != nil {
return err
}
c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
ci.CompletionCodeDeleteTaskRequested.
NewTaskAttemptCompletionStatus(diag, nil))
return nil
}
// createTaskAttempt
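// Request the Pod creation for the new TaskAttempt. Creation failures are
// classified below: transient errors are left to be retried by a future
// sync, while other errors complete the TaskAttempt with the classified
// CompletionCode.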
pod, err = c.createPod(f, cm, taskRoleName, taskIndex)
if err != nil {
apiErr := common.GetErrorCause(err)
result := ci.ClassifyPodCreationError(apiErr)
if *result.CodeInfo.Code == ci.CompletionCodePodCreationTransientError {
// Do not complete the TaskAttempt, as generally the user does not need to
// be aware of a Transient Error during Pod creation.
return err
}
klog.Info(logPfx + result.Diagnostics)
// Ensure the Pod is deleted in the remote to avoid a managed Pod leak after
// TaskAttemptCompleted.
_, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true)
if err != nil {
return err
}
c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
result.CodeInfo.Code.
NewTaskAttemptCompletionStatus(result.Diagnostics, nil))
return nil
}
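// The Pod creation request succeeded: record the remote Pod UID and the
// derived TaskAttempt InstanceUID, then wait for the Pod to appear in the
// local cache.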
taskStatus.AttemptStatus.PodUID = &pod.UID
taskStatus.AttemptStatus.InstanceUID = ci.GetTaskAttemptInstanceUID(
taskStatus.TaskAttemptID(), taskStatus.PodUID())
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptCreationRequested)
// The informer may not deliver any event if a create is immediately followed
// by a delete, so manually enqueue a sync to check the Pod's existence after
// the timeout.
c.enqueueTaskAttemptCreationTimeoutCheck(f, taskRoleName, taskIndex, false)
// The ground truth Pod is the locally cached one instead of the remote one,
// so we need to wait before continuing the sync.
klog.Infof(logPfx +
"Waiting Pod to appear in the local cache or timeout")
return nil
}
// At this point, taskStatus.State must be in:
// {TaskCompleted}
if taskStatus.State == ci.TaskCompleted {
if f.IsCompleting() {
klog.Infof(logPfx + "Skip to attemptToCompleteFrameworkAttempt: " +
"FrameworkAttempt is completing")
return nil
}
if taskStatus.DeletionPending || taskRoleSpec == nil {
klog.Infof(logPfx + "Skip to attemptToCompleteFrameworkAttempt: " +
"Task is DeletionPending")
// To ensure the TaskCompleted[DeletionPending] state is persisted before
// deleting/replacing its Task instance, we need to wait until the next
// sync to delete/replace the Task instance, so manually enqueue a sync.
c.enqueueFrameworkSync(f, "TaskCompleted[DeletionPending]")
klog.Infof(logPfx + "Waiting TaskCompleted[DeletionPending] to be persisted")
return nil
}
// attemptToCompleteFrameworkAttempt
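// Check whether this Task's completion triggers the completion of the whole
// FrameworkAttempt, either through the TaskRole's
// FrameworkAttemptCompletionPolicy thresholds or because all Tasks have
// completed.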
failedTaskSelector := ci.BindIDP((*ci.TaskStatus).IsFailed, true)
succeededTaskSelector := ci.BindIDP((*ci.TaskStatus).IsSucceeded, true)
completedTaskSelector := ci.BindIDP((*ci.TaskStatus).IsCompleted, true)
completionPolicy := taskRoleSpec.FrameworkAttemptCompletionPolicy
minFailedTaskCount := completionPolicy.MinFailedTaskCount
minSucceededTaskCount := completionPolicy.MinSucceededTaskCount
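// A threshold below 1 never triggers the corresponding completion.
//
// For illustration only (hypothetical values), a TaskRole with
//   frameworkAttemptCompletionPolicy: {minFailedTaskCount: 1, minSucceededTaskCount: -1}
// completes the FrameworkAttempt as soon as one of its Tasks fails and never
// triggers completion on success alone.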
var triggerCompletionStatus *ci.FrameworkAttemptCompletionStatus
if taskStatus.IsFailed(true) && minFailedTaskCount >= 1 {
failedTaskCount := taskRoleStatus.GetTaskCountStatus(failedTaskSelector)
if failedTaskCount >= minFailedTaskCount {
triggerCompletionStatus = ci.NewFailedTaskTriggeredCompletionStatus(
taskStatus, taskRoleName, failedTaskCount, minFailedTaskCount)
}
}
if taskStatus.IsSucceeded(true) && minSucceededTaskCount >= 1 {
succeededTaskCount := taskRoleStatus.GetTaskCountStatus(succeededTaskSelector)
if succeededTaskCount >= minSucceededTaskCount {
triggerCompletionStatus = ci.NewSucceededTaskTriggeredCompletionStatus(
taskStatus, taskRoleName, succeededTaskCount, minSucceededTaskCount)
}
}
if triggerCompletionStatus != nil {
klog.Info(logPfx + triggerCompletionStatus.Trigger.Message)
c.completeFrameworkAttempt(f, false, triggerCompletionStatus)
return nil
}
totalTaskCount := f.GetTotalTaskCountSpec()
// At least one completed Task is needed to trigger its
// FrameworkAttemptCompletionPolicy.
if taskStatus.IsCompleted(true) && totalTaskCount >= 1 {
completedTaskCount := f.GetTaskCountStatus(completedTaskSelector)
// The Framework must not be Completing or Completed here, so the
// TaskRoles/Tasks in f.Spec must fully contain the TaskRoles/Tasks in
// f.Status that are not DeletionPending (ScaleDown), thus
// completedTaskCount must be <= totalTaskCount.
if completedTaskCount >= totalTaskCount {
triggerCompletionStatus = ci.NewCompletedTaskTriggeredCompletionStatus(
taskStatus, taskRoleName, completedTaskCount, totalTaskCount)
klog.Info(logPfx + triggerCompletionStatus.Trigger.Message)
c.completeFrameworkAttempt(f, false, triggerCompletionStatus)
return nil
}
}
return nil
}
// At this point, taskStatus.State must be in:
// {}
// Unreachable
panic(fmt.Errorf(logPfx+
"Failed: At this point, TaskState should be in {} instead of %v",
taskStatus.State))
}