vertical-pod-autoscaler/pkg/updater/logic/updater.go (337 lines of code) (raw):
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logic
import (
"context"
"fmt"
"os"
"slices"
"time"
"golang.org/x/time/rate"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
utils "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/utils"
corescheme "k8s.io/client-go/kubernetes/scheme"
clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
v1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch"
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned/scheme"
vpa_lister "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/listers/autoscaling.k8s.io/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/features"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/priority"
restriction "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/restriction"
metrics_updater "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics/updater"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/status"
vpa_api_util "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa"
)
// Updater performs updates on pods if recommended by Vertical Pod Autoscaler.
//
// The implementation in this file (updater, built by NewUpdater) applies
// recommendations either by evicting pods or, for VPAs in InPlaceOrRecreate
// mode with the corresponding feature gate enabled, by resizing them in place.
type Updater interface {
	// RunOnce represents single iteration in the main-loop of Updater.
	// The context is used for the admission-controller status check, selector
	// fetching and rate-limiter waits performed during the iteration.
	RunOnce(context.Context)
}
// updater is the Updater implementation assembled by NewUpdater.
type updater struct {
	// vpaLister lists the VPA objects examined on every RunOnce iteration.
	vpaLister vpa_lister.VerticalPodAutoscalerLister
	// podLister lists candidate pods (see newPodLister for the field selector).
	podLister v1lister.PodLister
	// eventRecorder is handed to in-place update and eviction calls.
	eventRecorder record.EventRecorder
	// restrictionFactory builds the per-VPA eviction and in-place restrictions.
	restrictionFactory restriction.PodsRestrictionFactory
	// recommendationProcessor is used when computing pod update priorities.
	recommendationProcessor vpa_api_util.RecommendationProcessor
	// evictionAdmission is optional (nil-checked before use); it is
	// initialised per loop, cleaned up when no VPAs exist, and consulted when
	// sorting pods by priority.
	evictionAdmission priority.PodEvictionAdmission
	// priorityProcessor is used when computing pod update priorities.
	priorityProcessor priority.PriorityProcessor
	// evictionRateLimiter throttles pod evictions across all VPAs.
	evictionRateLimiter *rate.Limiter
	// inPlaceRateLimiter throttles in-place updates across all VPAs; NewUpdater
	// currently builds it from the eviction rate settings.
	inPlaceRateLimiter *rate.Limiter
	// selectorFetcher resolves the pod selector of each VPA's target.
	selectorFetcher target.VpaTargetSelectorFetcher
	// useAdmissionControllerStatus makes RunOnce skip the iteration unless
	// statusValidator reports a valid admission-controller status.
	useAdmissionControllerStatus bool
	// statusValidator checks the admission-controller status object.
	statusValidator status.Validator
	// controllerFetcher is used to find the VPA controlling each pod.
	controllerFetcher controllerfetcher.ControllerFetcher
	// ignoredNamespaces lists namespaces whose VPA objects are skipped.
	ignoredNamespaces []string
}
// NewUpdater creates an Updater with the given configuration.
//
// VPA objects and pods are listed from namespace (passed unchanged to the VPA
// lister and to newPodLister). Namespaces in ignoredNamespaces are skipped by
// RunOnce.
//
// Eviction is throttled by a limiter built from evictionRateLimit and
// evictionRateBurst; a non-positive evictionRateLimit disables throttling.
// The in-place update limiter is currently built from the same two values.
//
// minReplicasForEviction, evictionToleranceFraction and patchCalculators are
// forwarded to restriction.NewPodsRestrictionFactory. When
// useAdmissionControllerStatus is true, RunOnce only proceeds while the
// admission-controller status object (status.AdmissionControllerStatusName in
// statusNamespace) is valid.
//
// The returned error wraps the cause when the restriction factory cannot be
// created, so callers can inspect it with errors.Is / errors.As.
func NewUpdater(
	kubeClient kube_client.Interface,
	vpaClient *vpa_clientset.Clientset,
	minReplicasForEviction int,
	evictionRateLimit float64,
	evictionRateBurst int,
	evictionToleranceFraction float64,
	useAdmissionControllerStatus bool,
	statusNamespace string,
	recommendationProcessor vpa_api_util.RecommendationProcessor,
	evictionAdmission priority.PodEvictionAdmission,
	selectorFetcher target.VpaTargetSelectorFetcher,
	controllerFetcher controllerfetcher.ControllerFetcher,
	priorityProcessor priority.PriorityProcessor,
	namespace string,
	ignoredNamespaces []string,
	patchCalculators []patch.Calculator,
) (Updater, error) {
	evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst)
	// TODO: Create in-place rate limits for the in-place rate limiter
	inPlaceRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst)
	factory, err := restriction.NewPodsRestrictionFactory(
		kubeClient,
		minReplicasForEviction,
		evictionToleranceFraction,
		patchCalculators,
	)
	if err != nil {
		// %w keeps the underlying error inspectable by callers.
		return nil, fmt.Errorf("failed to create restriction factory: %w", err)
	}
	return &updater{
		// NOTE(review): this stop channel is never closed, so the VPA lister's
		// background machinery lives as long as the process.
		vpaLister:                    vpa_api_util.NewVpasLister(vpaClient, make(chan struct{}), namespace),
		podLister:                    newPodLister(kubeClient, namespace),
		eventRecorder:                newEventRecorder(kubeClient),
		restrictionFactory:           factory,
		recommendationProcessor:      recommendationProcessor,
		evictionRateLimiter:          evictionRateLimiter,
		inPlaceRateLimiter:           inPlaceRateLimiter,
		evictionAdmission:            evictionAdmission,
		priorityProcessor:            priorityProcessor,
		selectorFetcher:              selectorFetcher,
		controllerFetcher:            controllerFetcher,
		useAdmissionControllerStatus: useAdmissionControllerStatus,
		statusValidator: status.NewValidator(
			kubeClient,
			status.AdmissionControllerStatusName,
			statusNamespace,
		),
		ignoredNamespaces: ignoredNamespaces,
	}, nil
}
// RunOnce represents single iteration in the main-loop of Updater.
//
// One iteration:
//  1. if useAdmissionControllerStatus is set, returns early unless the
//     admission-controller status can be read and is valid;
//  2. lists VPA objects and keeps those that are outside ignoredNamespaces,
//     are in Recreate, Auto or InPlaceOrRecreate mode, and whose target
//     selector can be fetched (if none remain, evictionAdmission is cleaned
//     up and the iteration ends);
//  3. lists pods, drops those being deleted, and groups the rest by their
//     controlling VPA;
//  4. per VPA, tries in-place updates (InPlaceOrRecreate mode with the
//     InPlaceOrRecreate feature gate enabled) and evictions, subject to the
//     per-VPA restrictions and the shared rate limiters.
//
// A failure to list VPA objects terminates the process with exit code 255.
// A failed rate-limiter wait (e.g. cancelled ctx) ends the iteration; the
// per-loop metric counters are still reported via defer.
func (u *updater) RunOnce(ctx context.Context) {
	timer := metrics_updater.NewExecutionTimer()
	defer timer.ObserveTotal()

	if u.useAdmissionControllerStatus {
		isValid, err := u.statusValidator.IsStatusValid(ctx, status.AdmissionControllerStatusTimeout)
		if err != nil {
			klog.ErrorS(err, "Error getting Admission Controller status. Skipping eviction loop")
			return
		}
		if !isValid {
			klog.V(0).InfoS("Admission Controller status is not valid. Skipping eviction loop", "timeout", status.AdmissionControllerStatusTimeout)
			return
		}
	}

	vpaList, err := u.vpaLister.List(labels.Everything())
	if err != nil {
		klog.ErrorS(err, "Failed to get VPA list")
		os.Exit(255)
	}
	timer.ObserveStep("ListVPAs")

	vpas := make([]*vpa_api_util.VpaWithSelector, 0)
	for _, vpa := range vpaList {
		if slices.Contains(u.ignoredNamespaces, vpa.Namespace) {
			klog.V(3).InfoS("Skipping VPA object in ignored namespace", "vpa", klog.KObj(vpa), "namespace", vpa.Namespace)
			continue
		}
		// Evaluate the update mode once rather than once per comparison.
		switch vpa_api_util.GetUpdateMode(vpa) {
		case vpa_types.UpdateModeRecreate, vpa_types.UpdateModeAuto, vpa_types.UpdateModeInPlaceOrRecreate:
			// Supported mode: keep processing this VPA.
		default:
			klog.V(3).InfoS("Skipping VPA object because its mode is not \"InPlaceOrRecreate\", \"Recreate\" or \"Auto\"", "vpa", klog.KObj(vpa))
			continue
		}
		selector, err := u.selectorFetcher.Fetch(ctx, vpa)
		if err != nil {
			// Include the cause; previously it was silently dropped.
			klog.V(3).InfoS("Skipping VPA object because we cannot fetch selector", "vpa", klog.KObj(vpa), "error", err)
			continue
		}
		vpas = append(vpas, &vpa_api_util.VpaWithSelector{
			Vpa:      vpa,
			Selector: selector,
		})
	}

	if len(vpas) == 0 {
		klog.V(0).InfoS("No VPA objects to process")
		if u.evictionAdmission != nil {
			u.evictionAdmission.CleanUp()
		}
		return
	}

	podsList, err := u.podLister.List(labels.Everything())
	if err != nil {
		klog.ErrorS(err, "Failed to get pods list")
		return
	}
	timer.ObserveStep("ListPods")
	allLivePods := filterDeletedPods(podsList)

	controlledPods := make(map[*vpa_types.VerticalPodAutoscaler][]*apiv1.Pod)
	for _, pod := range allLivePods {
		controllingVPA := vpa_api_util.GetControllingVPAForPod(ctx, pod, vpas, u.controllerFetcher)
		if controllingVPA != nil {
			controlledPods[controllingVPA.Vpa] = append(controlledPods[controllingVPA.Vpa], pod)
		}
	}
	timer.ObserveStep("FilterPods")

	if u.evictionAdmission != nil {
		u.evictionAdmission.LoopInit(allLivePods, controlledPods)
	}
	timer.ObserveStep("AdmissionInit")

	// wrappers for metrics which are computed every loop run
	controlledPodsCounter := metrics_updater.NewControlledPodsCounter()
	evictablePodsCounter := metrics_updater.NewEvictablePodsCounter()
	inPlaceUpdatablePodsCounter := metrics_updater.NewInPlaceUpdtateablePodsCounter()
	vpasWithEvictablePodsCounter := metrics_updater.NewVpasWithEvictablePodsCounter()
	vpasWithEvictedPodsCounter := metrics_updater.NewVpasWithEvictedPodsCounter()
	vpasWithInPlaceUpdatablePodsCounter := metrics_updater.NewVpasWithInPlaceUpdtateablePodsCounter()
	vpasWithInPlaceUpdatedPodsCounter := metrics_updater.NewVpasWithInPlaceUpdtatedPodsCounter()

	// using defer to protect against 'return' after evictionRateLimiter.Wait
	defer controlledPodsCounter.Observe()
	defer evictablePodsCounter.Observe()
	defer vpasWithEvictablePodsCounter.Observe()
	defer vpasWithEvictedPodsCounter.Observe()
	// separate counters for in-place
	defer inPlaceUpdatablePodsCounter.Observe()
	defer vpasWithInPlaceUpdatablePodsCounter.Observe()
	defer vpasWithInPlaceUpdatedPodsCounter.Observe()

	// NOTE: this loop assumes that controlledPods are filtered
	// to contain only Pods controlled by a VPA in auto, recreate, or inPlaceOrRecreate mode
	for vpa, livePods := range controlledPods {
		vpaSize := len(livePods)
		controlledPodsCounter.Add(vpaSize, vpaSize)
		creatorToSingleGroupStatsMap, podToReplicaCreatorMap, err := u.restrictionFactory.GetCreatorMaps(livePods, vpa)
		if err != nil {
			klog.ErrorS(err, "Failed to get creator maps")
			continue
		}

		evictionLimiter := u.restrictionFactory.NewPodsEvictionRestriction(creatorToSingleGroupStatsMap, podToReplicaCreatorMap)
		inPlaceLimiter := u.restrictionFactory.NewPodsInPlaceRestriction(creatorToSingleGroupStatsMap, podToReplicaCreatorMap)

		podsForInPlace := make([]*apiv1.Pod, 0)
		podsForEviction := make([]*apiv1.Pod, 0)
		updateMode := vpa_api_util.GetUpdateMode(vpa)

		if updateMode == vpa_types.UpdateModeInPlaceOrRecreate && features.Enabled(features.InPlaceOrRecreate) {
			podsForInPlace = u.getPodsUpdateOrder(filterNonInPlaceUpdatablePods(livePods, inPlaceLimiter), vpa)
			inPlaceUpdatablePodsCounter.Add(vpaSize, len(podsForInPlace))
		} else {
			// If the feature gate is not enabled but update mode is InPlaceOrRecreate, updater will always fallback to eviction.
			if updateMode == vpa_types.UpdateModeInPlaceOrRecreate {
				klog.InfoS("Warning: feature gate is not enabled for this updateMode", "featuregate", features.InPlaceOrRecreate, "updateMode", vpa_types.UpdateModeInPlaceOrRecreate)
			}
			podsForEviction = u.getPodsUpdateOrder(filterNonEvictablePods(livePods, evictionLimiter), vpa)
			evictablePodsCounter.Add(vpaSize, len(podsForEviction))
		}

		withInPlaceUpdatable := false
		withInPlaceUpdated := false
		withEvictable := false
		withEvicted := false

		for _, pod := range podsForInPlace {
			withInPlaceUpdatable = true
			decision := inPlaceLimiter.CanInPlaceUpdate(pod)
			if decision == utils.InPlaceDeferred {
				klog.V(0).InfoS("In-place update deferred", "pod", klog.KObj(pod))
				continue
			} else if decision == utils.InPlaceEvict {
				// In-place is not possible for this pod: fall back to eviction below.
				podsForEviction = append(podsForEviction, pod)
				continue
			}
			if err := u.inPlaceRateLimiter.Wait(ctx); err != nil {
				klog.V(0).InfoS("In-place rate limiter wait failed for in-place resize", "error", err)
				return
			}
			if err := inPlaceLimiter.InPlaceUpdate(pod, vpa, u.eventRecorder); err != nil {
				klog.V(0).InfoS("In-place update failed", "error", err, "pod", klog.KObj(pod))
				continue
			}
			withInPlaceUpdated = true
			metrics_updater.AddInPlaceUpdatedPod(vpaSize)
		}

		for _, pod := range podsForEviction {
			withEvictable = true
			if !evictionLimiter.CanEvict(pod) {
				continue
			}
			if err := u.evictionRateLimiter.Wait(ctx); err != nil {
				klog.V(0).InfoS("Eviction rate limiter wait failed", "error", err)
				return
			}
			klog.V(2).InfoS("Evicting pod", "pod", klog.KObj(pod))
			evictErr := evictionLimiter.Evict(pod, vpa, u.eventRecorder)
			if evictErr != nil {
				klog.V(0).InfoS("Eviction failed", "error", evictErr, "pod", klog.KObj(pod))
			} else {
				withEvicted = true
				metrics_updater.AddEvictedPod(vpaSize)
			}
		}

		if withInPlaceUpdatable {
			vpasWithInPlaceUpdatablePodsCounter.Add(vpaSize, 1)
		}
		if withInPlaceUpdated {
			vpasWithInPlaceUpdatedPodsCounter.Add(vpaSize, 1)
		}
		if withEvictable {
			vpasWithEvictablePodsCounter.Add(vpaSize, 1)
		}
		if withEvicted {
			vpasWithEvictedPodsCounter.Add(vpaSize, 1)
		}
	}
	timer.ObserveStep("EvictPods")
}
// getRateLimiter returns a limiter admitting rateLimit events per second with
// the given burst. A non-positive rateLimit disables limiting entirely: the
// limiter is created with rate.Inf, for which the burst value is irrelevant.
func getRateLimiter(rateLimit float64, rateLimitBurst int) *rate.Limiter {
	if rateLimit <= 0 {
		// As a special case if the rate is set to rate.Inf, the burst rate is ignored
		// see https://github.com/golang/time/blob/master/rate/rate.go#L37
		unlimited := rate.NewLimiter(rate.Inf, 0)
		klog.V(1).InfoS("Rate limit disabled")
		return unlimited
	}
	return rate.NewLimiter(rate.Limit(rateLimit), rateLimitBurst)
}
// getPodsUpdateOrder returns the given pods ordered by update priority for vpa.
//
// Every pod is added to the priority calculator with the same timestamp,
// captured once per call, so the whole batch is evaluated against a single
// instant. The previous code called time.Now() per pod. The final ordering is
// produced by GetSortedPods, which is given u.evictionAdmission.
func (u *updater) getPodsUpdateOrder(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) []*apiv1.Pod {
	priorityCalculator := priority.NewUpdatePriorityCalculator(
		vpa,
		nil,
		u.recommendationProcessor,
		u.priorityProcessor)
	now := time.Now()
	for _, pod := range pods {
		priorityCalculator.AddPod(pod, now)
	}
	return priorityCalculator.GetSortedPods(u.evictionAdmission)
}
// filterPods returns a new slice with the pods for which predicate reports
// true, in their original order. The result is a non-nil, possibly empty,
// slice.
func filterPods(pods []*apiv1.Pod, predicate func(*apiv1.Pod) bool) []*apiv1.Pod {
	kept := []*apiv1.Pod{}
	for i := range pods {
		if !predicate(pods[i]) {
			continue
		}
		kept = append(kept, pods[i])
	}
	return kept
}
// filterNonInPlaceUpdatablePods drops pods whose in-place decision is
// utils.InPlaceDeferred and keeps every other pod, including those whose
// decision is a different value such as utils.InPlaceEvict.
func filterNonInPlaceUpdatablePods(pods []*apiv1.Pod, inplaceRestriction restriction.PodsInPlaceRestriction) []*apiv1.Pod {
	notDeferred := func(candidate *apiv1.Pod) bool {
		decision := inplaceRestriction.CanInPlaceUpdate(candidate)
		return decision != utils.InPlaceDeferred
	}
	return filterPods(pods, notDeferred)
}
// filterNonEvictablePods keeps only the pods that evictionRestriction reports
// as evictable through CanEvict.
func filterNonEvictablePods(pods []*apiv1.Pod, evictionRestriction restriction.PodsEvictionRestriction) []*apiv1.Pod {
	// Bind the method value up front, exactly as passing it inline would.
	evictable := evictionRestriction.CanEvict
	return filterPods(pods, evictable)
}
// filterDeletedPods returns the pods that are not being deleted, i.e. those
// whose DeletionTimestamp is unset.
func filterDeletedPods(pods []*apiv1.Pod) []*apiv1.Pod {
	notBeingDeleted := func(p *apiv1.Pod) bool { return p.DeletionTimestamp == nil }
	return filterPods(pods, notBeingDeleted)
}
// newPodLister returns a PodLister backed by an indexer that a reflector
// populates in a background goroutine. Only pods in namespace with a non-empty
// spec.nodeName whose phase is neither Succeeded nor Failed are watched. The
// reflector uses a one-hour resync period. Its stop channel is local and never
// closed, so the goroutine is not stopped by this code.
func newPodLister(kubeClient kube_client.Interface, namespace string) v1lister.PodLister {
	// Equivalent to: spec.nodeName != "" AND phase != Succeeded AND phase != Failed.
	selectorExpr := "spec.nodeName!=" +
		",status.phase!=" + string(apiv1.PodSucceeded) +
		",status.phase!=" + string(apiv1.PodFailed)
	selector := fields.ParseSelectorOrDie(selectorExpr)

	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	listWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector)
	reflector := cache.NewReflector(listWatch, &apiv1.Pod{}, indexer, time.Hour)

	stop := make(chan struct{})
	go reflector.Run(stop)

	return v1lister.NewPodLister(indexer)
}
// newEventRecorder returns an EventRecorder whose events carry the source
// component "vpa-updater".
//
// Events are logged as structured log lines (verbosity 4) and sent to an event
// sink created with Events(""). For a *fake.Clientset the sink is the client's
// own typed Events client. Otherwise a CoreV1 client is built from the REST
// client. NOTE(review): presumably the fake clientset has no usable REST
// client; confirm.
//
// The core API types are registered into the VPA clientset's shared
// scheme.Scheme; a registration failure terminates the process with exit code
// 255.
func newEventRecorder(kubeClient kube_client.Interface) record.EventRecorder {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartStructuredLogging(4)

	var events clientv1.EventInterface
	if _, isFake := kubeClient.(*fake.Clientset); isFake {
		events = kubeClient.CoreV1().Events("")
	} else {
		events = clientv1.New(kubeClient.CoreV1().RESTClient()).Events("")
	}
	broadcaster.StartRecordingToSink(&clientv1.EventSinkImpl{Interface: events})

	// This mutates the package-level scheme shared with the VPA clientset.
	recorderScheme := scheme.Scheme
	if err := corescheme.AddToScheme(recorderScheme); err != nil {
		klog.ErrorS(err, "Error adding core scheme")
		os.Exit(255)
	}
	return broadcaster.NewRecorder(recorderScheme, apiv1.EventSource{Component: "vpa-updater"})
}