in pkg/controller/controller.go [990:1283]
// syncFrameworkState advances the state machine of the Framework f by one sync
// round, driven by f.Status.State, f.Spec.ExecutionType and the ConfigMap
// returned by getOrCleanupConfigMap.
//
// In order, it:
//  1. Handles FrameworkCompleted: keeps waiting for the
//     FrameworkCompletedRetainSec timeout check, or deletes the Framework.
//  2. For every state other than FrameworkAttemptCompleted, reconciles the
//     state against the ConfigMap (not found, present, or being deleted).
//  3. Handles FrameworkAttemptCompleted: decides between retrying with a new
//     FrameworkAttempt and completing the Framework.
//  4. Handles FrameworkAttemptCreationPending: creates the ConfigMap unless
//     the Framework is deleting or the user asked to only create or to stop it.
//  5. Handles the remaining attempt states: applies a stop request and the
//     completion policy, then syncs the TaskRole statuses.
//
// f is mutated in place. The returned error is whatever the failing helper
// (deleteFramework, getOrCleanupConfigMap, deleteConfigMap, createConfigMap or
// syncTaskRoleStatuses) reported. It panics if f.Status.State is not one of the
// states expected at the last step.
func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
	// logPfx embeds the Framework key, so it is always logged as data (via
	// klog.Info or an explicit fmt.Sprintf of the remainder) and never used as
	// part of a format string.
	logPfx := fmt.Sprintf("[%v]: syncFrameworkState: ", f.Key())
	klog.Info(logPfx + "Started")
	defer func() { klog.Info(logPfx + "Completed") }()

	if f.Status.State == ci.FrameworkCompleted {
		if c.enqueueFrameworkCompletedRetainTimeoutCheck(f, true) {
			klog.Info(logPfx + fmt.Sprintf("Skipped: Framework is already %v, "+
				"and waiting to be deleted after FrameworkCompletedRetainSec",
				f.Status.State))
			return nil
		}

		// deleteFramework
		klog.Info(logPfx + fmt.Sprintf("Framework will be deleted due to "+
			"FrameworkCompletedRetainSec %v is expired",
			common.SecToDuration(c.cConfig.FrameworkCompletedRetainSec)) +
			c.cConfig.LogObjectSnapshot.Framework.GetLogTailOnFrameworkDeletion(f))
		return c.deleteFramework(f, true)
	}

	var cm *core.ConfigMap
	if f.Status.State != ci.FrameworkAttemptCompleted {
		// ConfigMap may have been creation requested successfully and may exist in
		// remote, so need to sync against it.
		cm, err = c.getOrCleanupConfigMap(f, false)
		if err != nil {
			return err
		}

		if cm == nil {
			// Avoid sync with outdated object:
			// cm is remote creation requested but not found in the local cache.
			if f.Status.State == ci.FrameworkAttemptCreationRequested {
				var diag string
				var code ci.CompletionCode
				if f.Spec.ExecutionType == ci.ExecutionStop {
					diag = "User has requested to stop the Framework"
					code = ci.CompletionCodeStopFrameworkRequested
					klog.Info(logPfx + diag)
				} else {
					if c.enqueueFrameworkAttemptCreationTimeoutCheck(f, true) {
						klog.Info(logPfx +
							"Waiting ConfigMap to appear in the local cache or timeout")
						return nil
					}

					diag = fmt.Sprintf(
						"ConfigMap does not appear in the local cache within timeout %v, "+
							"so consider it was deleted and explicitly delete it",
						common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec))
					code = ci.CompletionCodeConfigMapLocalCacheCreationTimeout
					klog.Warning(logPfx + diag)
				}

				// Ensure cm is deleted in remote to avoid managed cm leak after
				// FrameworkAttemptCompleted.
				err = c.deleteConfigMap(f, *f.ConfigMapUID(), true)
				if err != nil {
					return err
				}

				c.completeFrameworkAttempt(f, true,
					code.NewFrameworkAttemptCompletionStatus(diag, nil))
				return nil
			}

			// Only FrameworkAttemptCreationPending is allowed to continue without
			// a cm; any other state completes the attempt here.
			if f.Status.State != ci.FrameworkAttemptCreationPending {
				if f.Status.AttemptStatus.CompletionStatus == nil {
					diag := "ConfigMap was deleted by others"
					klog.Warning(logPfx + diag)
					c.completeFrameworkAttempt(f, true,
						ci.CompletionCodeConfigMapExternalDeleted.
							NewFrameworkAttemptCompletionStatus(diag, nil))
				} else {
					// A CompletionStatus is already recorded, so no new one is
					// passed.
					c.completeFrameworkAttempt(f, true, nil)
				}
				return nil
			}
		} else {
			if cm.DeletionTimestamp == nil {
				if f.Status.State == ci.FrameworkAttemptDeletionPending {
					// The CompletionStatus has been persisted, so it is safe to delete the
					// cm now.
					err = c.deleteConfigMap(f, *f.ConfigMapUID(), false)
					if err != nil {
						return err
					}
					f.TransitionFrameworkState(ci.FrameworkAttemptDeletionRequested)
				}

				// Avoid sync with outdated object:
				// cm is remote deletion requested but not deleting or deleted in the local
				// cache.
				if f.Status.State == ci.FrameworkAttemptDeletionRequested {
					// The deletion requested object will never appear again with the same UID,
					// so always just wait.
					klog.Info(logPfx +
						"Waiting ConfigMap to disappearing or disappear in the local cache")
				} else if f.Status.State == ci.FrameworkAttemptCreationRequested {
					// At this point, f.Status.State must be in:
					// {FrameworkAttemptCreationRequested, FrameworkAttemptPreparing,
					// FrameworkAttemptRunning}
					f.TransitionFrameworkState(ci.FrameworkAttemptPreparing)
				}
			} else {
				// cm carries a DeletionTimestamp. If no CompletionStatus was
				// recorded yet, record the deletion as external.
				if f.Status.AttemptStatus.CompletionStatus == nil {
					diag := "ConfigMap is being deleted by others"
					klog.Warning(logPfx + diag)
					f.Status.AttemptStatus.CompletionStatus =
						ci.CompletionCodeConfigMapExternalDeleted.
							NewFrameworkAttemptCompletionStatus(diag, nil)
				}

				f.TransitionFrameworkState(ci.FrameworkAttemptDeleting)
				klog.Info(logPfx + "Waiting ConfigMap to be deleted")
			}
		}
	}

	// At this point, f.Status.State must be in:
	// {FrameworkAttemptCreationPending, FrameworkAttemptPreparing,
	// FrameworkAttemptRunning, FrameworkAttemptDeletionRequested,
	// FrameworkAttemptDeleting, FrameworkAttemptCompleted}
	if f.Status.State == ci.FrameworkAttemptCompleted {
		// attemptToRetryFramework
		retryDecision := f.Spec.RetryPolicy.ShouldRetry(
			f.Status.RetryPolicyStatus,
			f.Status.AttemptStatus.CompletionStatus.CompletionStatus,
			*c.cConfig.FrameworkMinRetryDelaySecForTransientConflictFailed,
			*c.cConfig.FrameworkMaxRetryDelaySecForTransientConflictFailed)

		if f.Status.RetryPolicyStatus.RetryDelaySec == nil {
			// RetryFramework is not yet scheduled, so need to be decided.
			if retryDecision.ShouldRetry {
				// scheduleToRetryFramework
				klog.Info(logPfx + fmt.Sprintf(
					"Will retry Framework with new FrameworkAttempt: RetryDecision: %v",
					retryDecision))
				f.Status.RetryPolicyStatus.RetryDelaySec = &retryDecision.DelaySec
			} else {
				// completeFramework
				klog.Info(logPfx + fmt.Sprintf(
					"Will complete Framework: RetryDecision: %v",
					retryDecision))
				f.TransitionFrameworkState(ci.FrameworkCompleted)
				c.enqueueFrameworkCompletedRetainTimeoutCheck(f, false)
				klog.Info(logPfx +
					"Waiting Framework to be deleted after FrameworkCompletedRetainSec")
				return nil
			}
		}

		// Deliberately re-checked instead of using else: the branch above may
		// have just scheduled the retry.
		if f.Status.RetryPolicyStatus.RetryDelaySec != nil {
			// RetryFramework is already scheduled, so just need to check whether it
			// should be executed now.
			if f.Spec.ExecutionType == ci.ExecutionStop {
				klog.Info(logPfx +
					"User has requested to stop the Framework, " +
					"so immediately retry without delay")
			} else {
				if c.enqueueFrameworkRetryDelayTimeoutCheck(f, true) {
					klog.Info(logPfx + "Waiting Framework to retry after delay")
					return nil
				}
			}

			// retryFramework
			klog.Info(logPfx + "Framework will be retried" +
				c.cConfig.LogObjectSnapshot.Framework.GetLogTailOnFrameworkRetry(f))
			f.Status.RetryPolicyStatus.TotalRetriedCount++
			if retryDecision.IsAccountable {
				f.Status.RetryPolicyStatus.AccountableRetriedCount++
			}
			f.Status.RetryPolicyStatus.RetryDelaySec = nil
			f.Status.AttemptStatus = f.NewFrameworkAttemptStatus(
				f.Status.RetryPolicyStatus.TotalRetriedCount)
			f.TransitionFrameworkState(ci.FrameworkAttemptCreationPending)

			// To ensure FrameworkAttemptCreationPending is persisted before creating
			// its cm, we need to wait until next sync to create the cm, so manually
			// enqueue a sync.
			c.enqueueFrameworkSync(f, "FrameworkAttemptCreationPending")
			klog.Info(logPfx + "Waiting FrameworkAttemptCreationPending to be persisted")
			return nil
		}
	}

	// At this point, f.Status.State must be in:
	// {FrameworkAttemptCreationPending, FrameworkAttemptPreparing,
	// FrameworkAttemptRunning, FrameworkAttemptDeletionRequested,
	// FrameworkAttemptDeleting}
	if f.Status.State == ci.FrameworkAttemptCreationPending {
		if f.DeletionTimestamp != nil {
			klog.Info(logPfx + "Skip to createFrameworkAttempt: " +
				"Framework is deleting")
			return nil
		}

		if f.Spec.ExecutionType == ci.ExecutionCreate {
			klog.Info(logPfx + "Skip to createFrameworkAttempt: " +
				"User has requested to just create the Framework without starting it")
			return nil
		}

		if f.Spec.ExecutionType == ci.ExecutionStop {
			diag := "User has requested to stop the Framework"
			klog.Info(logPfx + diag)

			// Ensure cm is deleted in remote to avoid managed cm leak after
			// FrameworkAttemptCompleted.
			_, err = c.getOrCleanupConfigMap(f, true)
			if err != nil {
				return err
			}

			c.completeFrameworkAttempt(f, true,
				ci.CompletionCodeStopFrameworkRequested.
					NewFrameworkAttemptCompletionStatus(diag, nil))
			return nil
		}

		// createFrameworkAttempt
		cm, err = c.createConfigMap(f)
		if err != nil {
			return err
		}

		f.Status.AttemptStatus.ConfigMapUID = &cm.UID
		f.Status.AttemptStatus.InstanceUID = ci.GetFrameworkAttemptInstanceUID(
			f.FrameworkAttemptID(), f.ConfigMapUID())
		f.TransitionFrameworkState(ci.FrameworkAttemptCreationRequested)

		// Informer may not deliver any event if a create is immediately followed by
		// a delete, so manually enqueue a sync to check the cm existence after the
		// timeout.
		c.enqueueFrameworkAttemptCreationTimeoutCheck(f, false)

		// The ground truth cm is the local cached one instead of the remote one,
		// so need to wait before continue the sync.
		klog.Info(logPfx +
			"Waiting ConfigMap to appear in the local cache or timeout")
		return nil
	}

	// At this point, f.Status.State must be in:
	// {FrameworkAttemptPreparing, FrameworkAttemptRunning,
	// FrameworkAttemptDeletionRequested, FrameworkAttemptDeleting}
	if f.Status.State == ci.FrameworkAttemptPreparing ||
		f.Status.State == ci.FrameworkAttemptRunning ||
		f.Status.State == ci.FrameworkAttemptDeletionRequested ||
		f.Status.State == ci.FrameworkAttemptDeleting {
		if !f.IsCompleting() && f.Spec.ExecutionType == ci.ExecutionStop {
			diag := "User has requested to stop the Framework"
			klog.Info(logPfx + diag)
			c.completeFrameworkAttempt(f, false,
				ci.CompletionCodeStopFrameworkRequested.
					NewFrameworkAttemptCompletionStatus(diag, nil))
		}

		// IsCompleting is queried again on purpose: it is evaluated after the
		// stop handling above has had the chance to complete the attempt.
		if !f.IsCompleting() {
			c.syncFrameworkAttemptCompletionPolicy(f)
		}

		// The error is returned only after the Preparing -> Running check below
		// has run, so that check also happens when the sync fails.
		err = c.syncTaskRoleStatuses(f, cm)

		if f.Status.State == ci.FrameworkAttemptPreparing &&
			f.IsAnyTaskRunning(true) {
			f.TransitionFrameworkState(ci.FrameworkAttemptRunning)
		}
		return err
	}

	// Unreachable
	panic(fmt.Errorf("%s"+
		"Failed: At this point, FrameworkState should be in "+
		"{%v, %v, %v, %v} instead of %v",
		logPfx,
		ci.FrameworkAttemptPreparing, ci.FrameworkAttemptRunning,
		ci.FrameworkAttemptDeletionRequested, ci.FrameworkAttemptDeleting,
		f.Status.State))
}