/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eviction
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
appsinformer "k8s.io/client-go/informers/apps/v1"
coreinformer "k8s.io/client-go/informers/core/v1"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
)
const (
resyncPeriod time.Duration = 1 * time.Minute
)
// PodsEvictionRestriction controls pod evictions. It ensures that we do not
// evict too many pods from any one replica set. For each replica set it
// allows evicting one pod, or more if evictionToleranceFraction permits.
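// A typical caller builds the restriction via a factory and consults it
// before each eviction (a sketch; the variable names are illustrative):
//
//	restriction := factory.NewPodsEvictionRestriction(pods, vpa)
//	for _, pod := range pods {
//		if restriction.CanEvict(pod) {
//			_ = restriction.Evict(pod, vpa, eventRecorder)
//		}
//	}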
type PodsEvictionRestriction interface {
// Evict sends an eviction instruction to the API client.
// Returns an error if the pod cannot be evicted or if the client returns an error.
Evict(pod *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error
// CanEvict checks if a pod can be safely evicted
CanEvict(pod *apiv1.Pod) bool
}
type podsEvictionRestrictionImpl struct {
client kube_client.Interface
podToReplicaCreatorMap map[string]podReplicaCreator
creatorToSingleGroupStatsMap map[podReplicaCreator]singleGroupStats
}
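// singleGroupStats counts the pods of a single replica group: how many
// replicas are configured, how many are pending or running, and how many have
// already been evicted in this pass. With illustrative numbers: configured = 10
// and evictionToleranceFraction = 0.5 give evictionTolerance = 5, so CanEvict
// allows evictions while running - evicted > 10 - 5 = 5, i.e. up to 5 pods.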
type singleGroupStats struct {
configured int
pending int
running int
evictionTolerance int
evicted int
}
// PodsEvictionRestrictionFactory creates PodsEvictionRestriction
type PodsEvictionRestrictionFactory interface {
// NewPodsEvictionRestriction creates PodsEvictionRestriction for given set of pods,
// controlled by a single VPA object.
NewPodsEvictionRestriction(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) PodsEvictionRestriction
}
type podsEvictionRestrictionFactoryImpl struct {
client kube_client.Interface
rcInformer cache.SharedIndexInformer // informer for Replication Controllers
ssInformer cache.SharedIndexInformer // informer for Stateful Sets
rsInformer cache.SharedIndexInformer // informer for Replica Sets
dsInformer cache.SharedIndexInformer // informer for Daemon Sets
minReplicas int
evictionToleranceFraction float64
}
type controllerKind string
const (
replicationController controllerKind = "ReplicationController"
statefulSet controllerKind = "StatefulSet"
replicaSet controllerKind = "ReplicaSet"
daemonSet controllerKind = "DaemonSet"
job controllerKind = "Job"
)
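// podReplicaCreator identifies the controller (namespace, name and kind) that
// created a group of replica pods.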
type podReplicaCreator struct {
Namespace string
Name string
Kind controllerKind
}
// CanEvict checks if a pod can be safely evicted
func (e *podsEvictionRestrictionImpl) CanEvict(pod *apiv1.Pod) bool {
cr, present := e.podToReplicaCreatorMap[getPodID(pod)]
if present {
singleGroupStats, present := e.creatorToSingleGroupStatsMap[cr]
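// Pending pods can always be evicted: they are not serving traffic yet,
// so evicting them does not reduce the number of running replicas.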
if pod.Status.Phase == apiv1.PodPending {
return true
}
if present {
shouldBeAlive := singleGroupStats.configured - singleGroupStats.evictionTolerance
if singleGroupStats.running-singleGroupStats.evicted > shouldBeAlive {
return true
}
// If all pods are running and the eviction tolerance rounds down to zero, still allow evicting one pod.
if singleGroupStats.running == singleGroupStats.configured &&
singleGroupStats.evictionTolerance == 0 &&
singleGroupStats.evicted == 0 {
return true
}
}
}
return false
}
// Evict sends an eviction instruction to the API client. Returns an error if the pod
// cannot be evicted or if the client returns an error.
// It does not check whether the pod was actually evicted after the eviction grace period.
func (e *podsEvictionRestrictionImpl) Evict(podToEvict *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error {
cr, present := e.podToReplicaCreatorMap[getPodID(podToEvict)]
if !present {
return fmt.Errorf("pod not suitable for eviction %s/%s: not in replicated pods map", podToEvict.Namespace, podToEvict.Name)
}
if !e.CanEvict(podToEvict) {
return fmt.Errorf("cannot evict pod %s/%s: eviction budget exceeded", podToEvict.Namespace, podToEvict.Name)
}
eviction := &policyv1.Eviction{
ObjectMeta: metav1.ObjectMeta{
Namespace: podToEvict.Namespace,
Name: podToEvict.Name,
},
}
err := e.client.CoreV1().Pods(podToEvict.Namespace).EvictV1(context.TODO(), eviction)
if err != nil {
klog.ErrorS(err, "Failed to evict pod", "pod", klog.KObj(podToEvict))
return err
}
eventRecorder.Event(podToEvict, apiv1.EventTypeNormal, "EvictedByVPA",
"Pod was evicted by VPA Updater to apply resource recommendation.")
eventRecorder.Event(vpa, apiv1.EventTypeNormal, "EvictedPod",
"VPA Updater evicted Pod "+podToEvict.Name+" to apply resource recommendation.")
if podToEvict.Status.Phase != apiv1.PodPending {
singleGroupStats, present := e.creatorToSingleGroupStatsMap[cr]
if !present {
return fmt.Errorf("internal error - cannot find stats for replication group %v", cr)
}
singleGroupStats.evicted++
e.creatorToSingleGroupStatsMap[cr] = singleGroupStats
}
return nil
}
// NewPodsEvictionRestrictionFactory creates PodsEvictionRestrictionFactory
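// A construction sketch (the literal arguments are illustrative, not defaults):
//
//	factory, err := NewPodsEvictionRestrictionFactory(kubeClient, 2 /* minReplicas */, 0.5 /* evictionToleranceFraction */)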
func NewPodsEvictionRestrictionFactory(client kube_client.Interface, minReplicas int,
evictionToleranceFraction float64) (PodsEvictionRestrictionFactory, error) {
rcInformer, err := setUpInformer(client, replicationController)
if err != nil {
return nil, fmt.Errorf("failed to create rcInformer: %v", err)
}
ssInformer, err := setUpInformer(client, statefulSet)
if err != nil {
return nil, fmt.Errorf("failed to create ssInformer: %v", err)
}
rsInformer, err := setUpInformer(client, replicaSet)
if err != nil {
return nil, fmt.Errorf("failed to create rsInformer: %v", err)
}
dsInformer, err := setUpInformer(client, daemonSet)
if err != nil {
return nil, fmt.Errorf("failed to create dsInformer: %v", err)
}
return &podsEvictionRestrictionFactoryImpl{
client: client,
rcInformer: rcInformer, // informer for Replication Controllers
ssInformer: ssInformer, // informer for Stateful Sets
rsInformer: rsInformer, // informer for Replica Sets
dsInformer: dsInformer, // informer for Daemon Sets
minReplicas: minReplicas,
evictionToleranceFraction: evictionToleranceFraction}, nil
}
// NewPodsEvictionRestriction creates PodsEvictionRestriction for a given set of pods,
// controlled by a single VPA object.
func (f *podsEvictionRestrictionFactoryImpl) NewPodsEvictionRestriction(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) PodsEvictionRestriction {
// We can evict a pod only if it is part of a replica set.
// For each replica set we can evict only a fraction of its pods.
// Evictions may be further limited by a pod disruption budget, if one is configured.
livePods := make(map[podReplicaCreator][]*apiv1.Pod)
for _, pod := range pods {
creator, err := getPodReplicaCreator(pod)
if err != nil {
klog.ErrorS(err, "Failed to obtain replication info for pod", "pod", klog.KObj(pod))
continue
}
if creator == nil {
klog.V(0).InfoS("Pod is not managed by any controller", "pod", klog.KObj(pod))
continue
}
livePods[*creator] = append(livePods[*creator], pod)
}
podToReplicaCreatorMap := make(map[string]podReplicaCreator)
creatorToSingleGroupStatsMap := make(map[podReplicaCreator]singleGroupStats)
// Use the per-VPA minReplicas if present; otherwise fall back to the global setting.
required := f.minReplicas
if vpa.Spec.UpdatePolicy != nil && vpa.Spec.UpdatePolicy.MinReplicas != nil {
required = int(*vpa.Spec.UpdatePolicy.MinReplicas)
klog.V(3).InfoS("Overriding minReplicas from global to per-VPA value", "globalMinReplicas", f.minReplicas, "vpaMinReplicas", required, "vpa", klog.KObj(vpa))
}
for creator, replicas := range livePods {
actual := len(replicas)
if actual < required {
klog.V(2).InfoS("Too few replicas", "kind", creator.Kind, "object", klog.KRef(creator.Namespace, creator.Name), "livePods", actual, "requiredPods", required, "globalMinReplicas", f.minReplicas)
continue
}
var configured int
if creator.Kind == job {
// Jobs have no replica count in their spec, so use the actual number of live pods instead.
configured = actual
} else {
var err error
configured, err = f.getReplicaCount(creator)
if err != nil {
klog.ErrorS(err, "Failed to obtain replication info", "kind", creator.Kind, "object", klog.KRef(creator.Namespace, creator.Name))
continue
}
}
singleGroup := singleGroupStats{}
singleGroup.configured = configured
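// The tolerance is truncated towards zero: e.g. 3 configured replicas with a
// fraction of 0.5 give int(1.5) = 1 evictable pod, and any fraction smaller
// than 1/configured gives 0, which CanEvict treats as "evict at most one".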
singleGroup.evictionTolerance = int(float64(configured) * f.evictionToleranceFraction)
for _, pod := range replicas {
podToReplicaCreatorMap[getPodID(pod)] = creator
if pod.Status.Phase == apiv1.PodPending {
singleGroup.pending++
}
}
singleGroup.running = len(replicas) - singleGroup.pending
creatorToSingleGroupStatsMap[creator] = singleGroup
}
return &podsEvictionRestrictionImpl{
client: f.client,
podToReplicaCreatorMap: podToReplicaCreatorMap,
creatorToSingleGroupStatsMap: creatorToSingleGroupStatsMap}
}
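// getPodReplicaCreator resolves the controller managing the pod.
// It returns (nil, nil) when the pod has no managing controller.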
func getPodReplicaCreator(pod *apiv1.Pod) (*podReplicaCreator, error) {
creator := managingControllerRef(pod)
if creator == nil {
return nil, nil
}
return &podReplicaCreator{
Namespace: pod.Namespace,
Name: creator.Name,
Kind: controllerKind(creator.Kind),
}, nil
}
func getPodID(pod *apiv1.Pod) string {
if pod == nil {
return ""
}
return pod.Namespace + "/" + pod.Name
}
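// getReplicaCount reads the configured replica count for the given controller
// from its informer cache. DaemonSets have no replica count in their spec, so
// the number of ready pods is used instead.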
func (f *podsEvictionRestrictionFactoryImpl) getReplicaCount(creator podReplicaCreator) (int, error) {
switch creator.Kind {
case replicationController:
rcObj, exists, err := f.rcInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
if err != nil {
return 0, fmt.Errorf("replication controller %s/%s is not available, err: %v", creator.Namespace, creator.Name, err)
}
if !exists {
return 0, fmt.Errorf("replication controller %s/%s does not exist", creator.Namespace, creator.Name)
}
rc, ok := rcObj.(*apiv1.ReplicationController)
if !ok {
return 0, fmt.Errorf("Failed to parse Replication Controller")
}
if rc.Spec.Replicas == nil || *rc.Spec.Replicas == 0 {
return 0, fmt.Errorf("replication controller %s/%s has no replicas config", creator.Namespace, creator.Name)
}
return int(*rc.Spec.Replicas), nil
case replicaSet:
rsObj, exists, err := f.rsInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
if err != nil {
return 0, fmt.Errorf("replica set %s/%s is not available, err: %v", creator.Namespace, creator.Name, err)
}
if !exists {
return 0, fmt.Errorf("replica set %s/%s does not exist", creator.Namespace, creator.Name)
}
rs, ok := rsObj.(*appsv1.ReplicaSet)
if !ok {
return 0, fmt.Errorf("Failed to parse Replicaset")
}
if rs.Spec.Replicas == nil || *rs.Spec.Replicas == 0 {
return 0, fmt.Errorf("replica set %s/%s has no replicas config", creator.Namespace, creator.Name)
}
return int(*rs.Spec.Replicas), nil
case statefulSet:
ssObj, exists, err := f.ssInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
if err != nil {
return 0, fmt.Errorf("stateful set %s/%s is not available, err: %v", creator.Namespace, creator.Name, err)
}
if !exists {
return 0, fmt.Errorf("stateful set %s/%s does not exist", creator.Namespace, creator.Name)
}
ss, ok := ssObj.(*appsv1.StatefulSet)
if !ok {
return 0, fmt.Errorf("Failed to parse StatefulSet")
}
if ss.Spec.Replicas == nil || *ss.Spec.Replicas == 0 {
return 0, fmt.Errorf("stateful set %s/%s has no replicas config", creator.Namespace, creator.Name)
}
return int(*ss.Spec.Replicas), nil
case daemonSet:
dsObj, exists, err := f.dsInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
if err != nil {
return 0, fmt.Errorf("daemon set %s/%s is not available, err: %v", creator.Namespace, creator.Name, err)
}
if !exists {
return 0, fmt.Errorf("daemon set %s/%s does not exist", creator.Namespace, creator.Name)
}
ds, ok := dsObj.(*appsv1.DaemonSet)
if !ok {
return 0, fmt.Errorf("Failed to parse DaemonSet")
}
if ds.Status.NumberReady == 0 {
return 0, fmt.Errorf("daemon set %s/%s has no number ready pods", creator.Namespace, creator.Name)
}
return int(ds.Status.NumberReady), nil
}
return 0, fmt.Errorf("unknown controller kind: %v", creator.Kind)
}
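// managingControllerRef returns the OwnerReference of the controller managing
// the pod, or nil if the pod has no controller.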
func managingControllerRef(pod *apiv1.Pod) *metav1.OwnerReference {
for _, ownerReference := range pod.ObjectMeta.GetOwnerReferences() {
if ownerReference.Controller != nil && *ownerReference.Controller {
managingController := ownerReference
return &managingController
}
}
return nil
}
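// setUpInformer starts a shared informer for the given controller kind and
// blocks until its cache has synced; on success the informer then runs for
// the lifetime of the process.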
func setUpInformer(kubeClient kube_client.Interface, kind controllerKind) (cache.SharedIndexInformer, error) {
var informer cache.SharedIndexInformer
switch kind {
case replicationController:
informer = coreinformer.NewReplicationControllerInformer(kubeClient, apiv1.NamespaceAll,
resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
case replicaSet:
informer = appsinformer.NewReplicaSetInformer(kubeClient, apiv1.NamespaceAll,
resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
case statefulSet:
informer = appsinformer.NewStatefulSetInformer(kubeClient, apiv1.NamespaceAll,
resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
case daemonSet:
informer = appsinformer.NewDaemonSetInformer(kubeClient, apiv1.NamespaceAll,
resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
default:
return nil, fmt.Errorf("Unknown controller kind: %v", kind)
}
stopCh := make(chan struct{})
go informer.Run(stopCh)
synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
if !synced {
close(stopCh)
return nil, fmt.Errorf("failed to sync %v cache", kind)
}
return informer, nil
}