in cluster-autoscaler/core/static_autoscaler.go [259:655]
func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerError {
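// Per-iteration housekeeping: finish any pending cleanup, reset per-loop processor callbacks,
// prune stale ClusterStateRegistry data and start collecting data for the debugging snapshot.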
a.cleanUpIfRequired()
a.processorCallbacks.reset()
a.clusterStateRegistry.PeriodicCleanup()
a.DebuggingSnapshotter.StartDataCollection()
defer a.DebuggingSnapshotter.Flush()
podLister := a.AllPodLister()
autoscalingContext := a.AutoscalingContext
klog.V(4).Info("Starting main loop")
stateUpdateStart := time.Now()
// Get the nodes and pods currently present in the cluster
allNodes, readyNodes, typedErr := a.obtainNodeLists()
if typedErr != nil {
klog.Errorf("Failed to get node list: %v", typedErr)
return typedErr
}
if abortLoop, err := a.processors.ActionableClusterProcessor.ShouldAbort(
a.AutoscalingContext, allNodes, readyNodes, currentTime); abortLoop {
return err
}
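// List all pods and split them into already scheduled and unschedulable ones.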
pods, err := podLister.List()
if err != nil {
klog.Errorf("Failed to list pods: %v", err)
return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods)
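// When scheduler bypass is enabled for some schedulers, pods those schedulers have not processed yet
// are collected separately and later treated as unschedulable.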
schedulerUnprocessed := make([]*apiv1.Pod, 0)
isSchedulerProcessingIgnored := len(a.BypassedSchedulers) > 0
if isSchedulerProcessingIgnored {
schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods, a.BypassedSchedulers)
}
// Update cluster resource usage metrics
coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime)
metrics.UpdateClusterCPUCurrentCores(coresTotal)
metrics.UpdateClusterMemoryCurrentBytes(memoryTotal)
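// DaemonSets are listed here so that their pods can be accounted for when building template node infos below.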
daemonsets, err := a.ListerRegistry.DaemonSetLister().List(labels.Everything())
if err != nil {
klog.Errorf("Failed to get daemonset list: %v", err)
return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
// Snapshot scale-down actuation status before cache refresh.
scaleDownActuationStatus := a.scaleDownActuator.CheckStatus()
// Call CloudProvider.Refresh before any other calls to the cloud provider.
refreshStart := time.Now()
err = a.AutoscalingContext.CloudProvider.Refresh()
if a.AutoscalingOptions.AsyncNodeGroupsEnabled {
// Some node groups might have been created asynchronously, without being registered in the ClusterStateRegistry.
a.clusterStateRegistry.Recalculate()
}
metrics.UpdateDurationFromStart(metrics.CloudProviderRefresh, refreshStart)
if err != nil {
klog.Errorf("Failed to refresh cloud provider config: %v", err)
return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err)
}
a.loopStartNotifier.Refresh()
// After the cloud provider refresh, update per-node-group min/max metrics and sum the maximum node count over all existing node groups
maxNodesCount := 0
for _, nodeGroup := range a.AutoscalingContext.CloudProvider.NodeGroups() {
// Don't report non-existing or upcoming node groups
if nodeGroup.Exist() {
metrics.UpdateNodeGroupMin(nodeGroup.Id(), nodeGroup.MinSize())
metrics.UpdateNodeGroupMax(nodeGroup.Id(), nodeGroup.MaxSize())
maxNodesCount += nodeGroup.MaxSize()
}
}
if a.MaxNodesTotal > 0 {
metrics.UpdateMaxNodesCount(integer.IntMin(a.MaxNodesTotal, maxNodesCount))
} else {
metrics.UpdateMaxNodesCount(maxNodesCount)
}
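// Pods below the expendable priority cutoff are dropped before snapshotting; they should not be
// taken into account when simulating the cluster state.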
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
// Initialize the ClusterSnapshot with the current cluster state
if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil {
return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
}
// Initialize Pod Disruption Budget tracking
if typedErr := a.initializeRemainingPdbTracker(); typedErr != nil {
return typedErr.AddPrefix("failed to initialize RemainingPdbTracker: ")
}
nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.taintConfig, currentTime)
if autoscalerError != nil {
klog.Errorf("Failed to get node infos for groups: %v", autoscalerError)
return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
}
a.DebuggingSnapshotter.SetTemplateNodes(nodeInfosForGroups)
if typedErr := a.updateClusterState(allNodes, nodeInfosForGroups, currentTime); typedErr != nil {
klog.Errorf("Failed to update cluster state: %v", typedErr)
return typedErr
}
metrics.UpdateDurationFromStart(metrics.UpdateState, stateUpdateStart)
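// Default statuses: the deferred block below reports them even if the loop exits before a scale-up or scale-down is attempted.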
scaleUpStatus := &status.ScaleUpStatus{Result: status.ScaleUpNotTried}
scaleUpStatusProcessorAlreadyCalled := false
scaleDownStatus := &scaledownstatus.ScaleDownStatus{Result: scaledownstatus.ScaleDownNotTried}
defer func() {
// Update status information when the loop is done (regardless of reason)
if autoscalingContext.WriteStatusConfigMap {
status := a.clusterStateRegistry.GetStatus(currentTime)
utils.WriteStatusConfigMap(autoscalingContext.ClientSet, autoscalingContext.ConfigNamespace,
*status, a.AutoscalingContext.LogRecorder, a.AutoscalingContext.StatusConfigMapName, currentTime)
}
// Running the processors in this deferred block allows them to handle the case where a scale-up or scale-down
// wasn't even attempted, e.g. because the iteration exited early.
if !scaleUpStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleUpStatusProcessor != nil {
a.processors.ScaleUpStatusProcessor.Process(a.AutoscalingContext, scaleUpStatus)
}
if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
// Gather node deletion results before invoking the scale-down status processor
nodeDeletionResults, nodeDeletionResultsAsOf := a.scaleDownActuator.DeletionResults()
scaleDownStatus.NodeDeleteResults = nodeDeletionResults
scaleDownStatus.NodeDeleteResultsAsOf = nodeDeletionResultsAsOf
a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf)
scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider)
a.processors.ScaleDownStatusProcessor.Process(a.AutoscalingContext, scaleDownStatus)
}
if a.processors != nil && a.processors.AutoscalingStatusProcessor != nil {
err := a.processors.AutoscalingStatusProcessor.Process(a.AutoscalingContext, a.clusterStateRegistry, currentTime)
if err != nil {
klog.Errorf("AutoscalingStatusProcessor error: %v.", err)
}
}
}()
// Check if there are any nodes that failed to register with the Kubernetes
// control plane.
unregisteredNodes := a.clusterStateRegistry.GetUnregisteredNodes()
if len(unregisteredNodes) > 0 {
klog.V(1).Infof("%d unregistered nodes present", len(unregisteredNodes))
removedAny, err := a.removeOldUnregisteredNodes(unregisteredNodes,
a.clusterStateRegistry, currentTime, autoscalingContext.LogRecorder)
// There was a problem with removing unregistered nodes. Retry in the next loop.
if err != nil {
klog.Warningf("Failed to remove unregistered nodes: %v", err)
}
if removedAny {
klog.V(0).Infof("Some unregistered nodes were removed")
}
}
if !a.clusterStateRegistry.IsClusterHealthy() {
klog.Warning("Cluster is not ready for autoscaling")
a.scaleDownPlanner.CleanUpUnneededNodes()
autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "ClusterUnhealthy", "Cluster is unhealthy")
return nil
}
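// Remove instances that the cloud provider reports as failed to create.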
a.deleteCreatedNodesWithErrors()
// Check if there has been a persistent discrepancy between the number of nodes registered in Kubernetes and
// the number of nodes reported by the cloud provider.
// TODO: andrewskim - add protection for ready AWS nodes.
fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)
if err != nil {
klog.Errorf("Failed to fix node group sizes: %v", err)
return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err)
}
if fixedSomething {
klog.V(0).Infof("Some node group target size was fixed, skipping the iteration")
return nil
}
metrics.UpdateLastTime(metrics.Autoscaling, time.Now())
// schedulerUnprocessed may be empty here if scheduler bypass is disabled
metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(schedulerUnprocessed))
if isSchedulerProcessingIgnored {
// Treat unprocessed pods as unschedulable; the pod list processor will remove those that turn out to be schedulable
unschedulablePods = append(unschedulablePods, schedulerUnprocessed...)
}
// Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet.
upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes()
// For each upcoming node we inject into the cluster snapshot a placeholder node faked to appear ready, so that unschedulable pods can be
// packed onto it instead of triggering another scale-up.
// The fake nodes are intentionally not added to the allNodes list, so that they are never considered as scale-down candidates (which
// wouldn't make sense, as they are not real).
err = a.addUpcomingNodesToClusterSnapshot(upcomingCounts, nodeInfosForGroups)
if err != nil {
klog.Errorf("Failed adding upcoming nodes to cluster snapshot: %v", err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
// Some upcoming nodes can already be registered in the cluster, but not yet ready - we still inject replacements for them above. The actual registered nodes
// have to be filtered out of the all nodes list so that scale-down can't consider them as candidates. Otherwise, with aggressive scale-down settings, we
// could be removing the nodes before they have a chance to first become ready (the duration of which should be unrelated to the scale-down settings).
var allRegisteredUpcoming []string
for _, ngRegisteredUpcoming := range registeredUpcoming {
allRegisteredUpcoming = append(allRegisteredUpcoming, ngRegisteredUpcoming...)
}
allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming)
// Remove the nodes from the snapshot as well so that the state is consistent.
for _, notStartedNodeName := range allRegisteredUpcoming {
err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName)
if err != nil {
klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
// ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the
// node is not in the snapshot - so we don't have to error out in that case.
if !errors.Is(err, clustersnapshot.ErrNodeNotFound) {
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
}
}
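// Store the snapshot's node list in the debugging snapshot; a failure here is only logged and does not abort the loop.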
l, err := a.ClusterSnapshot.ListNodeInfos()
if err != nil {
klog.Errorf("Unable to fetch ClusterNode List for Debugging Snapshot, %v", err)
} else {
a.AutoscalingContext.DebuggingSnapshotter.SetClusterNodes(l)
}
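// Run the pod list processor over the unschedulable pods (e.g. to filter out pods that would still fit on
// existing or upcoming nodes); a failure here is logged but does not abort the loop.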
unschedulablePodsToHelp, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)
if err != nil {
klog.Warningf("Failed to process unschedulable pods: %v", err)
}
// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
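// Helpers shared by both scale-up paths below: record scale-up timing metrics and run the
// scale-up status processor right after the attempt.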
preScaleUp := func() time.Time {
scaleUpStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
return scaleUpStart
}
postScaleUp := func(scaleUpStart time.Time) (bool, caerrors.AutoscalerError) {
metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart)
if a.processors != nil && a.processors.ScaleUpStatusProcessor != nil {
a.processors.ScaleUpStatusProcessor.Process(autoscalingContext, scaleUpStatus)
scaleUpStatusProcessorAlreadyCalled = true
}
if typedErr != nil {
klog.Errorf("Failed to scale up: %v", typedErr)
return true, typedErr
}
if scaleUpStatus.Result == status.ScaleUpSuccessful {
a.lastScaleUpTime = currentTime
// No scale down in this iteration.
scaleDownStatus.Result = scaledownstatus.ScaleDownInCooldown
return true, nil
}
return false, nil
}
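// Decide whether a scale-up should be attempted at all in this iteration.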
if len(unschedulablePodsToHelp) == 0 {
scaleUpStatus.Result = status.ScaleUpNotNeeded
klog.V(1).Info("No unschedulable pods")
} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable
klog.V(1).Info("Max total nodes in cluster reached")
} else if !isSchedulerProcessingIgnored && allPodsAreNew(unschedulablePodsToHelp, currentTime) {
// The assumption here is that these pods have been created very recently and more pods are probably on the way.
// In theory we could check the newest pod's creation time, but if pods kept arriving slowly, e.g. one every
// 2 seconds, no scale-up would ever be triggered for a long time.
// We also want to skip a real scale-down (just as if the pods had been handled).
a.processorCallbacks.DisableScaleDownForLoop()
scaleUpStatus.Result = status.ScaleUpInCooldown
klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
} else {
scaleUpStart := preScaleUp()
scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, false)
if exit, err := postScaleUp(scaleUpStart); exit {
return err
}
}
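// Scale-down: first let the planner recompute unneeded nodes, then, unless in cooldown,
// start deleting nodes and manage soft-deletion taints.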
if a.ScaleDownEnabled {
unneededStart := time.Now()
klog.V(4).Infof("Calculating unneeded nodes")
var scaleDownCandidates []*apiv1.Node
var podDestinations []*apiv1.Node
// podDestinations and scaleDownCandidates are initialized from the allNodes variable, which contains only
// nodes registered in the cluster.
// It does not include any upcoming nodes, which can be part of the cluster snapshot. As an alternative to using
// allNodes here, we could take the nodes from the cluster snapshot and explicitly filter out upcoming nodes, but that
// is of little (if any) benefit.
if a.processors == nil || a.processors.ScaleDownNodeProcessor == nil {
scaleDownCandidates = allNodes
podDestinations = allNodes
} else {
var err caerrors.AutoscalerError
scaleDownCandidates, err = a.processors.ScaleDownNodeProcessor.GetScaleDownCandidates(
autoscalingContext, allNodes)
if err != nil {
klog.Error(err)
return err
}
podDestinations, err = a.processors.ScaleDownNodeProcessor.GetPodDestinationCandidates(autoscalingContext, allNodes)
if err != nil {
klog.Error(err)
return err
}
}
typedErr := a.scaleDownPlanner.UpdateClusterState(podDestinations, scaleDownCandidates, scaleDownActuationStatus, currentTime)
// Update clusterStateRegistry and metrics regardless of whether ScaleDown was successful or not.
unneededNodes := a.scaleDownPlanner.UnneededNodes()
a.processors.ScaleDownCandidatesNotifier.Update(unneededNodes, currentTime)
metrics.UpdateUnneededNodesCount(len(unneededNodes))
if typedErr != nil {
scaleDownStatus.Result = scaledownstatus.ScaleDownError
klog.Errorf("Failed to scale down: %v", typedErr)
return typedErr
}
metrics.UpdateDurationFromStart(metrics.FindUnneeded, unneededStart)
scaleDownInCooldown := a.isScaleDownInCooldown(currentTime, scaleDownCandidates)
klog.V(4).Infof("Scale down status: lastScaleUpTime=%s lastScaleDownDeleteTime=%v "+
"lastScaleDownFailTime=%s scaleDownForbidden=%v scaleDownInCooldown=%v",
a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime,
a.processorCallbacks.disableScaleDownForLoop, scaleDownInCooldown)
metrics.UpdateScaleDownInCooldown(scaleDownInCooldown)
// We only want to delete unneeded node groups if there is no node deletion currently
// in progress.
_, drained := scaleDownActuationStatus.DeletionsInProgress()
var removedNodeGroups []cloudprovider.NodeGroup
if len(drained) == 0 {
var err error
removedNodeGroups, err = a.processors.NodeGroupManager.RemoveUnneededNodeGroups(autoscalingContext)
if err != nil {
klog.Errorf("Error while removing unneeded node groups: %v", err)
}
scaleDownStatus.RemovedNodeGroups = removedNodeGroups
}
if scaleDownInCooldown {
scaleDownStatus.Result = scaledownstatus.ScaleDownInCooldown
} else {
klog.V(4).Infof("Starting scale down")
scaleDownStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
empty, needDrain := a.scaleDownPlanner.NodesToDelete(currentTime)
scaleDownResult, scaledDownNodes, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain)
scaleDownStatus.Result = scaleDownResult
scaleDownStatus.ScaledDownNodes = scaledDownNodes
metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
metrics.UpdateUnremovableNodesCount(countsByReason(a.scaleDownPlanner.UnremovableNodes()))
scaleDownStatus.RemovedNodeGroups = removedNodeGroups
if scaleDownStatus.Result == scaledownstatus.ScaleDownNodeDeleteStarted {
a.lastScaleDownDeleteTime = currentTime
a.clusterStateRegistry.Recalculate()
}
if scaleDownStatus.Result == scaledownstatus.ScaleDownNoNodeDeleted &&
a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount != 0 {
taintableNodes := a.scaleDownPlanner.UnneededNodes()
// Make sure we are only cleaning taints from selected node groups.
selectedNodes := filterNodesFromSelectedGroups(a.CloudProvider, allNodes...)
// This is a sanity check to make sure `taintableNodes` only includes
// nodes from the selected node groups.
taintableNodes = intersectNodes(selectedNodes, taintableNodes)
untaintableNodes := subtractNodes(selectedNodes, taintableNodes)
actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes)
}
if typedErr != nil {
klog.Errorf("Failed to scale down: %v", typedErr)
a.lastScaleDownFailTime = currentTime
return typedErr
}
}
}
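// Independently of the regular scale-up, optionally bring node groups that are below their
// configured minimum size back up to it.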
if a.EnforceNodeGroupMinSize {
scaleUpStart := preScaleUp()
scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(readyNodes, nodeInfosForGroups)
if exit, err := postScaleUp(scaleUpStart); exit {
return err
}
}
return nil
}