func()

in cluster-autoscaler/core/static_autoscaler.go [259:655]


// RunOnce executes a single iteration of the autoscaler main loop as of currentTime.
//
// In order, one iteration:
//  1. lists nodes and pods, refreshes the cloud provider and rebuilds the ClusterSnapshot,
//  2. updates the cluster state registry and publishes metrics,
//  3. removes old unregistered nodes and fixes node group sizes where needed,
//  4. attempts a scale-up for the unschedulable pods that remain after processing,
//  5. if ScaleDownEnabled, recomputes unneeded nodes and possibly starts node deletion,
//  6. if EnforceNodeGroupMinSize, scales node groups up to their minimum size.
//
// It returns nil when the iteration completes or is deliberately cut short (unhealthy
// cluster, node group size fixed, successful scale-up), and a typed AutoscalerError
// when a step fails.
func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerError {
	// Per-iteration housekeeping. The debugging snapshot Flush is deferred, so it runs on
	// every exit path of this function.
	a.cleanUpIfRequired()
	a.processorCallbacks.reset()
	a.clusterStateRegistry.PeriodicCleanup()
	a.DebuggingSnapshotter.StartDataCollection()
	defer a.DebuggingSnapshotter.Flush()

	podLister := a.AllPodLister()
	autoscalingContext := a.AutoscalingContext

	klog.V(4).Info("Starting main loop")

	// Start of the state-update phase; its duration is reported under metrics.UpdateState below.
	stateUpdateStart := time.Now()

	// Get nodes and pods currently living on cluster
	allNodes, readyNodes, typedErr := a.obtainNodeLists()
	if typedErr != nil {
		klog.Errorf("Failed to get node list: %v", typedErr)
		return typedErr
	}

	// When the processor asks to abort, the iteration ends here with whatever error value
	// it supplied.
	if abortLoop, err := a.processors.ActionableClusterProcessor.ShouldAbort(
		a.AutoscalingContext, allNodes, readyNodes, currentTime); abortLoop {
		return err
	}

	pods, err := podLister.List()
	if err != nil {
		klog.Errorf("Failed to list pods: %v", err)
		return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
	}
	originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods)
	// schedulerUnprocessed stays empty unless at least one bypassed scheduler is configured.
	schedulerUnprocessed := make([]*apiv1.Pod, 0, 0)
	isSchedulerProcessingIgnored := len(a.BypassedSchedulers) > 0
	if isSchedulerProcessingIgnored {
		schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods, a.BypassedSchedulers)
	}

	// Update cluster resource usage metrics
	coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime)
	metrics.UpdateClusterCPUCurrentCores(coresTotal)
	metrics.UpdateClusterMemoryCurrentBytes(memoryTotal)

	daemonsets, err := a.ListerRegistry.DaemonSetLister().List(labels.Everything())
	if err != nil {
		klog.Errorf("Failed to get daemonset list: %v", err)
		return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
	}
	// Snapshot scale-down actuation status before cache refresh.
	scaleDownActuationStatus := a.scaleDownActuator.CheckStatus()
	// Call CloudProvider.Refresh before any other calls to cloud provider.
	refreshStart := time.Now()
	err = a.AutoscalingContext.CloudProvider.Refresh()
	if a.AutoscalingOptions.AsyncNodeGroupsEnabled {
		// Some node groups might have been created asynchronously, without registering in CSR.
		a.clusterStateRegistry.Recalculate()
	}
	// The refresh error is inspected only after the recalculation and the duration metric,
	// so both happen even when Refresh failed.
	metrics.UpdateDurationFromStart(metrics.CloudProviderRefresh, refreshStart)
	if err != nil {
		klog.Errorf("Failed to refresh cloud provider config: %v", err)
		return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err)
	}
	a.loopStartNotifier.Refresh()

	// Update node groups min/max and maximum number of nodes being set for all node groups after cloud provider refresh
	maxNodesCount := 0
	for _, nodeGroup := range a.AutoscalingContext.CloudProvider.NodeGroups() {
		// Don't report non-existing or upcoming node groups
		if nodeGroup.Exist() {
			metrics.UpdateNodeGroupMin(nodeGroup.Id(), nodeGroup.MinSize())
			metrics.UpdateNodeGroupMax(nodeGroup.Id(), nodeGroup.MaxSize())
			maxNodesCount += nodeGroup.MaxSize()
		}
	}
	// A positive MaxNodesTotal caps the reported maximum; otherwise the sum of the
	// per-group maximums is reported as is.
	if a.MaxNodesTotal > 0 {
		metrics.UpdateMaxNodesCount(integer.IntMin(a.MaxNodesTotal, maxNodesCount))
	} else {
		metrics.UpdateMaxNodesCount(maxNodesCount)
	}
	nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
	// Initialize cluster state to ClusterSnapshot
	if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil {
		return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
	}
	// Initialize Pod Disruption Budget tracking
	if typedErr := a.initializeRemainingPdbTracker(); typedErr != nil {
		return typedErr.AddPrefix("failed to initialize RemainingPdbTracker: ")
	}

	// Build the per-node-group template node infos; they are reused below for the cluster
	// state update, the upcoming-node placeholders and both scale-up calls.
	nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.taintConfig, currentTime)
	if autoscalerError != nil {
		klog.Errorf("Failed to get node infos for groups: %v", autoscalerError)
		return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
	}

	a.DebuggingSnapshotter.SetTemplateNodes(nodeInfosForGroups)

	if typedErr := a.updateClusterState(allNodes, nodeInfosForGroups, currentTime); typedErr != nil {
		klog.Errorf("Failed to update cluster state: %v", typedErr)
		return typedErr
	}
	metrics.UpdateDurationFromStart(metrics.UpdateState, stateUpdateStart)

	// Default statuses reported by the deferred function below when the iteration exits
	// before a scale-up or scale-down is attempted.
	scaleUpStatus := &status.ScaleUpStatus{Result: status.ScaleUpNotTried}
	// Set by postScaleUp so the deferred function does not run the scale-up status
	// processor a second time.
	scaleUpStatusProcessorAlreadyCalled := false
	scaleDownStatus := &scaledownstatus.ScaleDownStatus{Result: scaledownstatus.ScaleDownNotTried}

	// The defer is registered only at this point, so the early returns above do not run it.
	// The closure reads scaleUpStatus, scaleDownStatus and the flag when it executes, i.e.
	// it sees whatever values they hold when RunOnce returns.
	defer func() {
		// Update status information when the loop is done (regardless of reason)
		if autoscalingContext.WriteStatusConfigMap {
			// Note: this local shadows the status package for the rest of this if-block.
			status := a.clusterStateRegistry.GetStatus(currentTime)
			utils.WriteStatusConfigMap(autoscalingContext.ClientSet, autoscalingContext.ConfigNamespace,
				*status, a.AutoscalingContext.LogRecorder, a.AutoscalingContext.StatusConfigMapName, currentTime)
		}

		// This deferred processor execution allows the processors to handle a situation when a scale-(up|down)
		// wasn't even attempted because e.g. the iteration exited earlier.
		if !scaleUpStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleUpStatusProcessor != nil {
			a.processors.ScaleUpStatusProcessor.Process(a.AutoscalingContext, scaleUpStatus)
		}
		if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
			// Gather status before scaledown status processor invocation
			nodeDeletionResults, nodeDeletionResultsAsOf := a.scaleDownActuator.DeletionResults()
			scaleDownStatus.NodeDeleteResults = nodeDeletionResults
			scaleDownStatus.NodeDeleteResultsAsOf = nodeDeletionResultsAsOf
			// The results just copied into scaleDownStatus are cleared from the actuator,
			// using their own as-of timestamp as the cutoff.
			a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf)
			scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider)

			a.processors.ScaleDownStatusProcessor.Process(a.AutoscalingContext, scaleDownStatus)
		}

		if a.processors != nil && a.processors.AutoscalingStatusProcessor != nil {
			err := a.processors.AutoscalingStatusProcessor.Process(a.AutoscalingContext, a.clusterStateRegistry, currentTime)
			if err != nil {
				klog.Errorf("AutoscalingStatusProcessor error: %v.", err)
			}
		}
	}()

	// Check if there are any nodes that failed to register in Kubernetes
	// master.
	unregisteredNodes := a.clusterStateRegistry.GetUnregisteredNodes()
	if len(unregisteredNodes) > 0 {
		klog.V(1).Infof("%d unregistered nodes present", len(unregisteredNodes))
		removedAny, err := a.removeOldUnregisteredNodes(unregisteredNodes,
			a.clusterStateRegistry, currentTime, autoscalingContext.LogRecorder)
		// There was a problem with removing unregistered nodes. Retry in the next loop.
		if err != nil {
			klog.Warningf("Failed to remove unregistered nodes: %v", err)
		}
		if removedAny {
			klog.V(0).Infof("Some unregistered nodes were removed")
		}
	}

	// An unhealthy cluster ends the iteration without an error: unneeded-node tracking is
	// cleaned up, a warning event is recorded, and nil is returned.
	if !a.clusterStateRegistry.IsClusterHealthy() {
		klog.Warning("Cluster is not ready for autoscaling")
		a.scaleDownPlanner.CleanUpUnneededNodes()
		autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "ClusterUnhealthy", "Cluster is unhealthy")
		return nil
	}

	a.deleteCreatedNodesWithErrors()

	// Check if there has been a constant difference between the number of nodes in k8s and
	// the number of nodes on the cloud provider side.
	// TODO: andrewskim - add protection for ready AWS nodes.
	fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)
	if err != nil {
		klog.Errorf("Failed to fix node group sizes: %v", err)
		return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err)
	}
	if fixedSomething {
		klog.V(0).Infof("Some node group target size was fixed, skipping the iteration")
		return nil
	}

	metrics.UpdateLastTime(metrics.Autoscaling, time.Now())

	// SchedulerUnprocessed might be zero here if it was disabled
	metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(schedulerUnprocessed))
	if isSchedulerProcessingIgnored {
		// Treat unknown pods as unschedulable, pod list processor will remove schedulable pods
		unschedulablePods = append(unschedulablePods, schedulerUnprocessed...)
	}
	// Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet.
	upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes()
	// For each upcoming node we inject a placeholder node faked to appear ready into the cluster snapshot, so that we can pack unschedulable pods on
	// them and not trigger another scale-up.
	// The fake nodes are intentionally not added to the all nodes list, so that they are not considered as candidates for scale-down (which
	// doesn't make sense as they're not real).
	err = a.addUpcomingNodesToClusterSnapshot(upcomingCounts, nodeInfosForGroups)
	if err != nil {
		klog.Errorf("Failed adding upcoming nodes to cluster snapshot: %v", err)
		return caerrors.ToAutoscalerError(caerrors.InternalError, err)
	}
	// Some upcoming nodes can already be registered in the cluster, but not yet ready - we still inject replacements for them above. The actual registered nodes
	// have to be filtered out of the all nodes list so that scale-down can't consider them as candidates. Otherwise, with aggressive scale-down settings, we
	// could be removing the nodes before they have a chance to first become ready (the duration of which should be unrelated to the scale-down settings).
	var allRegisteredUpcoming []string
	for _, ngRegisteredUpcoming := range registeredUpcoming {
		allRegisteredUpcoming = append(allRegisteredUpcoming, ngRegisteredUpcoming...)
	}
	allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming)
	// Remove the nodes from the snapshot as well so that the state is consistent.
	for _, notStartedNodeName := range allRegisteredUpcoming {
		err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName)
		if err != nil {
			klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
			// ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the
			// node is not in the snapshot - so we don't have to error out in that case.
			if !errors.Is(err, clustersnapshot.ErrNodeNotFound) {
				return caerrors.ToAutoscalerError(caerrors.InternalError, err)
			}
		}
	}
	// Feeding the debugging snapshot is best-effort: a listing failure is logged and the
	// iteration carries on.
	l, err := a.ClusterSnapshot.ListNodeInfos()
	if err != nil {
		klog.Errorf("Unable to fetch ClusterNode List for Debugging Snapshot, %v", err)
	} else {
		a.AutoscalingContext.DebuggingSnapshotter.SetClusterNodes(l)
	}

	// A pod list processor error is only logged; the iteration continues with whatever
	// list the processor returned.
	unschedulablePodsToHelp, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)

	if err != nil {
		klog.Warningf("Failed to process unschedulable pods: %v", err)
	}

	// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
	unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
	// preScaleUp records the scale-up start time in metrics and returns it so it can be
	// handed to postScaleUp.
	preScaleUp := func() time.Time {
		scaleUpStart := time.Now()
		metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
		return scaleUpStart
	}

	// postScaleUp finishes a scale-up attempt. It reads the enclosing function's
	// scaleUpStatus and typedErr, so the caller must assign both before invoking it.
	// The returned bool tells the caller to return from RunOnce with the returned error
	// (nil after a successful scale-up).
	postScaleUp := func(scaleUpStart time.Time) (bool, caerrors.AutoscalerError) {
		metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart)

		if a.processors != nil && a.processors.ScaleUpStatusProcessor != nil {
			a.processors.ScaleUpStatusProcessor.Process(autoscalingContext, scaleUpStatus)
			scaleUpStatusProcessorAlreadyCalled = true
		}

		if typedErr != nil {
			klog.Errorf("Failed to scale up: %v", typedErr)
			return true, typedErr
		}
		if scaleUpStatus.Result == status.ScaleUpSuccessful {
			a.lastScaleUpTime = currentTime
			// No scale down in this iteration.
			scaleDownStatus.Result = scaledownstatus.ScaleDownInCooldown
			return true, nil
		}
		return false, nil
	}

	// Decide whether to attempt a scale-up. Only the final branch calls the orchestrator;
	// the other branches just record why no scale-up was attempted.
	if len(unschedulablePodsToHelp) == 0 {
		scaleUpStatus.Result = status.ScaleUpNotNeeded
		klog.V(1).Info("No unschedulable pods")
	} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
		// The limit is compared against the number of ready nodes.
		scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable
		klog.V(1).Info("Max total nodes in cluster reached")
	} else if !isSchedulerProcessingIgnored && allPodsAreNew(unschedulablePodsToHelp, currentTime) {
		// The assumption here is that these pods have been created very recently and probably there
		// are more pods to come. In theory we could check the newest pod time but then if pod were created
		// slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time.
		// We also want to skip a real scale down (just like if the pods were handled).
		a.processorCallbacks.DisableScaleDownForLoop()
		scaleUpStatus.Result = status.ScaleUpInCooldown
		klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
	} else {
		scaleUpStart := preScaleUp()
		// Plain assignment: this writes the function-level scaleUpStatus and typedErr that
		// postScaleUp and the deferred function read.
		scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, false)
		if exit, err := postScaleUp(scaleUpStart); exit {
			return err
		}
	}

	if a.ScaleDownEnabled {
		unneededStart := time.Now()

		klog.V(4).Infof("Calculating unneeded nodes")

		var scaleDownCandidates []*apiv1.Node
		var podDestinations []*apiv1.Node

		// podDestinations and scaleDownCandidates are initialized based on allNodes variable, which contains only
		// registered nodes in cluster.
		// It does not include any upcoming nodes which can be part of clusterSnapshot. As an alternative to using
		// allNodes here, we could use nodes from clusterSnapshot and explicitly filter out upcoming nodes here but it
		// is of little (if any) benefit.

		if a.processors == nil || a.processors.ScaleDownNodeProcessor == nil {
			scaleDownCandidates = allNodes
			podDestinations = allNodes
		} else {
			// This typed err shadows the function-level err so it can be returned directly.
			var err caerrors.AutoscalerError
			scaleDownCandidates, err = a.processors.ScaleDownNodeProcessor.GetScaleDownCandidates(
				autoscalingContext, allNodes)
			if err != nil {
				klog.Error(err)
				return err
			}
			podDestinations, err = a.processors.ScaleDownNodeProcessor.GetPodDestinationCandidates(autoscalingContext, allNodes)
			if err != nil {
				klog.Error(err)
				return err
			}
		}

		// The := declares a typedErr local to this if-block; the function-level typedErr
		// captured by postScaleUp is not modified here.
		typedErr := a.scaleDownPlanner.UpdateClusterState(podDestinations, scaleDownCandidates, scaleDownActuationStatus, currentTime)
		// Update clusterStateRegistry and metrics regardless of whether ScaleDown was successful or not.
		unneededNodes := a.scaleDownPlanner.UnneededNodes()
		// NOTE(review): a.processors is dereferenced here without the nil check used a few
		// lines above - confirm it can never be nil on this path.
		a.processors.ScaleDownCandidatesNotifier.Update(unneededNodes, currentTime)
		metrics.UpdateUnneededNodesCount(len(unneededNodes))
		if typedErr != nil {
			scaleDownStatus.Result = scaledownstatus.ScaleDownError
			klog.Errorf("Failed to scale down: %v", typedErr)
			return typedErr
		}

		metrics.UpdateDurationFromStart(metrics.FindUnneeded, unneededStart)

		scaleDownInCooldown := a.isScaleDownInCooldown(currentTime, scaleDownCandidates)
		klog.V(4).Infof("Scale down status: lastScaleUpTime=%s lastScaleDownDeleteTime=%v "+
			"lastScaleDownFailTime=%s scaleDownForbidden=%v scaleDownInCooldown=%v",
			a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime,
			a.processorCallbacks.disableScaleDownForLoop, scaleDownInCooldown)
		metrics.UpdateScaleDownInCooldown(scaleDownInCooldown)
		// We want to delete unneeded Node Groups only if there is no current delete
		// in progress.
		_, drained := scaleDownActuationStatus.DeletionsInProgress()
		var removedNodeGroups []cloudprovider.NodeGroup
		if len(drained) == 0 {
			var err error
			removedNodeGroups, err = a.processors.NodeGroupManager.RemoveUnneededNodeGroups(autoscalingContext)
			// A removal failure is only logged; the value returned alongside the error is
			// still recorded in the status.
			if err != nil {
				klog.Errorf("Error while removing unneeded node groups: %v", err)
			}
			scaleDownStatus.RemovedNodeGroups = removedNodeGroups
		}

		if scaleDownInCooldown {
			scaleDownStatus.Result = scaledownstatus.ScaleDownInCooldown
		} else {
			klog.V(4).Infof("Starting scale down")

			scaleDownStart := time.Now()
			metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
			empty, needDrain := a.scaleDownPlanner.NodesToDelete(currentTime)
			// typedErr is declared afresh in this else-block. It is checked only at the end
			// of the block, after status, metrics and soft taints have been updated.
			scaleDownResult, scaledDownNodes, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain)
			scaleDownStatus.Result = scaleDownResult
			scaleDownStatus.ScaledDownNodes = scaledDownNodes
			metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
			metrics.UpdateUnremovableNodesCount(countsByReason(a.scaleDownPlanner.UnremovableNodes()))

			// NOTE(review): this repeats the assignment made above when no drained deletion
			// was in progress; otherwise removedNodeGroups is still nil here. Confirm
			// whether the repetition is intentional.
			scaleDownStatus.RemovedNodeGroups = removedNodeGroups

			if scaleDownStatus.Result == scaledownstatus.ScaleDownNodeDeleteStarted {
				a.lastScaleDownDeleteTime = currentTime
				a.clusterStateRegistry.Recalculate()
			}

			// Soft deletion taints are refreshed only when no node was deleted in this
			// iteration and MaxBulkSoftTaintCount is non-zero.
			if scaleDownStatus.Result == scaledownstatus.ScaleDownNoNodeDeleted &&
				a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount != 0 {
				taintableNodes := a.scaleDownPlanner.UnneededNodes()

				// Make sure we are only cleaning taints from selected node groups.
				selectedNodes := filterNodesFromSelectedGroups(a.CloudProvider, allNodes...)

				// This is a sanity check to make sure `taintableNodes` only includes
				// nodes from selected nodes.
				taintableNodes = intersectNodes(selectedNodes, taintableNodes)
				untaintableNodes := subtractNodes(selectedNodes, taintableNodes)
				actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes)
			}

			if typedErr != nil {
				klog.Errorf("Failed to scale down: %v", typedErr)
				a.lastScaleDownFailTime = currentTime
				return typedErr
			}
		}
	}

	// Optional second scale-up pass that brings node groups up to their minimum size. It
	// reuses preScaleUp/postScaleUp, so it overwrites the function-level scaleUpStatus and
	// typedErr from the earlier attempt.
	if a.EnforceNodeGroupMinSize {
		scaleUpStart := preScaleUp()
		scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(readyNodes, nodeInfosForGroups)
		if exit, err := postScaleUp(scaleUpStart); exit {
			return err
		}
	}

	return nil
}