vertical-pod-autoscaler/pkg/updater/logic/updater.go (337 lines of code) (raw):

/* Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package logic import ( "context" "fmt" "os" "slices" "time" "golang.org/x/time/rate" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" kube_client "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" utils "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/utils" corescheme "k8s.io/client-go/kubernetes/scheme" clientv1 "k8s.io/client-go/kubernetes/typed/core/v1" v1lister "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch" vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned/scheme" vpa_lister "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/listers/autoscaling.k8s.io/v1" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/features" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target" controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/priority" restriction "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/restriction" metrics_updater "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics/updater" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/status" vpa_api_util "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa" ) // Updater performs updates on pods if recommended by Vertical Pod Autoscaler type Updater interface { // RunOnce represents single iteration in the main-loop of Updater RunOnce(context.Context) } type updater struct { vpaLister vpa_lister.VerticalPodAutoscalerLister podLister v1lister.PodLister eventRecorder record.EventRecorder restrictionFactory restriction.PodsRestrictionFactory recommendationProcessor vpa_api_util.RecommendationProcessor evictionAdmission priority.PodEvictionAdmission priorityProcessor priority.PriorityProcessor evictionRateLimiter *rate.Limiter inPlaceRateLimiter *rate.Limiter selectorFetcher target.VpaTargetSelectorFetcher useAdmissionControllerStatus bool statusValidator status.Validator controllerFetcher controllerfetcher.ControllerFetcher ignoredNamespaces []string } // NewUpdater creates Updater with given configuration func NewUpdater( kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, minReplicasForEviction int, evictionRateLimit float64, evictionRateBurst int, evictionToleranceFraction float64, useAdmissionControllerStatus bool, statusNamespace string, recommendationProcessor vpa_api_util.RecommendationProcessor, evictionAdmission priority.PodEvictionAdmission, selectorFetcher target.VpaTargetSelectorFetcher, controllerFetcher controllerfetcher.ControllerFetcher, priorityProcessor priority.PriorityProcessor, namespace string, ignoredNamespaces []string, patchCalculators []patch.Calculator, ) (Updater, error) { evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst) // TODO: Create in-place rate limits for the in-place rate limiter inPlaceRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst) factory, err := restriction.NewPodsRestrictionFactory( kubeClient, minReplicasForEviction, evictionToleranceFraction, patchCalculators, ) if err != nil { return nil, fmt.Errorf("Failed to create restriction factory: %v", err) } return &updater{ vpaLister: vpa_api_util.NewVpasLister(vpaClient, make(chan struct{}), namespace), podLister: newPodLister(kubeClient, namespace), eventRecorder: newEventRecorder(kubeClient), restrictionFactory: factory, recommendationProcessor: recommendationProcessor, evictionRateLimiter: evictionRateLimiter, inPlaceRateLimiter: inPlaceRateLimiter, evictionAdmission: evictionAdmission, priorityProcessor: priorityProcessor, selectorFetcher: selectorFetcher, controllerFetcher: controllerFetcher, useAdmissionControllerStatus: useAdmissionControllerStatus, statusValidator: status.NewValidator( kubeClient, status.AdmissionControllerStatusName, statusNamespace, ), ignoredNamespaces: ignoredNamespaces, }, nil } // RunOnce represents single iteration in the main-loop of Updater func (u *updater) RunOnce(ctx context.Context) { timer := metrics_updater.NewExecutionTimer() defer timer.ObserveTotal() if u.useAdmissionControllerStatus { isValid, err := u.statusValidator.IsStatusValid(ctx, status.AdmissionControllerStatusTimeout) if err != nil { klog.ErrorS(err, "Error getting Admission Controller status. Skipping eviction loop") return } if !isValid { klog.V(0).InfoS("Admission Controller status is not valid. Skipping eviction loop", "timeout", status.AdmissionControllerStatusTimeout) return } } vpaList, err := u.vpaLister.List(labels.Everything()) if err != nil { klog.ErrorS(err, "Failed to get VPA list") os.Exit(255) } timer.ObserveStep("ListVPAs") vpas := make([]*vpa_api_util.VpaWithSelector, 0) for _, vpa := range vpaList { if slices.Contains(u.ignoredNamespaces, vpa.Namespace) { klog.V(3).InfoS("Skipping VPA object in ignored namespace", "vpa", klog.KObj(vpa), "namespace", vpa.Namespace) continue } if vpa_api_util.GetUpdateMode(vpa) != vpa_types.UpdateModeRecreate && vpa_api_util.GetUpdateMode(vpa) != vpa_types.UpdateModeAuto && vpa_api_util.GetUpdateMode(vpa) != vpa_types.UpdateModeInPlaceOrRecreate { klog.V(3).InfoS("Skipping VPA object because its mode is not \"InPlaceOrRecreate\", \"Recreate\" or \"Auto\"", "vpa", klog.KObj(vpa)) continue } selector, err := u.selectorFetcher.Fetch(ctx, vpa) if err != nil { klog.V(3).InfoS("Skipping VPA object because we cannot fetch selector", "vpa", klog.KObj(vpa)) continue } vpas = append(vpas, &vpa_api_util.VpaWithSelector{ Vpa: vpa, Selector: selector, }) } if len(vpas) == 0 { klog.V(0).InfoS("No VPA objects to process") if u.evictionAdmission != nil { u.evictionAdmission.CleanUp() } return } podsList, err := u.podLister.List(labels.Everything()) if err != nil { klog.ErrorS(err, "Failed to get pods list") return } timer.ObserveStep("ListPods") allLivePods := filterDeletedPods(podsList) controlledPods := make(map[*vpa_types.VerticalPodAutoscaler][]*apiv1.Pod) for _, pod := range allLivePods { controllingVPA := vpa_api_util.GetControllingVPAForPod(ctx, pod, vpas, u.controllerFetcher) if controllingVPA != nil { controlledPods[controllingVPA.Vpa] = append(controlledPods[controllingVPA.Vpa], pod) } } timer.ObserveStep("FilterPods") if u.evictionAdmission != nil { u.evictionAdmission.LoopInit(allLivePods, controlledPods) } timer.ObserveStep("AdmissionInit") // wrappers for metrics which are computed every loop run controlledPodsCounter := metrics_updater.NewControlledPodsCounter() evictablePodsCounter := metrics_updater.NewEvictablePodsCounter() inPlaceUpdatablePodsCounter := metrics_updater.NewInPlaceUpdtateablePodsCounter() vpasWithEvictablePodsCounter := metrics_updater.NewVpasWithEvictablePodsCounter() vpasWithEvictedPodsCounter := metrics_updater.NewVpasWithEvictedPodsCounter() vpasWithInPlaceUpdatablePodsCounter := metrics_updater.NewVpasWithInPlaceUpdtateablePodsCounter() vpasWithInPlaceUpdatedPodsCounter := metrics_updater.NewVpasWithInPlaceUpdtatedPodsCounter() // using defer to protect against 'return' after evictionRateLimiter.Wait defer controlledPodsCounter.Observe() defer evictablePodsCounter.Observe() defer vpasWithEvictablePodsCounter.Observe() defer vpasWithEvictedPodsCounter.Observe() // separate counters for in-place defer inPlaceUpdatablePodsCounter.Observe() defer vpasWithInPlaceUpdatablePodsCounter.Observe() defer vpasWithInPlaceUpdatedPodsCounter.Observe() // NOTE: this loop assumes that controlledPods are filtered // to contain only Pods controlled by a VPA in auto, recreate, or inPlaceOrRecreate mode for vpa, livePods := range controlledPods { vpaSize := len(livePods) controlledPodsCounter.Add(vpaSize, vpaSize) creatorToSingleGroupStatsMap, podToReplicaCreatorMap, err := u.restrictionFactory.GetCreatorMaps(livePods, vpa) if err != nil { klog.ErrorS(err, "Failed to get creator maps") continue } evictionLimiter := u.restrictionFactory.NewPodsEvictionRestriction(creatorToSingleGroupStatsMap, podToReplicaCreatorMap) inPlaceLimiter := u.restrictionFactory.NewPodsInPlaceRestriction(creatorToSingleGroupStatsMap, podToReplicaCreatorMap) podsForInPlace := make([]*apiv1.Pod, 0) podsForEviction := make([]*apiv1.Pod, 0) updateMode := vpa_api_util.GetUpdateMode(vpa) if updateMode == vpa_types.UpdateModeInPlaceOrRecreate && features.Enabled(features.InPlaceOrRecreate) { podsForInPlace = u.getPodsUpdateOrder(filterNonInPlaceUpdatablePods(livePods, inPlaceLimiter), vpa) inPlaceUpdatablePodsCounter.Add(vpaSize, len(podsForInPlace)) } else { // If the feature gate is not enabled but update mode is InPlaceOrRecreate, updater will always fallback to eviction. if updateMode == vpa_types.UpdateModeInPlaceOrRecreate { klog.InfoS("Warning: feature gate is not enabled for this updateMode", "featuregate", features.InPlaceOrRecreate, "updateMode", vpa_types.UpdateModeInPlaceOrRecreate) } podsForEviction = u.getPodsUpdateOrder(filterNonEvictablePods(livePods, evictionLimiter), vpa) evictablePodsCounter.Add(vpaSize, len(podsForEviction)) } withInPlaceUpdatable := false withInPlaceUpdated := false withEvictable := false withEvicted := false for _, pod := range podsForInPlace { withInPlaceUpdatable = true decision := inPlaceLimiter.CanInPlaceUpdate(pod) if decision == utils.InPlaceDeferred { klog.V(0).InfoS("In-place update deferred", "pod", klog.KObj(pod)) continue } else if decision == utils.InPlaceEvict { podsForEviction = append(podsForEviction, pod) continue } err = u.inPlaceRateLimiter.Wait(ctx) if err != nil { klog.V(0).InfoS("In-place rate limiter wait failed for in-place resize", "error", err) return } err := inPlaceLimiter.InPlaceUpdate(pod, vpa, u.eventRecorder) if err != nil { klog.V(0).InfoS("In-place update failed", "error", err, "pod", klog.KObj(pod)) continue } withInPlaceUpdated = true metrics_updater.AddInPlaceUpdatedPod(vpaSize) } for _, pod := range podsForEviction { withEvictable = true if !evictionLimiter.CanEvict(pod) { continue } err = u.evictionRateLimiter.Wait(ctx) if err != nil { klog.V(0).InfoS("Eviction rate limiter wait failed", "error", err) return } klog.V(2).InfoS("Evicting pod", "pod", klog.KObj(pod)) evictErr := evictionLimiter.Evict(pod, vpa, u.eventRecorder) if evictErr != nil { klog.V(0).InfoS("Eviction failed", "error", evictErr, "pod", klog.KObj(pod)) } else { withEvicted = true metrics_updater.AddEvictedPod(vpaSize) } } if withInPlaceUpdatable { vpasWithInPlaceUpdatablePodsCounter.Add(vpaSize, 1) } if withInPlaceUpdated { vpasWithInPlaceUpdatedPodsCounter.Add(vpaSize, 1) } if withEvictable { vpasWithEvictablePodsCounter.Add(vpaSize, 1) } if withEvicted { vpasWithEvictedPodsCounter.Add(vpaSize, 1) } } timer.ObserveStep("EvictPods") } func getRateLimiter(rateLimit float64, rateLimitBurst int) *rate.Limiter { var rateLimiter *rate.Limiter if rateLimit <= 0 { // As a special case if the rate is set to rate.Inf, the burst rate is ignored // see https://github.com/golang/time/blob/master/rate/rate.go#L37 rateLimiter = rate.NewLimiter(rate.Inf, 0) klog.V(1).InfoS("Rate limit disabled") } else { rateLimiter = rate.NewLimiter(rate.Limit(rateLimit), rateLimitBurst) } return rateLimiter } // getPodsUpdateOrder returns list of pods that should be updated ordered by update priority func (u *updater) getPodsUpdateOrder(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) []*apiv1.Pod { priorityCalculator := priority.NewUpdatePriorityCalculator( vpa, nil, u.recommendationProcessor, u.priorityProcessor) for _, pod := range pods { priorityCalculator.AddPod(pod, time.Now()) } return priorityCalculator.GetSortedPods(u.evictionAdmission) } func filterPods(pods []*apiv1.Pod, predicate func(*apiv1.Pod) bool) []*apiv1.Pod { result := make([]*apiv1.Pod, 0) for _, pod := range pods { if predicate(pod) { result = append(result, pod) } } return result } func filterNonInPlaceUpdatablePods(pods []*apiv1.Pod, inplaceRestriction restriction.PodsInPlaceRestriction) []*apiv1.Pod { return filterPods(pods, func(pod *apiv1.Pod) bool { return inplaceRestriction.CanInPlaceUpdate(pod) != utils.InPlaceDeferred }) } func filterNonEvictablePods(pods []*apiv1.Pod, evictionRestriction restriction.PodsEvictionRestriction) []*apiv1.Pod { return filterPods(pods, evictionRestriction.CanEvict) } func filterDeletedPods(pods []*apiv1.Pod) []*apiv1.Pod { return filterPods(pods, func(pod *apiv1.Pod) bool { return pod.DeletionTimestamp == nil }) } func newPodLister(kubeClient kube_client.Interface, namespace string) v1lister.PodLister { selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" + string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed)) podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector) store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) podLister := v1lister.NewPodLister(store) podReflector := cache.NewReflector(podListWatch, &apiv1.Pod{}, store, time.Hour) stopCh := make(chan struct{}) go podReflector.Run(stopCh) return podLister } func newEventRecorder(kubeClient kube_client.Interface) record.EventRecorder { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartStructuredLogging(4) if _, isFake := kubeClient.(*fake.Clientset); !isFake { eventBroadcaster.StartRecordingToSink(&clientv1.EventSinkImpl{Interface: clientv1.New(kubeClient.CoreV1().RESTClient()).Events("")}) } else { eventBroadcaster.StartRecordingToSink(&clientv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) } vpascheme := scheme.Scheme if err := corescheme.AddToScheme(vpascheme); err != nil { klog.ErrorS(err, "Error adding core scheme") os.Exit(255) } return eventBroadcaster.NewRecorder(vpascheme, apiv1.EventSource{Component: "vpa-updater"}) }