in pkg/controller/controller.go [752:833]
// syncFrameworkScale reconciles the task bookkeeping in f.Status with the task
// counts requested by f.Spec.TaskRoles (i.e. it applies a rescale).
//
// For every TaskRole present in the spec:
//   - if it has no TaskRoleStatus yet, a new one is appended (ScaleUp);
//   - if it has fewer TaskStatuses than requested, the missing ones are
//     appended (ScaleUp);
//   - if it has more TaskStatuses than requested, the surplus ones at the tail
//     are marked as DeletionPending (ScaleDown).
//
// For every TaskRoleStatus whose TaskRole is no longer present in the spec, all
// of its TaskStatuses are marked as DeletionPending (ScaleDown).
//
// Nothing is removed from the status here: Tasks are only added or marked.
// The Framework is left untouched if it is completing, or if its state is
// FrameworkAttemptCompleted or FrameworkCompleted.
//
// The result is true if a TaskStatus was appended, if a TaskRoleStatus was
// created for a TaskRole with a positive TaskNumber, or if any
// MarkAsDeletionPending call returned true; otherwise it is false.
func (c *FrameworkController) syncFrameworkScale(
	f *ci.Framework) (producedNewPendingTask bool) {
	logPfx := fmt.Sprintf("[%v]: syncFrameworkScale: ", f.Key())
	// These messages are plain text, not format strings, so log them with Info:
	// a '%' inside the Framework key must be emitted verbatim instead of being
	// interpreted as a formatting verb.
	klog.Info(logPfx + "Started")
	defer func() { klog.Info(logPfx + "Completed") }()

	// No longer react to Rescale after the whole FrameworkAttempt Completing,
	// to ensure DeletionPending (ScaleDown) Task will never trigger (impact)
	// Framework/FrameworkAttempt completion.
	if f.IsCompleting() ||
		f.Status.State == ci.FrameworkAttemptCompleted ||
		f.Status.State == ci.FrameworkCompleted {
		// Keep the prefix out of the format string for the same reason as above.
		klog.Infof("%vSkipped: Framework is already %v", logPfx, f.Status.State)
		return producedNewPendingTask
	}

	// Pass 1: walk the spec and bring the status of each requested TaskRole to
	// the requested Task count.
	for _, taskRoleSpec := range f.Spec.TaskRoles {
		taskRoleName := taskRoleSpec.Name
		taskCountSpec := taskRoleSpec.TaskNumber

		taskRoleStatus := f.GetTaskRoleStatus(taskRoleName)
		if taskRoleStatus == nil {
			// ScaleUp: Directly add TaskRole that need to bring up.
			klog.Infof("[%v][%v]: syncFrameworkScale: ScaleUp: Goal: %v -> %v",
				f.Key(), taskRoleName, nil, taskCountSpec)
			f.Status.AttemptStatus.TaskRoleStatuses =
				append(f.Status.AttemptStatus.TaskRoleStatuses,
					f.NewTaskRoleStatus(taskRoleName, taskCountSpec))
			// An added TaskRole only counts as new pending work if it asks for
			// at least one Task.
			if taskCountSpec > 0 {
				producedNewPendingTask = true
			}
			continue
		}

		taskCountStatus := int32(len(taskRoleStatus.TaskStatuses))
		switch {
		case taskCountStatus < taskCountSpec:
			// ScaleUp: Directly add Task that need to bring up.
			klog.Infof("[%v][%v]: syncFrameworkScale: ScaleUp: Goal: %v -> %v",
				f.Key(), taskRoleName, taskCountStatus, taskCountSpec)
			for taskIndex := taskCountStatus; taskIndex < taskCountSpec; taskIndex++ {
				taskRoleStatus.TaskStatuses =
					append(taskRoleStatus.TaskStatuses, f.NewTaskStatus(taskRoleName, taskIndex))
				producedNewPendingTask = true
			}
		case taskCountStatus > taskCountSpec:
			// ScaleDown: Just mark Task that need to bring down as DeletionPending.
			// NOTE(review): this indexes down to taskCountSpec, so it assumes
			// TaskNumber is never negative - presumably validated before the
			// Framework reaches the controller; confirm.
			klog.Infof("[%v][%v]: syncFrameworkScale: ScaleDown: Goal: %v -> %v",
				f.Key(), taskRoleName, taskCountStatus, taskCountSpec)
			for taskIndex := taskCountStatus - 1; taskIndex >= taskCountSpec; taskIndex-- {
				taskStatus := taskRoleStatus.TaskStatuses[taskIndex]
				if taskStatus.MarkAsDeletionPending() {
					producedNewPendingTask = true
				}
			}
		}
	}

	// Pass 2: walk the status and bring down every TaskRole that the spec no
	// longer mentions.
	for _, taskRoleStatus := range f.TaskRoleStatuses() {
		taskRoleName := taskRoleStatus.Name
		taskCountStatus := int32(len(taskRoleStatus.TaskStatuses))

		if f.GetTaskRoleSpec(taskRoleName) != nil {
			// Still requested by the spec: already handled in pass 1.
			continue
		}

		// ScaleDown: Just mark Task that need to bring down as DeletionPending.
		klog.Infof("[%v][%v]: syncFrameworkScale: ScaleDown: Goal: %v -> %v",
			f.Key(), taskRoleName, taskCountStatus, nil)
		for taskIndex := taskCountStatus - 1; taskIndex >= 0; taskIndex-- {
			taskStatus := taskRoleStatus.TaskStatuses[taskIndex]
			if taskStatus.MarkAsDeletionPending() {
				producedNewPendingTask = true
			}
		}
	}

	return producedNewPendingTask
}