vertical-pod-autoscaler/pkg/updater/restriction/pods_restriction_factory.go

/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package restriction

import (
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	appsinformer "k8s.io/client-go/informers/apps/v1"
	coreinformer "k8s.io/client-go/informers/core/v1"
	kube_client "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
	"k8s.io/utils/clock"

	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch"
	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
)

const (
	resyncPeriod time.Duration = 1 * time.Minute
)

// controllerKind is the type of controller that can manage a pod.
type controllerKind string

const (
	replicationController controllerKind = "ReplicationController"
	statefulSet           controllerKind = "StatefulSet"
	replicaSet            controllerKind = "ReplicaSet"
	daemonSet             controllerKind = "DaemonSet"
	job                   controllerKind = "Job"
)

type podReplicaCreator struct {
	Namespace string
	Name      string
	Kind      controllerKind
}

// PodsRestrictionFactory is a factory for creating PodsEvictionRestriction and PodsInPlaceRestriction.
type PodsRestrictionFactory interface {
	GetCreatorMaps(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) (map[podReplicaCreator]singleGroupStats, map[string]podReplicaCreator, error)
	NewPodsEvictionRestriction(creatorToSingleGroupStatsMap map[podReplicaCreator]singleGroupStats, podToReplicaCreatorMap map[string]podReplicaCreator) PodsEvictionRestriction
	NewPodsInPlaceRestriction(creatorToSingleGroupStatsMap map[podReplicaCreator]singleGroupStats, podToReplicaCreatorMap map[string]podReplicaCreator) PodsInPlaceRestriction
}

// PodsRestrictionFactoryImpl is the implementation of the PodsRestrictionFactory interface.
type PodsRestrictionFactoryImpl struct {
	client                    kube_client.Interface
	rcInformer                cache.SharedIndexInformer // informer for Replication Controllers
	ssInformer                cache.SharedIndexInformer // informer for Stateful Sets
	rsInformer                cache.SharedIndexInformer // informer for Replica Sets
	dsInformer                cache.SharedIndexInformer // informer for Daemon Sets
	minReplicas               int
	evictionToleranceFraction float64
	clock                     clock.Clock
	lastInPlaceAttemptTimeMap map[string]time.Time
	patchCalculators          []patch.Calculator
}

// NewPodsRestrictionFactory creates a new PodsRestrictionFactory.
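//
// A minimal construction sketch (illustrative only: kubeClient, calculators and the
// numeric values below are assumed placeholders, not the updater's real flag values):
//
//	factory, err := NewPodsRestrictionFactory(
//		kubeClient,  // kube_client.Interface
//		2,           // minReplicas: do not disrupt groups with fewer live pods than this
//		0.5,         // evictionToleranceFraction of the configured replica count
//		calculators, // []patch.Calculator
//	)
//	if err != nil {
//		return err
//	}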
func NewPodsRestrictionFactory(client kube_client.Interface, minReplicas int, evictionToleranceFraction float64, patchCalculators []patch.Calculator) (PodsRestrictionFactory, error) {
	rcInformer, err := setupInformer(client, replicationController)
	if err != nil {
		return nil, fmt.Errorf("Failed to create rcInformer: %v", err)
	}
	ssInformer, err := setupInformer(client, statefulSet)
	if err != nil {
		return nil, fmt.Errorf("Failed to create ssInformer: %v", err)
	}
	rsInformer, err := setupInformer(client, replicaSet)
	if err != nil {
		return nil, fmt.Errorf("Failed to create rsInformer: %v", err)
	}
	dsInformer, err := setupInformer(client, daemonSet)
	if err != nil {
		return nil, fmt.Errorf("Failed to create dsInformer: %v", err)
	}
	return &PodsRestrictionFactoryImpl{
		client:                    client,
		rcInformer:                rcInformer, // informer for Replication Controllers
		ssInformer:                ssInformer, // informer for Stateful Sets
		rsInformer:                rsInformer, // informer for Replica Sets
		dsInformer:                dsInformer, // informer for Daemon Sets
		minReplicas:               minReplicas,
		evictionToleranceFraction: evictionToleranceFraction,
		clock:                     &clock.RealClock{},
		lastInPlaceAttemptTimeMap: make(map[string]time.Time),
		patchCalculators:          patchCalculators,
	}, nil
}

func (f *PodsRestrictionFactoryImpl) getReplicaCount(creator podReplicaCreator) (int, error) {
	switch creator.Kind {
	case replicationController:
		rcObj, exists, err := f.rcInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
		if err != nil {
			return 0, fmt.Errorf("replication controller %s/%s is not available, err: %v", creator.Namespace, creator.Name, err)
		}
		if !exists {
			return 0, fmt.Errorf("replication controller %s/%s does not exist", creator.Namespace, creator.Name)
		}
		rc, ok := rcObj.(*apiv1.ReplicationController)
		if !ok {
			return 0, fmt.Errorf("Failed to parse Replication Controller")
		}
		if rc.Spec.Replicas == nil || *rc.Spec.Replicas == 0 {
			return 0, fmt.Errorf("replication controller %s/%s has no replicas config", creator.Namespace, creator.Name)
		}
		return int(*rc.Spec.Replicas), nil
	case replicaSet:
		rsObj, exists, err := f.rsInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
		if err != nil {
			return 0, fmt.Errorf("replica set %s/%s is not available, err: %v", creator.Namespace, creator.Name, err)
		}
		if !exists {
			return 0, fmt.Errorf("replica set %s/%s does not exist", creator.Namespace, creator.Name)
		}
		rs, ok := rsObj.(*appsv1.ReplicaSet)
		if !ok {
			return 0, fmt.Errorf("Failed to parse Replicaset")
		}
		if rs.Spec.Replicas == nil || *rs.Spec.Replicas == 0 {
			return 0, fmt.Errorf("replica set %s/%s has no replicas config", creator.Namespace, creator.Name)
		}
		return int(*rs.Spec.Replicas), nil
	case statefulSet:
		ssObj, exists, err := f.ssInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
		if err != nil {
			return 0, fmt.Errorf("stateful set %s/%s is not available, err: %v", creator.Namespace, creator.Name, err)
		}
		if !exists {
			return 0, fmt.Errorf("stateful set %s/%s does not exist", creator.Namespace, creator.Name)
		}
		ss, ok := ssObj.(*appsv1.StatefulSet)
		if !ok {
			return 0, fmt.Errorf("Failed to parse StatefulSet")
		}
		if ss.Spec.Replicas == nil || *ss.Spec.Replicas == 0 {
			return 0, fmt.Errorf("stateful set %s/%s has no replicas config", creator.Namespace, creator.Name)
		}
		return int(*ss.Spec.Replicas), nil
	case daemonSet:
		dsObj, exists, err := f.dsInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
		if err != nil {
			return 0, fmt.Errorf("daemon set %s/%s is not available, err: %v", creator.Namespace, creator.Name, err)
		}
		if !exists {
			return 0, fmt.Errorf("daemon set %s/%s does not exist", creator.Namespace, creator.Name)
		}
		ds, ok := dsObj.(*appsv1.DaemonSet)
		if !ok {
			return 0, fmt.Errorf("Failed to parse DaemonSet")
		}
		if ds.Status.NumberReady == 0 {
			return 0, fmt.Errorf("daemon set %s/%s has no number ready pods", creator.Namespace, creator.Name)
		}
		return int(ds.Status.NumberReady), nil
	}
	return 0, nil
}

// GetCreatorMaps is a helper function that returns a map of pod replica creators to their single group stats
// and a map of pod IDs to pod replica creators from a list of pods and its corresponding VPA.
func (f *PodsRestrictionFactoryImpl) GetCreatorMaps(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) (map[podReplicaCreator]singleGroupStats, map[string]podReplicaCreator, error) {
	livePods := make(map[podReplicaCreator][]*apiv1.Pod)

	for _, pod := range pods {
		creator, err := getPodReplicaCreator(pod)
		if err != nil {
			klog.ErrorS(err, "Failed to obtain replication info for pod", "pod", klog.KObj(pod))
			continue
		}
		if creator == nil {
			klog.V(0).InfoS("Pod is not managed by any controller", "pod", klog.KObj(pod))
			continue
		}
		livePods[*creator] = append(livePods[*creator], pod)
	}

	podToReplicaCreatorMap := make(map[string]podReplicaCreator)
	creatorToSingleGroupStatsMap := make(map[podReplicaCreator]singleGroupStats)

	// Use per-VPA minReplicas if present, fall back to the global setting.
	required := f.minReplicas
	if vpa.Spec.UpdatePolicy != nil && vpa.Spec.UpdatePolicy.MinReplicas != nil {
		required = int(*vpa.Spec.UpdatePolicy.MinReplicas)
		klog.V(3).InfoS("Overriding minReplicas from global to per-VPA value", "globalMinReplicas", f.minReplicas, "vpaMinReplicas", required, "vpa", klog.KObj(vpa))
	}

	for creator, replicas := range livePods {
		actual := len(replicas)
		if actual < required {
			klog.V(2).InfoS("Too few replicas", "kind", creator.Kind, "object", klog.KRef(creator.Namespace, creator.Name), "livePods", actual, "requiredPods", required, "globalMinReplicas", f.minReplicas)
			continue
		}

		var configured int
		if creator.Kind == job {
			// Job has no replicas configuration, so we will use the actual number of live pods as the replicas count.
			configured = actual
		} else {
			var err error
			configured, err = f.getReplicaCount(creator)
			if err != nil {
				klog.ErrorS(err, "Failed to obtain replication info", "kind", creator.Kind, "object", klog.KRef(creator.Namespace, creator.Name))
				continue
			}
		}

		singleGroup := singleGroupStats{}
		singleGroup.configured = configured
		singleGroup.evictionTolerance = int(float64(configured) * f.evictionToleranceFraction) // truncated
		for _, pod := range replicas {
			podToReplicaCreatorMap[getPodID(pod)] = creator
			if pod.Status.Phase == apiv1.PodPending {
				singleGroup.pending = singleGroup.pending + 1
			}
			if isInPlaceUpdating(pod) {
				singleGroup.inPlaceUpdateOngoing = singleGroup.inPlaceUpdateOngoing + 1
			}
		}
		singleGroup.running = len(replicas) - singleGroup.pending
		creatorToSingleGroupStatsMap[creator] = singleGroup
	}
	return creatorToSingleGroupStatsMap, podToReplicaCreatorMap, nil
}

// NewPodsEvictionRestriction creates a new PodsEvictionRestriction.
func (f *PodsRestrictionFactoryImpl) NewPodsEvictionRestriction(creatorToSingleGroupStatsMap map[podReplicaCreator]singleGroupStats, podToReplicaCreatorMap map[string]podReplicaCreator) PodsEvictionRestriction {
	return &PodsEvictionRestrictionImpl{
		client:                       f.client,
		podToReplicaCreatorMap:       podToReplicaCreatorMap,
		creatorToSingleGroupStatsMap: creatorToSingleGroupStatsMap,
		clock:                        f.clock,
		lastInPlaceAttemptTimeMap:    f.lastInPlaceAttemptTimeMap,
	}
}

// NewPodsInPlaceRestriction creates a new PodsInPlaceRestriction.
func (f *PodsRestrictionFactoryImpl) NewPodsInPlaceRestriction(creatorToSingleGroupStatsMap map[podReplicaCreator]singleGroupStats, podToReplicaCreatorMap map[string]podReplicaCreator) PodsInPlaceRestriction {
	return &PodsInPlaceRestrictionImpl{
		client:                       f.client,
		podToReplicaCreatorMap:       podToReplicaCreatorMap,
		creatorToSingleGroupStatsMap: creatorToSingleGroupStatsMap,
		clock:                        f.clock,
		lastInPlaceAttemptTimeMap:    f.lastInPlaceAttemptTimeMap,
		patchCalculators:             f.patchCalculators,
	}
}

func getPodID(pod *apiv1.Pod) string {
	if pod == nil {
		return ""
	}
	return pod.Namespace + "/" + pod.Name
}

func getPodReplicaCreator(pod *apiv1.Pod) (*podReplicaCreator, error) {
	creator := managingControllerRef(pod)
	if creator == nil {
		return nil, nil
	}
	podReplicaCreator := &podReplicaCreator{
		Namespace: pod.Namespace,
		Name:      creator.Name,
		Kind:      controllerKind(creator.Kind),
	}
	return podReplicaCreator, nil
}

func managingControllerRef(pod *apiv1.Pod) *metav1.OwnerReference {
	var managingController metav1.OwnerReference
	for _, ownerReference := range pod.ObjectMeta.GetOwnerReferences() {
		if *ownerReference.Controller {
			managingController = ownerReference
			break
		}
	}
	return &managingController
}

func setupInformer(kubeClient kube_client.Interface, kind controllerKind) (cache.SharedIndexInformer, error) {
	var informer cache.SharedIndexInformer
	switch kind {
	case replicationController:
		informer = coreinformer.NewReplicationControllerInformer(kubeClient, apiv1.NamespaceAll,
			resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	case replicaSet:
		informer = appsinformer.NewReplicaSetInformer(kubeClient, apiv1.NamespaceAll,
			resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	case statefulSet:
		informer = appsinformer.NewStatefulSetInformer(kubeClient, apiv1.NamespaceAll,
			resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	case daemonSet:
		informer = appsinformer.NewDaemonSetInformer(kubeClient, apiv1.NamespaceAll,
			resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	default:
		return nil, fmt.Errorf("Unknown controller kind: %v", kind)
	}
	stopCh := make(chan struct{})
	go informer.Run(stopCh)
	synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
	if !synced {
		return nil, fmt.Errorf("Failed to sync %v cache.", kind)
	}
	return informer, nil
}

type singleGroupStats struct {
	configured             int
	pending                int
	running                int
	evictionTolerance      int
	evicted                int
	inPlaceUpdateOngoing   int // number of pods from the last loop that are still in-place updating
	inPlaceUpdateInitiated int // number of pods from the current loop that have newly requested an in-place resize
}

// isPodDisruptable checks whether the current pod can be disrupted: either more pods are
// actually alive than the eviction tolerance requires to stay up, or all configured pods
// are running, the tolerance is zero, and nothing has been disrupted in this loop yet.
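//
// Worked example (numbers are illustrative): with configured=10 and an
// evictionToleranceFraction of 0.25, evictionTolerance is int(10*0.25)=2 and
// shouldBeAlive=8; if running=10 and one pod has already been evicted this
// loop, actuallyAlive=9 > 8, so another pod may still be disrupted.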
func (s *singleGroupStats) isPodDisruptable() bool {
	shouldBeAlive := s.configured - s.evictionTolerance
	actuallyAlive := s.running - (s.evicted + s.inPlaceUpdateInitiated)
	return actuallyAlive > shouldBeAlive ||
		(s.configured == s.running && s.evictionTolerance == 0 && s.evicted == 0 && s.inPlaceUpdateInitiated == 0) // we don't want to block pods from being considered for eviction if tolerance is small and some pods are potentially stuck resizing
}

// isInPlaceUpdating checks whether or not the given pod is currently in the middle of an in-place update.
func isInPlaceUpdating(podToCheck *apiv1.Pod) bool {
	for _, c := range podToCheck.Status.Conditions {
		if c.Type == apiv1.PodResizePending || c.Type == apiv1.PodResizeInProgress {
			return c.Status == apiv1.ConditionTrue
		}
	}
	return false
}