in cluster-autoscaler/core/scale_up.go [324:627]
func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, ignoredTaints taints.TaintKeySet) (*status.ScaleUpStatus, errors.AutoscalerError) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
if len(unschedulablePods) == 0 {
klog.V(1).Info("No unschedulable pods")
return &status.ScaleUpStatus{Result: status.ScaleUpNotNeeded}, nil
}
now := time.Now()
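// Log the unschedulable pods individually up to the logging quota; the overflow is summarized in one line below.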
loggingQuota := klogx.PodsLoggingQuota()
for _, pod := range unschedulablePods {
klogx.V(1).UpTo(loggingQuota).Infof("Pod %s/%s is unschedulable", pod.Namespace, pod.Name)
}
klogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left())
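// Nodes outside any autoscaled node group are split out here; their resources still count when computing how much of the cluster-wide limits is left.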
nodesFromNotAutoscaledGroups, err := utils.FilterOutNodesFromNotAutoscaledGroups(nodes, context.CloudProvider)
if err != nil {
return scaleUpError(&status.ScaleUpStatus{}, err.AddPrefix("failed to filter out nodes which are from not autoscaled groups: "))
}
nodeGroups := context.CloudProvider.NodeGroups()
gpuLabel := context.CloudProvider.GPULabel()
availableGPUTypes := context.CloudProvider.GetAvailableGPUTypes()
resourceLimiter, errCP := context.CloudProvider.GetResourceLimiter()
if errCP != nil {
return scaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(
errors.CloudProviderError,
errCP))
}
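// Determine the remaining headroom under the cluster-wide resource limits; expansions that would exceed it are skipped in the loop below.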
scaleUpResourcesLeft, errLimits := computeScaleUpResourcesLeftLimits(context, processors, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups, resourceLimiter)
if errLimits != nil {
return scaleUpError(&status.ScaleUpStatus{}, errLimits.AddPrefix("Could not compute total resources: "))
}
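// Build placeholder NodeInfos for upcoming nodes (requested from the cloud provider but not yet registered), so the scheduling simulation can account for capacity that is already on its way.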
upcomingNodes := make([]*schedulerframework.NodeInfo, 0)
for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() {
nodeTemplate, found := nodeInfos[nodeGroup]
if !found {
return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(
errors.InternalError,
"failed to find template node for node group %s",
nodeGroup))
}
for i := 0; i < numberOfNodes; i++ {
upcomingNodes = append(upcomingNodes, nodeTemplate)
}
}
klog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))
expansionOptions := make(map[string]expander.Option)
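// Give the NodeGroupListProcessor a chance to adjust the candidate node groups and their node infos, e.g. by adding autoprovisioned groups that do not exist yet.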
if processors != nil && processors.NodeGroupListProcessor != nil {
var errProc error
nodeGroups, nodeInfos, errProc = processors.NodeGroupListProcessor.Process(context, nodeGroups, nodeInfos, unschedulablePods)
if errProc != nil {
return scaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, errProc))
}
}
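// Group equivalent pods together so the scheduling simulation runs once per group instead of once per pod.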
podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods)
skippedNodeGroups := map[string]status.Reasons{}
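// For each candidate node group: skip it if it is unhealthy, backing off, at max size, missing node info, or over the resource limits; otherwise compute an expansion option for it.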
for _, nodeGroup := range nodeGroups {
// Autoprovisioned node groups without nodes are created later, so skip this check for them.
if nodeGroup.Exist() && !clusterStateRegistry.IsNodeGroupSafeToScaleUp(nodeGroup, now) {
// Hack that depends on internals of IsNodeGroupSafeToScaleUp.
if !clusterStateRegistry.IsNodeGroupHealthy(nodeGroup.Id()) {
klog.Warningf("Node group %s is not ready for scaleup - unhealthy", nodeGroup.Id())
skippedNodeGroups[nodeGroup.Id()] = notReadyReason
} else {
klog.Warningf("Node group %s is not ready for scaleup - backoff", nodeGroup.Id())
skippedNodeGroups[nodeGroup.Id()] = backoffReason
}
continue
}
currentTargetSize, err := nodeGroup.TargetSize()
if err != nil {
klog.Errorf("Failed to get node group size: %v", err)
skippedNodeGroups[nodeGroup.Id()] = notReadyReason
continue
}
if currentTargetSize >= nodeGroup.MaxSize() {
klog.V(4).Infof("Skipping node group %s - max size reached", nodeGroup.Id())
skippedNodeGroups[nodeGroup.Id()] = maxLimitReachedReason
continue
}
nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
klog.Errorf("No node info for: %s", nodeGroup.Id())
skippedNodeGroups[nodeGroup.Id()] = notReadyReason
continue
}
scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(context, processors, nodeInfo, nodeGroup, resourceLimiter)
if err != nil {
klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
skippedNodeGroups[nodeGroup.Id()] = notReadyReason
continue
}
checkResult := scaleUpResourcesLeft.checkScaleUpDeltaWithinLimits(scaleUpResourcesDelta)
if checkResult.exceeded {
klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), checkResult.exceededResources)
skippedNodeGroups[nodeGroup.Id()] = maxResourceLimitReached(checkResult.exceededResources)
continue
}
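// Simulate scheduling: determine which pending pods would fit on a node from this group and how many new nodes they would need.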
option, err := computeExpansionOption(context, podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
if err != nil {
return scaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err))
}
if len(option.Pods) > 0 && option.NodeCount > 0 {
expansionOptions[nodeGroup.Id()] = option
} else {
klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
}
}
if len(expansionOptions) == 0 {
klog.V(1).Info("No expansion options")
return &status.ScaleUpStatus{
Result: status.ScaleUpNoOptionsAvailable,
PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
ConsideredNodeGroups: nodeGroups,
}, nil
}
// Pick the best expansion option using the configured expander strategy.
options := make([]expander.Option, 0, len(expansionOptions))
for _, o := range expansionOptions {
options = append(options, o)
}
bestOption := context.ExpanderStrategy.BestOption(options, nodeInfos)
if bestOption != nil && bestOption.NodeCount > 0 {
klog.V(1).Infof("Best option to resize: %s", bestOption.NodeGroup.Id())
if len(bestOption.Debug) > 0 {
klog.V(1).Info(bestOption.Debug)
}
klog.V(1).Infof("Estimated %d nodes needed in %s", bestOption.NodeCount, bestOption.NodeGroup.Id())
newNodes := bestOption.NodeCount
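// Cap the requested node count so the cluster stays within MaxNodesTotal, counting both existing and upcoming nodes.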
if context.MaxNodesTotal > 0 && len(nodes)+newNodes+len(upcomingNodes) > context.MaxNodesTotal {
klog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal)
newNodes = context.MaxNodesTotal - len(nodes) - len(upcomingNodes)
context.LogRecorder.Eventf(apiv1.EventTypeWarning, "MaxNodesTotalReached", "Max total nodes in cluster reached: %v", context.MaxNodesTotal)
if newNodes < 1 {
return scaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods},
errors.NewAutoscalerError(errors.TransientError, "max node total count already reached"))
}
}
createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
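// If the best option is a node group that does not exist yet (autoprovisioning), create it now, build node infos for it and any extra groups created alongside it, and recompute their expansion options.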
if !bestOption.NodeGroup.Exist() {
oldId := bestOption.NodeGroup.Id()
createNodeGroupResult, err := processors.NodeGroupManager.CreateNodeGroup(context, bestOption.NodeGroup)
if err != nil {
return scaleUpError(
&status.ScaleUpStatus{FailedCreationNodeGroups: []cloudprovider.NodeGroup{bestOption.NodeGroup}, PodsTriggeredScaleUp: bestOption.Pods},
err)
}
createNodeGroupResults = append(createNodeGroupResults, createNodeGroupResult)
bestOption.NodeGroup = createNodeGroupResult.MainCreatedNodeGroup
// If possible, replace the candidate node info with one based on the created node group;
// the latter should better match the nodes the group will actually create.
mainCreatedNodeInfo, err := utils.GetNodeInfoFromTemplate(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, context.PredicateChecker, ignoredTaints)
if err == nil {
nodeInfos[createNodeGroupResult.MainCreatedNodeGroup.Id()] = mainCreatedNodeInfo
} else {
klog.Warningf("Cannot build node info for newly created main node group %v; balancing similar node groups may not work; err=%v", createNodeGroupResult.MainCreatedNodeGroup.Id(), err)
// Use the node info from the expansion candidate, but update the Id, which likely changed when the node group was created.
nodeInfos[bestOption.NodeGroup.Id()] = nodeInfos[oldId]
}
if oldId != createNodeGroupResult.MainCreatedNodeGroup.Id() {
delete(nodeInfos, oldId)
}
for _, nodeGroup := range createNodeGroupResult.ExtraCreatedNodeGroups {
nodeInfo, err := utils.GetNodeInfoFromTemplate(nodeGroup, daemonSets, context.PredicateChecker, ignoredTaints)
if err != nil {
klog.Warningf("Cannot build node info for newly created extra node group %v; balancing similar node groups will not work; err=%v", nodeGroup.Id(), err)
continue
}
nodeInfos[nodeGroup.Id()] = nodeInfo
option, err2 := computeExpansionOption(context, podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
if err2 != nil {
return scaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, errors.ToAutoscalerError(errors.InternalError, err2))
}
if len(option.Pods) > 0 && option.NodeCount > 0 {
expansionOptions[nodeGroup.Id()] = option
}
}
// Update the ClusterStateRegistry so that rebalancing between similar node groups works.
// TODO(lukaszos): when pursuing scalability, replace this call with one that takes a list of changed
// node groups, so we do not make extra API calls (the call at the bottom of ScaleUp() could then be changed as well).
clusterStateRegistry.Recalculate()
}
nodeInfo, found := nodeInfos[bestOption.NodeGroup.Id()]
if !found {
// This should never happen, as we should already have retrieved the nodeInfo
// for every considered node group.
klog.Errorf("No node info for: %s", bestOption.NodeGroup.Id())
return scaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
errors.NewAutoscalerError(
errors.CloudProviderError,
"No node info for best expansion option!"))
}
// Apply the remaining resource limits: trim newNodes so the scale-up does not exceed what is left of the cluster-wide quota.
newNodes, err = applyScaleUpResourcesLimits(context, processors, newNodes, scaleUpResourcesLeft, nodeInfo, bestOption.NodeGroup, resourceLimiter)
if err != nil {
return scaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
err)
}
targetNodeGroups := []cloudprovider.NodeGroup{bestOption.NodeGroup}
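// When balancing similar node groups is enabled, find groups similar to the chosen one and spread the scale-up across all of them.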
if context.BalanceSimilarNodeGroups {
similarNodeGroups, typedErr := processors.NodeGroupSetProcessor.FindSimilarNodeGroups(context, bestOption.NodeGroup, nodeInfos)
if typedErr != nil {
return scaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
typedErr.AddPrefix("Failed to find matching node groups: "))
}
similarNodeGroups = filterNodeGroupsByPods(similarNodeGroups, bestOption.Pods, expansionOptions)
for _, ng := range similarNodeGroups {
if clusterStateRegistry.IsNodeGroupSafeToScaleUp(ng, now) {
targetNodeGroups = append(targetNodeGroups, ng)
} else {
// This should never happen, as the node group would have been filtered out earlier
// due to a missing entry in podsPassingPredicates; but double-checking costs us nothing.
klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id())
}
}
if len(targetNodeGroups) > 1 {
var buffer bytes.Buffer
for i, ng := range targetNodeGroups {
if i > 0 {
buffer.WriteString(", ")
}
buffer.WriteString(ng.Id())
}
klog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), buffer.String())
}
}
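// Split the requested node count between the target groups, producing one ScaleUpInfo per group to resize.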
scaleUpInfos, typedErr := processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(
context, targetNodeGroups, newNodes)
if typedErr != nil {
return scaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
typedErr)
}
klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
for _, info := range scaleUpInfos {
typedErr := executeScaleUp(context, clusterStateRegistry, info, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, nodeInfo.Node(), nil), now)
if typedErr != nil {
return scaleUpError(
&status.ScaleUpStatus{
CreateNodeGroupResults: createNodeGroupResults,
FailedResizeNodeGroups: []cloudprovider.NodeGroup{info.Group},
PodsTriggeredScaleUp: bestOption.Pods,
},
typedErr,
)
}
}
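// Recalculate cluster state so the nodes just requested are counted as upcoming in subsequent iterations.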
clusterStateRegistry.Recalculate()
return &status.ScaleUpStatus{
Result: status.ScaleUpSuccessful,
ScaleUpInfos: scaleUpInfos,
PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
ConsideredNodeGroups: nodeGroups,
CreateNodeGroupResults: createNodeGroupResults,
PodsTriggeredScaleUp: bestOption.Pods,
PodsAwaitEvaluation: getPodsAwaitingEvaluation(podEquivalenceGroups, bestOption.NodeGroup.Id()),
}, nil
}
return &status.ScaleUpStatus{
Result: status.ScaleUpNoOptionsAvailable,
PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
ConsideredNodeGroups: nodeGroups,
}, nil
}
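A minimal sketch of how a caller might consume ScaleUp's result, assuming the imports already used above. runScaleUp is a hypothetical wrapper (the real call site lives elsewhere in the autoscaler core), shown only to illustrate inspecting the returned *status.ScaleUpStatus:

// runScaleUp is a hypothetical helper, not part of scale_up.go.
func runScaleUp(ctx *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, csr *clusterstate.ClusterStateRegistry,
pods []*apiv1.Pod, nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo, ignoredTaints taints.TaintKeySet) errors.AutoscalerError {
// On error, the returned status still carries the failed node groups and triggering pods.
scaleUpStatus, typedErr := ScaleUp(ctx, processors, csr, pods, nodes, daemonSets, nodeInfos, ignoredTaints)
if typedErr != nil {
return typedErr
}
switch scaleUpStatus.Result {
case status.ScaleUpSuccessful:
klog.V(1).Infof("Requested scale-up: %v", scaleUpStatus.ScaleUpInfos)
case status.ScaleUpNoOptionsAvailable:
klog.V(1).Infof("No scale-up option; %d pods remain unschedulable", len(scaleUpStatus.PodsRemainUnschedulable))
}
return nil
}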