cluster-autoscaler/core/scaledown/actuation/actuator.go

/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package actuation

import (
	"strings"
	"time"

	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
	"k8s.io/autoscaler/cluster-autoscaler/core/utils"
	"k8s.io/autoscaler/cluster-autoscaler/metrics"
	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/expiring"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
	"k8s.io/klog/v2"
)

const (
	pastLatencyExpireDuration = time.Hour
)

// Actuator is responsible for draining and deleting nodes.
type Actuator struct {
	ctx                   *context.AutoscalingContext
	nodeDeletionTracker   *deletiontracker.NodeDeletionTracker
	nodeDeletionScheduler *GroupDeletionScheduler
	deleteOptions         options.NodeDeleteOptions
	drainabilityRules     rules.Rules
	// TODO: Move budget processor to scaledown planner, potentially merge into PostFilteringScaleDownNodeProcessor
	// This is a larger change to the code structure which impacts some existing actuator unit tests
	// as well as Cluster Autoscaler implementations that may override ScaleDownSetProcessor
	budgetProcessor           *budgets.ScaleDownBudgetProcessor
	configGetter              actuatorNodeGroupConfigGetter
	nodeDeleteDelayAfterTaint time.Duration
	pastLatencies             *expiring.List
}

// actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used
// from the NodeGroupConfigProcessor interface.
type actuatorNodeGroupConfigGetter interface {
	// GetIgnoreDaemonSetsUtilization returns IgnoreDaemonSetsUtilization value that should be used for a given NodeGroup.
	GetIgnoreDaemonSetsUtilization(nodeGroup cloudprovider.NodeGroup) (bool, error)
}

// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
	ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
	legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
	var evictor Evictor
	if len(ctx.DrainPriorityConfig) > 0 {
		evictor = NewEvictor(ndt, ctx.DrainPriorityConfig, true)
	} else {
		evictor = NewEvictor(ndt, legacyFlagDrainConfig, false)
	}
	return &Actuator{
		ctx:                       ctx,
		nodeDeletionTracker:       ndt,
		nodeDeletionScheduler:     NewGroupDeletionScheduler(ctx, ndt, ndb, evictor),
		budgetProcessor:           budgets.NewScaleDownBudgetProcessor(ctx),
		deleteOptions:             deleteOptions,
		drainabilityRules:         drainabilityRules,
		configGetter:              configGetter,
		nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint,
		pastLatencies:             expiring.NewList(),
	}
}

// CheckStatus returns an immutable snapshot of ongoing deletions.
func (a *Actuator) CheckStatus() scaledown.ActuationStatus {
	return a.nodeDeletionTracker.Snapshot()
}

// ClearResultsNotNewerThan removes information about deletions finished before or exactly at the provided timestamp.
func (a *Actuator) ClearResultsNotNewerThan(t time.Time) {
	a.nodeDeletionTracker.ClearResultsNotNewerThan(t)
}

// DeletionResults returns deletion results since the last ClearResultsNotNewerThan call
// in a map form, along with the timestamp of the last result.
func (a *Actuator) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) {
	return a.nodeDeletionTracker.DeletionResults()
}

// StartDeletion triggers a new deletion process.
func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) {
	a.nodeDeletionScheduler.ResetAndReportMetrics()
	deletionStartTime := time.Now()
	defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Since(deletionStartTime)) }()

	scaledDownNodes := make([]*status.ScaleDownNode, 0)
	emptyToDelete, drainToDelete := a.budgetProcessor.CropNodes(a.nodeDeletionTracker, empty, drain)
	if len(emptyToDelete) == 0 && len(drainToDelete) == 0 {
		return status.ScaleDownNoNodeDeleted, nil, nil
	}

	if len(emptyToDelete) > 0 {
		// Taint all empty nodes synchronously
		nodeDeleteDelayAfterTaint, err := a.taintNodesSync(emptyToDelete)
		if err != nil {
			return status.ScaleDownError, scaledDownNodes, err
		}

		emptyScaledDown := a.deleteAsyncEmpty(emptyToDelete, nodeDeleteDelayAfterTaint)
		scaledDownNodes = append(scaledDownNodes, emptyScaledDown...)
	}

	if len(drainToDelete) > 0 {
		// Taint all nodes that need drain synchronously, but don't start any drain/deletion yet. Otherwise, pods evicted from one to-be-deleted node
		// could get recreated on another.
		nodeDeleteDelayAfterTaint, err := a.taintNodesSync(drainToDelete)
		if err != nil {
			return status.ScaleDownError, scaledDownNodes, err
		}

		// All nodes involved in the scale-down should be tainted now - start draining and deleting nodes asynchronously.
		drainScaledDown := a.deleteAsyncDrain(drainToDelete, nodeDeleteDelayAfterTaint)
		scaledDownNodes = append(scaledDownNodes, drainScaledDown...)
	}

	return status.ScaleDownNodeDeleteStarted, scaledDownNodes, nil
}

// deleteAsyncEmpty immediately starts deletions asynchronously.
// scaledDownNodes return value contains all nodes for which deletion successfully started.
func (a *Actuator) deleteAsyncEmpty(NodeGroupViews []*budgets.NodeGroupView, nodeDeleteDelayAfterTaint time.Duration) (reportedSDNodes []*status.ScaleDownNode) {
	for _, bucket := range NodeGroupViews {
		for _, node := range bucket.Nodes {
			klog.V(0).Infof("Scale-down: removing empty node %q", node.Name)
			a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %q", node.Name)

			if sdNode, err := a.scaleDownNodeToReport(node, false); err == nil {
				reportedSDNodes = append(reportedSDNodes, sdNode)
			} else {
				klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err)
			}

			a.nodeDeletionTracker.StartDeletion(bucket.Group.Id(), node.Name)
		}
	}

	for _, bucket := range NodeGroupViews {
		go a.deleteNodesAsync(bucket.Nodes, bucket.Group, false, bucket.BatchSize, nodeDeleteDelayAfterTaint)
	}

	return reportedSDNodes
}

// taintNodesSync synchronously taints all provided nodes with NoSchedule. If tainting fails for any of the nodes, already
// applied taints are cleaned up.
func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) (time.Duration, errors.AutoscalerError) {
	var taintedNodes []*apiv1.Node
	var updateLatencyTracker *UpdateLatencyTracker
	nodeDeleteDelayAfterTaint := a.nodeDeleteDelayAfterTaint
	if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
		updateLatencyTracker = NewUpdateLatencyTracker(a.ctx.AutoscalingKubeClients.ListerRegistry.AllNodeLister())
		go updateLatencyTracker.Start()
	}
	for _, bucket := range NodeGroupViews {
		for _, node := range bucket.Nodes {
			if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
				updateLatencyTracker.StartTimeChan <- nodeTaintStartTime{node.Name, time.Now()}
			}
			err := a.taintNode(node)
			if err != nil {
				a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
				// Clean up already applied taints in case of issues.
				for _, taintedNode := range taintedNodes {
					_, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
				}
				if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
					close(updateLatencyTracker.AwaitOrStopChan)
				}
				return nodeDeleteDelayAfterTaint, errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", node)
			}
			taintedNodes = append(taintedNodes, node)
		}
	}
	if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
		updateLatencyTracker.AwaitOrStopChan <- true
		latency, ok := <-updateLatencyTracker.ResultChan
		if ok {
			a.pastLatencies.RegisterElement(latency)
			a.pastLatencies.DropNotNewerThan(time.Now().Add(-1 * pastLatencyExpireDuration))
			// CA is expected to wait 3 times the round-trip time between CA and the api-server.
			// At this point, we have already tainted all the nodes.
			// Therefore, the nodeDeleteDelayAfterTaint is set to 2 times the maximum latency observed during the last hour.
			nodeDeleteDelayAfterTaint = 2 * maxLatency(a.pastLatencies.ToSlice())
		}
	}
	return nodeDeleteDelayAfterTaint, nil
}

// deleteAsyncDrain asynchronously starts deletions with drain for all provided nodes. scaledDownNodes return value contains all nodes for which
// deletion successfully started.
func (a *Actuator) deleteAsyncDrain(NodeGroupViews []*budgets.NodeGroupView, nodeDeleteDelayAfterTaint time.Duration) (reportedSDNodes []*status.ScaleDownNode) {
	for _, bucket := range NodeGroupViews {
		for _, drainNode := range bucket.Nodes {
			if sdNode, err := a.scaleDownNodeToReport(drainNode, true); err == nil {
				klog.V(0).Infof("Scale-down: removing node %s, utilization: %v, pods to reschedule: %s", drainNode.Name, sdNode.UtilInfo, joinPodNames(sdNode.EvictedPods))
				a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: removing node %s, utilization: %v, pods to reschedule: %s", drainNode.Name, sdNode.UtilInfo, joinPodNames(sdNode.EvictedPods))
				reportedSDNodes = append(reportedSDNodes, sdNode)
			} else {
				klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err)
			}

			a.nodeDeletionTracker.StartDeletionWithDrain(bucket.Group.Id(), drainNode.Name)
		}
	}

	for _, bucket := range NodeGroupViews {
		go a.deleteNodesAsync(bucket.Nodes, bucket.Group, true, bucket.BatchSize, nodeDeleteDelayAfterTaint)
	}

	return reportedSDNodes
}

func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool, batchSize int, nodeDeleteDelayAfterTaint time.Duration) {
	var remainingPdbTracker pdb.RemainingPdbTracker
	var registry kube_util.ListerRegistry

	if len(nodes) == 0 {
		return
	}

	if nodeDeleteDelayAfterTaint > time.Duration(0) {
		klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", nodeDeleteDelayAfterTaint)
		time.Sleep(nodeDeleteDelayAfterTaint)
	}

	clusterSnapshot, err := a.createSnapshot(nodes)
	if err != nil {
		klog.Errorf("Scale-down: couldn't create delete snapshot, err: %v", err)
		nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "createSnapshot returned error %v", err)}
		for _, node := range nodes {
			a.nodeDeletionScheduler.AbortNodeDeletion(node, nodeGroup.Id(), drain, "failed to create delete snapshot", nodeDeleteResult)
		}
		return
	}

	if drain {
		pdbs, err := a.ctx.PodDisruptionBudgetLister().List()
		if err != nil {
			klog.Errorf("Scale-down: couldn't fetch pod disruption budgets, err: %v", err)
			nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "podDisruptionBudgetLister.List returned error %v", err)}
			for _, node := range nodes {
				a.nodeDeletionScheduler.AbortNodeDeletion(node, nodeGroup.Id(), drain, "failed to fetch pod disruption budgets", nodeDeleteResult)
			}
			return
		}
		remainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
		remainingPdbTracker.SetPdbs(pdbs)
		registry = a.ctx.ListerRegistry
	}

	if batchSize == 0 {
		batchSize = len(nodes)
	}

	for _, node := range nodes {
		nodeInfo, err := clusterSnapshot.GetNodeInfo(node.Name)
		if err != nil {
			klog.Errorf("Scale-down: can't retrieve node %q from snapshot, err: %v", node.Name, err)
			nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "nodeInfos.Get for %q returned error: %v", node.Name, err)}
			a.nodeDeletionScheduler.AbortNodeDeletion(node, nodeGroup.Id(), drain, "failed to get node info", nodeDeleteResult)
			continue
		}

		podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, a.deleteOptions, a.drainabilityRules, registry, remainingPdbTracker, time.Now())
		if err != nil {
			klog.Errorf("Scale-down: couldn't delete node %q, err: %v", node.Name, err)
			nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "GetPodsToMove for %q returned error: %v", node.Name, err)}
			a.nodeDeletionScheduler.AbortNodeDeletion(node, nodeGroup.Id(), drain, "failed to get pods to move on node", nodeDeleteResult)
			continue
		}

		if !drain && len(podsToRemove) != 0 {
			klog.Errorf("Scale-down: couldn't delete empty node %q, new pods got scheduled", node.Name)
			nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "failed to delete empty node %q, new pods scheduled", node.Name)}
			a.nodeDeletionScheduler.AbortNodeDeletion(node, nodeGroup.Id(), drain, "node is not empty", nodeDeleteResult)
			continue
		}

		go a.nodeDeletionScheduler.ScheduleDeletion(nodeInfo, nodeGroup, batchSize, drain)
	}
}

func (a *Actuator) scaleDownNodeToReport(node *apiv1.Node, drain bool) (*status.ScaleDownNode, error) {
	nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node)
	if err != nil {
		return nil, err
	}
	nodeInfo, err := a.ctx.ClusterSnapshot.GetNodeInfo(node.Name)
	if err != nil {
		return nil, err
	}
	ignoreDaemonSetsUtilization, err := a.configGetter.GetIgnoreDaemonSetsUtilization(nodeGroup)
	if err != nil {
		return nil, err
	}
	gpuConfig := a.ctx.CloudProvider.GetNodeGpuConfig(node)
	utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, a.ctx.IgnoreMirrorPodsUtilization, gpuConfig, time.Now())
	if err != nil {
		return nil, err
	}
	var evictedPods []*apiv1.Pod
	if drain {
		_, nonDsPodsToEvict := podsToEvict(nodeInfo, a.ctx.DaemonSetEvictionForOccupiedNodes)
		evictedPods = nonDsPodsToEvict
	}
	return &status.ScaleDownNode{
		Node:        node,
		NodeGroup:   nodeGroup,
		EvictedPods: evictedPods,
		UtilInfo:    utilInfo,
	}, nil
}

// taintNode taints the node with NoSchedule to prevent new pods from scheduling on it.
func (a *Actuator) taintNode(node *apiv1.Node) error {
	if err := taints.MarkToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate); err != nil {
		a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
		return errors.ToAutoscalerError(errors.ApiCallError, err)
	}
	a.ctx.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "marked the node as toBeDeleted/unschedulable")
	return nil
}

func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
	snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.ctx.FrameworkHandle)
	pods, err := a.ctx.AllPodLister().List()
	if err != nil {
		return nil, err
	}

	scheduledPods := kube_util.ScheduledPods(pods)
	nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

	err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods)
	if err != nil {
		return nil, err
	}

	return snapshot, nil
}

func joinPodNames(pods []*apiv1.Pod) string {
	var names []string
	for _, pod := range pods {
		names = append(names, pod.Name)
	}
	return strings.Join(names, ",")
}
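
For orientation, here is a minimal sketch of how a caller inside this package might drive the Actuator for a single scale-down iteration: StartDeletion taints the selected nodes synchronously and kicks off asynchronous deletions, and CheckStatus exposes the in-progress state. The wrapper function runScaleDownIteration and the assumption that the *Actuator and node slices are already constructed are hypothetical; only StartDeletion, CheckStatus, and the status constants come from the file above.

// Illustrative sketch only, not part of actuator.go. It assumes an already
// constructed *Actuator (see NewActuator above) and pre-selected node lists.
func runScaleDownIteration(a *Actuator, empty, drain []*apiv1.Node) {
	result, scaledDownNodes, err := a.StartDeletion(empty, drain)
	if err != nil {
		klog.Errorf("Scale-down iteration failed: %v", err)
		return
	}
	if result == status.ScaleDownNodeDeleteStarted {
		klog.V(2).Infof("Scale-down started for %d node(s)", len(scaledDownNodes))
	}
	// Deletions continue asynchronously; the immutable snapshot returned by
	// CheckStatus can be polled on subsequent loop iterations.
	_ = a.CheckStatus()
}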