func()

in cluster-autoscaler/core/scale_down.go [789:1019]


// TryToScaleDown performs one scale-down attempt based on the nodes currently
// recorded in sd.unneededNodes.
//
// The attempt has three phases:
//
//  1. Candidate selection: every unneeded node is looked up in the cluster
//     snapshot and filtered by the no-scale-down annotation, node group
//     membership, the per-node-group unneeded/unready time, node group
//     minimum size (taking deletions in progress into account) and the
//     resource limiter's minimal limits. Each rejected node is recorded
//     through sd.addUnremovableNodeReason.
//  2. Empty nodes: candidates returned by sd.getEmptyNodesToRemove (after the
//     ScaleDownSetProcessor, capped at MaxEmptyBulkDelete) are scheduled for
//     deletion in bulk. If there is at least one, the function returns right
//     after scheduling them.
//  3. Non-empty node: otherwise simulator.FindNodesToRemove is consulted and
//     at most one node is picked; it is deleted in a background goroutine
//     whose outcome is reported to sd.nodeDeletionTracker.
//
// currentTime is the reference time used for the unneeded/unready checks and
// pdbs are forwarded to the removal simulation.
//
// The returned status always carries the node delete results drained from
// sd.nodeDeletionTracker at the start of the call. A non-nil error is returned
// when listing the snapshot, querying the cloud provider's resource limiter,
// deleting empty nodes or running the removal simulation fails.
func (sd *ScaleDown) TryToScaleDown(
	currentTime time.Time,
	pdbs []*policyv1.PodDisruptionBudget,
) (*status.ScaleDownStatus, errors.AutoscalerError) {

	scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: sd.nodeDeletionTracker.GetAndClearNodeDeleteResults()}
	nodeDeletionDuration := time.Duration(0)
	findNodesToRemoveDuration := time.Duration(0)
	// The durations are passed by pointer so the deferred call observes the
	// values assigned later in this function.
	defer updateScaleDownMetrics(time.Now(), &findNodesToRemoveDuration, &nodeDeletionDuration)

	allNodeInfos, errSnapshot := sd.context.ClusterSnapshot.NodeInfos().List()
	if errSnapshot != nil {
		// This should never happen, List() returns err only because scheduler interface requires it.
		// Mark the result explicitly, like the other failure paths below.
		scaleDownStatus.Result = status.ScaleDownError
		return scaleDownStatus, errors.ToAutoscalerError(errors.InternalError, errSnapshot)
	}

	nodesWithoutMaster := filterOutMasters(allNodeInfos)
	nodesWithoutMasterNames := make([]string, 0, len(nodesWithoutMaster))
	for _, node := range nodesWithoutMaster {
		nodesWithoutMasterNames = append(nodesWithoutMasterNames, node.Name)
	}

	// At most one entry per unneeded node ends up in each of these.
	candidateNames := make([]string, 0, len(sd.unneededNodes))
	readinessMap := make(map[string]bool, len(sd.unneededNodes))
	candidateNodeGroups := make(map[string]cloudprovider.NodeGroup, len(sd.unneededNodes))
	gpuLabel := sd.context.CloudProvider.GPULabel()
	availableGPUTypes := sd.context.CloudProvider.GetAvailableGPUTypes()

	resourceLimiter, errCP := sd.context.CloudProvider.GetResourceLimiter()
	if errCP != nil {
		scaleDownStatus.Result = status.ScaleDownError
		return scaleDownStatus, errors.ToAutoscalerError(errors.CloudProviderError, errCP)
	}

	scaleDownResourcesLeft := sd.computeScaleDownResourcesLeftLimits(nodesWithoutMaster, resourceLimiter, sd.context.CloudProvider, currentTime)

	nodeGroupSize := utils.GetNodeGroupSizeMap(sd.context.CloudProvider)
	resourcesWithLimits := resourceLimiter.GetResources()
	for nodeName, unneededSince := range sd.unneededNodes {
		klog.V(2).Infof("%s was unneeded for %s", nodeName, currentTime.Sub(unneededSince).String())

		nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(nodeName)
		if err != nil {
			klog.Errorf("Can't retrieve unneeded node %s from snapshot, err: %v", nodeName, err)
			continue
		}

		node := nodeInfo.Node()

		// Check if node is marked with no scale down annotation.
		if hasNoScaleDownAnnotation(node) {
			klog.V(4).Infof("Skipping %s - scale down disabled annotation found", node.Name)
			sd.addUnremovableNodeReason(node, simulator.ScaleDownDisabledAnnotation)
			continue
		}

		// Only the readiness flag is needed here; the remaining return values
		// of GetReadinessState are intentionally ignored.
		ready, _, _ := kube_util.GetReadinessState(node)
		readinessMap[node.Name] = ready

		nodeGroup, err := sd.context.CloudProvider.NodeGroupForNode(node)
		if err != nil {
			klog.Errorf("Error while checking node group for %s: %v", node.Name, err)
			sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
			continue
		}
		// The reflect check additionally catches an interface value that wraps
		// a nil pointer, which compares unequal to nil.
		if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
			klog.V(4).Infof("Skipping %s - no node group config", node.Name)
			sd.addUnremovableNodeReason(node, simulator.NotAutoscaled)
			continue
		}

		if ready {
			// Check how long a ready node was underutilized.
			unneededTime, err := sd.processors.NodeGroupConfigProcessor.GetScaleDownUnneededTime(sd.context, nodeGroup)
			if err != nil {
				klog.Errorf("Error trying to get ScaleDownUnneededTime for node %s (in group: %s): %v", node.Name, nodeGroup.Id(), err)
				sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
				continue
			}
			if !unneededSince.Add(unneededTime).Before(currentTime) {
				sd.addUnremovableNodeReason(node, simulator.NotUnneededLongEnough)
				continue
			}
		} else {
			// Unready nodes may be deleted after a different time than underutilized nodes.
			unreadyTime, err := sd.processors.NodeGroupConfigProcessor.GetScaleDownUnreadyTime(sd.context, nodeGroup)
			if err != nil {
				klog.Errorf("Error trying to get ScaleDownUnreadyTime for node %s (in group: %s): %v", node.Name, nodeGroup.Id(), err)
				sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
				continue
			}
			if !unneededSince.Add(unreadyTime).Before(currentTime) {
				sd.addUnremovableNodeReason(node, simulator.NotUnreadyLongEnough)
				continue
			}
		}

		size, found := nodeGroupSize[nodeGroup.Id()]
		if !found {
			klog.Errorf("Error while checking node group size %s: group size not found in cache", nodeGroup.Id())
			sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
			continue
		}

		// Nodes already being deleted no longer count towards the group size
		// when comparing against the configured minimum.
		deletionsInProgress := sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id())
		if size-deletionsInProgress <= nodeGroup.MinSize() {
			klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
			sd.addUnremovableNodeReason(node, simulator.NodeGroupMinSizeReached)
			continue
		}

		scaleDownResourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesWithLimits)
		if err != nil {
			klog.Errorf("Error getting node resources: %v", err)
			sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
			continue
		}

		checkResult := scaleDownResourcesLeft.checkScaleDownDeltaWithinLimits(scaleDownResourcesDelta)
		if checkResult.exceeded {
			klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.exceededResources)
			sd.addUnremovableNodeReason(node, simulator.MinimalResourceLimitExceeded)
			continue
		}

		candidateNames = append(candidateNames, node.Name)
		candidateNodeGroups[node.Name] = nodeGroup
	}

	if len(candidateNames) == 0 {
		klog.V(1).Infof("No candidates for scale down")
		scaleDownStatus.Result = status.ScaleDownNoUnneeded
		return scaleDownStatus, nil
	}

	// Trying to delete empty nodes in bulk. If there are no empty nodes then CA will
	// try to delete not-so-empty nodes, possibly killing some pods and allowing them
	// to recreate on other nodes.
	emptyNodesToRemove := sd.getEmptyNodesToRemove(candidateNames, scaleDownResourcesLeft, currentTime)
	emptyNodesToRemove = sd.processors.ScaleDownSetProcessor.GetNodesToRemove(sd.context, emptyNodesToRemove, sd.context.MaxEmptyBulkDelete)
	if len(emptyNodesToRemove) > 0 {
		nodeDeletionStart := time.Now()
		deletedNodes, err := sd.scheduleDeleteEmptyNodes(emptyNodesToRemove, sd.context.ClientSet, sd.context.Recorder, readinessMap, candidateNodeGroups)
		nodeDeletionDuration = time.Since(nodeDeletionStart)

		// TODO: Give the processor some information about the nodes that failed to be deleted.
		scaleDownStatus.ScaledDownNodes = sd.mapNodesToStatusScaleDownNodes(deletedNodes, candidateNodeGroups, make(map[string][]*apiv1.Pod))
		if len(deletedNodes) > 0 {
			scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted
		} else {
			scaleDownStatus.Result = status.ScaleDownError
		}
		if err != nil {
			return scaleDownStatus, err.AddPrefix("failed to delete at least one empty node: ")
		}
		return scaleDownStatus, nil
	}

	findNodesToRemoveStart := time.Now()

	// We look for only 1 node so new hints may be incomplete.
	nodesToRemove, unremovable, _, err := simulator.FindNodesToRemove(
		candidateNames,
		nodesWithoutMasterNames,
		sd.context.ListerRegistry,
		sd.context.ClusterSnapshot,
		sd.context.PredicateChecker,
		sd.podLocationHints,
		sd.usageTracker,
		time.Now(),
		pdbs)
	findNodesToRemoveDuration = time.Since(findNodesToRemoveStart)

	// Unremovable nodes are recorded before the error check, so they are kept
	// even when the simulation reports a failure.
	for _, unremovableNode := range unremovable {
		sd.addUnremovableNode(unremovableNode)
	}

	if err != nil {
		scaleDownStatus.Result = status.ScaleDownError
		return scaleDownStatus, err.AddPrefix("Find node to remove failed: ")
	}
	nodesToRemove = sd.processors.ScaleDownSetProcessor.GetNodesToRemove(sd.context, nodesToRemove, 1)
	if len(nodesToRemove) == 0 {
		klog.V(1).Infof("No node to remove")
		scaleDownStatus.Result = status.ScaleDownNoNodeDeleted
		return scaleDownStatus, nil
	}
	toRemove := nodesToRemove[0]
	utilization := sd.nodeUtilizationMap[toRemove.Node.Name]
	podNames := make([]string, 0, len(toRemove.PodsToReschedule))
	for _, pod := range toRemove.PodsToReschedule {
		podNames = append(podNames, pod.Namespace+"/"+pod.Name)
	}
	klog.V(0).Infof("Scale-down: removing node %s, utilization: %v, pods to reschedule: %s", toRemove.Node.Name, utilization,
		strings.Join(podNames, ","))
	sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: removing node %s, utilization: %v, pods to reschedule: %s",
		toRemove.Node.Name, utilization, strings.Join(podNames, ","))

	// Nothing super-bad should happen if the node is removed from tracker prematurely.
	simulator.RemoveNodeFromTracker(sd.usageTracker, toRemove.Node.Name, sd.unneededNodes)
	nodeDeletionStart := time.Now()

	// Starting deletion.
	// NOTE(review): the deletion itself runs in the goroutine below, so this
	// duration only spans the two adjacent statements and does not include the
	// time spent deleting the node - confirm this is the intended metric.
	nodeDeletionDuration = time.Since(nodeDeletionStart)
	sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(true)

	go func() {
		// Finishing the delete process once this goroutine is over.
		// Deferred calls run in reverse order: the in-progress flag is cleared
		// first, then the final value of result is handed to the tracker.
		var result status.NodeDeleteResult
		defer func() { sd.nodeDeletionTracker.AddNodeDeleteResult(toRemove.Node.Name, result) }()
		defer sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(false)
		nodeGroup, found := candidateNodeGroups[toRemove.Node.Name]
		if !found {
			result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: errors.NewAutoscalerError(
				errors.InternalError, "failed to find node group for %s", toRemove.Node.Name)}
			return
		}
		result = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule, toRemove.DaemonSetPods, nodeGroup)
		if result.ResultType != status.NodeDeleteOk {
			klog.Errorf("Failed to delete %s: %v", toRemove.Node.Name, result.Err)
			return
		}
		// The scale-down reason reported to metrics follows the readiness
		// captured during candidate selection.
		if readinessMap[toRemove.Node.Name] {
			metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, toRemove.Node, nodeGroup), metrics.Underutilized)
		} else {
			metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, toRemove.Node, nodeGroup), metrics.Unready)
		}
	}()

	scaleDownStatus.ScaledDownNodes = sd.mapNodesToStatusScaleDownNodes([]*apiv1.Node{toRemove.Node}, candidateNodeGroups, map[string][]*apiv1.Pod{toRemove.Node.Name: toRemove.PodsToReschedule})
	scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted
	return scaleDownStatus, nil
}