func()

in pkg/controller/controller.go [752:833]


// syncFrameworkScale reconciles the scale recorded in f.Status with the scale
// requested in f.Spec.TaskRoles:
//
//   - ScaleUp: a TaskRole present in Spec but absent from Status gets a new
//     TaskRoleStatus appended; a TaskRole with fewer TaskStatuses than its
//     TaskNumber gets new TaskStatuses appended for the missing indexes.
//   - ScaleDown: TaskStatuses whose index is >= the TaskRole's TaskNumber, and
//     every TaskStatus of a TaskRole that is no longer in Spec, are passed to
//     MarkAsDeletionPending. Nothing is removed from Status here.
//
// Nothing is done once the Framework is completing or already completed.
//
// It returns true if at least one Task was added to Status or at least one
// call to MarkAsDeletionPending returned true; otherwise false.
func (c *FrameworkController) syncFrameworkScale(
	f *ci.Framework) (producedNewPendingTask bool) {
	logPfx := fmt.Sprintf("[%v]: syncFrameworkScale: ", f.Key())
	// logPfx embeds the Framework key, which is runtime data, so it must only
	// ever be logged as an argument and never be used as a format string.
	klog.Info(logPfx + "Started")
	defer func() { klog.Info(logPfx + "Completed") }()

	// No longer react to Rescale after the whole FrameworkAttempt Completing,
	// to ensure DeletionPending (ScaleDown) Task will never trigger (impact)
	// Framework/FrameworkAttempt completion.
	if f.IsCompleting() ||
		f.Status.State == ci.FrameworkAttemptCompleted ||
		f.Status.State == ci.FrameworkCompleted {
		klog.Infof("%sSkipped: Framework is already %v", logPfx, f.Status.State)
		return false
	}

	// Pass 1: walk the Spec, to handle TaskRoles that are new or resized.
	for _, taskRoleSpec := range f.Spec.TaskRoles {
		taskRoleName := taskRoleSpec.Name
		taskCountSpec := taskRoleSpec.TaskNumber
		taskRoleStatus := f.GetTaskRoleStatus(taskRoleName)

		if taskRoleStatus == nil {
			// ScaleUp: Directly add TaskRole that need to bring up.
			klog.Infof("[%v][%v]: syncFrameworkScale: ScaleUp: Goal: %v -> %v",
				f.Key(), taskRoleName, nil, taskCountSpec)

			f.Status.AttemptStatus.TaskRoleStatuses =
				append(f.Status.AttemptStatus.TaskRoleStatuses,
					f.NewTaskRoleStatus(taskRoleName, taskCountSpec))
			// An empty TaskRole does not bring any new Task with it.
			if taskCountSpec > 0 {
				producedNewPendingTask = true
			}
			continue
		}

		taskCountStatus := int32(len(taskRoleStatus.TaskStatuses))
		switch {
		case taskCountStatus < taskCountSpec:
			// ScaleUp: Directly add Task that need to bring up.
			klog.Infof("[%v][%v]: syncFrameworkScale: ScaleUp: Goal: %v -> %v",
				f.Key(), taskRoleName, taskCountStatus, taskCountSpec)

			for taskIndex := taskCountStatus; taskIndex < taskCountSpec; taskIndex++ {
				taskRoleStatus.TaskStatuses =
					append(taskRoleStatus.TaskStatuses, f.NewTaskStatus(taskRoleName, taskIndex))
				producedNewPendingTask = true
			}
		case taskCountStatus > taskCountSpec:
			// ScaleDown: Just mark Task that need to bring down as DeletionPending.
			klog.Infof("[%v][%v]: syncFrameworkScale: ScaleDown: Goal: %v -> %v",
				f.Key(), taskRoleName, taskCountStatus, taskCountSpec)

			// Walk from the highest index down to the first index beyond Spec.
			for taskIndex := taskCountStatus - 1; taskIndex >= taskCountSpec; taskIndex-- {
				taskStatus := taskRoleStatus.TaskStatuses[taskIndex]
				if taskStatus.MarkAsDeletionPending() {
					producedNewPendingTask = true
				}
			}
		}
	}

	// Pass 2: walk the Status, to handle TaskRoles that were removed from Spec.
	for _, taskRoleStatus := range f.TaskRoleStatuses() {
		taskRoleName := taskRoleStatus.Name
		taskCountStatus := int32(len(taskRoleStatus.TaskStatuses))

		if f.GetTaskRoleSpec(taskRoleName) != nil {
			// Still in Spec, so it has already been handled by pass 1.
			continue
		}

		// ScaleDown: Just mark Task that need to bring down as DeletionPending.
		klog.Infof("[%v][%v]: syncFrameworkScale: ScaleDown: Goal: %v -> %v",
			f.Key(), taskRoleName, taskCountStatus, nil)

		for taskIndex := taskCountStatus - 1; taskIndex >= 0; taskIndex-- {
			taskStatus := taskRoleStatus.TaskStatuses[taskIndex]
			if taskStatus.MarkAsDeletionPending() {
				producedNewPendingTask = true
			}
		}
	}

	return producedNewPendingTask
}