internal/controller/evictionautoscaler_controller.go (165 lines of code) (raw):

package controllers

import (
	"context"
	"fmt"
	"math"
	"strconv"
	"strings"
	"time"

	myappsv1 "github.com/azure/eviction-autoscaler/api/v1"
	//v1 "k8s.io/api/apps/v1"
	policyv1 "k8s.io/api/policy/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// EvictionSurgeReplicasAnnotationKey is the annotation Reconcile writes on the
// target when it scales it up (the value is the surged replica count) and
// removes again when it reverts the target to its minimum replicas.
const EvictionSurgeReplicasAnnotationKey = "evictionSurgeReplicas"

// EvictionAutoScalerReconciler reconciles a EvictionAutoScaler object
type EvictionAutoScalerReconciler struct {
	client.Client
	Scheme   *runtime.Scheme
	Recorder record.EventRecorder
}

// cooldown is both the minimum time that must have passed since the last
// recorded eviction before a surged target is scaled back down, and the
// requeue delay used while waiting for that time to pass.
const cooldown = 1 * time.Minute

// NOTE(review): Reconcile also reads a PodDisruptionBudget and updates whatever
// kinds GetSurger supports; no marker visible here grants those permissions —
// confirm they are declared elsewhere.
// +kubebuilder:rbac:groups=eviction-autoscaler.azure.com,resources=evictionautoscalers,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=eviction-autoscaler.azure.com,resources=evictionautoscalers/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=eviction-autoscaler.azure.com,resources=evictionautoscalers/finalizers,verbs=update
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=watch;get;list;update
// +kubebuilder:rbac:groups=core,resources=pods,verbs=watch;get;list
// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=update

// Reconcile drives one EvictionAutoScaler towards its desired state.
//
// The flow is:
//  1. Load the EvictionAutoScaler, the PDB with the same name/namespace and the
//     scale target named in the spec. A missing PDB, empty target name, unknown
//     target kind or missing target marks the object Degraded and stops.
//  2. If the target's generation differs from the one recorded in status (or
//     none is recorded yet), record the new generation and the target's current
//     replica count as MinReplicas, then stop.
//  3. If Spec.LastEviction equals Status.LastEviction there is nothing to do.
//  4. If the PDB allows no disruptions and the target sits at MinReplicas, scale
//     the target up by its max surge and requeue after cooldown.
//  5. Otherwise wait until cooldown has elapsed since the last eviction, then
//     revert a surged target to MinReplicas and mark the eviction as handled.
//
// It returns a non-nil error when an API read or write fails, except that a
// missing EvictionAutoScaler is treated as already deleted and returns nil.
func (r *EvictionAutoScalerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// Fetch the EvictionAutoScaler instance
	eas := &myappsv1.EvictionAutoScaler{}
	err := r.Get(ctx, req.NamespacedName, eas)
	if err != nil {
		//should we use a finalizer to scale back down on deletion?
		if errors.IsNotFound(err) {
			return ctrl.Result{}, nil // EvictionAutoScaler not found, could be deleted, nothing to do
		}
		return ctrl.Result{}, err // Error fetching EvictionAutoScaler
	}
	eas = eas.DeepCopy() //don't mutate the cache

	// Fetch the PDB using a 1:1 name mapping
	pdb := &policyv1.PodDisruptionBudget{}
	err = r.Get(ctx, types.NamespacedName{Name: eas.Name, Namespace: eas.Namespace}, pdb)
	if err != nil {
		if errors.IsNotFound(err) {
			degraded(&eas.Status.Conditions, "NoPdb", "PDB of same name not found")
			logger.Error(err, "no matching pdb", "namespace", eas.Namespace, "name", eas.Name)
			return ctrl.Result{}, r.Status().Update(ctx, eas)
		}
		return ctrl.Result{}, err
	}

	if eas.Spec.TargetName == "" {
		degraded(&eas.Status.Conditions, "EmptyTarget", "no specified target")
		// There is no underlying error on this path (err is nil after the
		// successful PDB fetch), so pass nil explicitly instead of a stale variable.
		logger.Error(nil, "no specified target name", "targetname", eas.Spec.TargetName)
		return ctrl.Result{}, r.Status().Update(ctx, eas)
	}

	// Fetch the Deployment or Statefulset
	// TODO enum validation https://book.kubebuilder.io/reference/generating-crd#validation
	target, err := GetSurger(eas.Spec.TargetKind)
	if err != nil {
		logger.Error(err, "invalid target kind", "kind", eas.Spec.TargetKind)
		degraded(&eas.Status.Conditions, "InvalidTarget", "Invalid Target Kind: "+eas.Spec.TargetKind)
		return ctrl.Result{}, r.Status().Update(ctx, eas)
	}

	err = r.Get(ctx, types.NamespacedName{Name: eas.Spec.TargetName, Namespace: eas.Namespace}, target.Obj())
	if err != nil {
		if errors.IsNotFound(err) {
			logger.Error(err, "pdb watcher target does not exist", "kind", eas.Spec.TargetKind, "targetname", eas.Spec.TargetName)
			degraded(&eas.Status.Conditions, "MissingTarget", "Misssing Target "+eas.Spec.TargetName)
			return ctrl.Result{}, r.Status().Update(ctx, eas)
		}
		return ctrl.Result{}, err
	}

	// Check if the target's generation has changed or was never recorded (initial state)
	if eas.Status.TargetGeneration == 0 || eas.Status.TargetGeneration != target.Obj().GetGeneration() {
		logger.Info("Target resource version changed reseting min replicas", "kind", eas.Spec.TargetKind, "targetname", eas.Spec.TargetName)
		// The generation has changed, which means someone else has modified the Target.
		// To avoid conflicts, we update our status to reflect the new state and avoid making further changes.
		eas.Status.TargetGeneration = target.Obj().GetGeneration()
		eas.Status.MinReplicas = target.GetReplicas()
		ready(&eas.Status.Conditions, "TargetSpecChange", fmt.Sprintf("resetting min replicas to %d", eas.Status.MinReplicas))
		//should we go retry in case there is also an eviction or just wait till the next eviction
		return ctrl.Result{}, r.Status().Update(ctx, eas)
	}

	// Log current state before checks
	logger.Info(fmt.Sprintf("Checking PDB for %s: DisruptionsAllowed=%d, MinReplicas=%d", pdb.Name, pdb.Status.DisruptionsAllowed, eas.Status.MinReplicas))

	// If every eviction has already been processed there is nothing else to do
	if eas.Spec.LastEviction == eas.Status.LastEviction {
		logger.Info("No unhandled eviction ", "pdbname", pdb.Name)
		ready(&eas.Status.Conditions, "Reconciled", "no unhandled eviction")
		return ctrl.Result{}, r.Status().Update(ctx, eas)
	}

	//if we're not scaled up and there are new evictions we haven't processed
	if pdb.Status.DisruptionsAllowed == 0 && target.GetReplicas() == eas.Status.MinReplicas {
		//What if the evict went through because the pod being evicted wasn't ready anyways? Handle that in webhook or here?
		// TODO later. Surge more slowly based on number of evictions (need to move back to capturing them all)
		logger.Info(fmt.Sprintf("No disruptions allowed for %s and recent eviction attempting to scale up", pdb.Name))
		newReplicas := calculateSurge(ctx, target, eas.Status.MinReplicas)
		target.SetReplicas(newReplicas)
		//adding annotations here is an atomic operation;
		//EvictionAutoScaler can fail between updating deployment and EvictionAutoScaler targetGeneration;
		//hence we need to rely on checking if annotation exists and compare with deployment.Spec.Replicas
		// this is to solve customer scaling up deployment manually so EvictionAutoScaler minAvailable needs to be updated
		target.AddAnnotation(EvictionSurgeReplicasAnnotationKey, strconv.FormatInt(int64(newReplicas), 10))
		err = r.Update(ctx, target.Obj())
		if err != nil {
			logger.Error(err, "failed to update Target", "kind", eas.Spec.TargetKind, "targetname", eas.Spec.TargetName)
			return ctrl.Result{}, err
		}

		// Log the scaling action
		logger.Info(fmt.Sprintf("Scaled up %s %s/%s to %d replicas", eas.Spec.TargetKind, target.Obj().GetNamespace(), target.Obj().GetName(), newReplicas))
		// Record the target's post-update generation so our own change is not
		// treated as an external modification on the next reconcile.
		eas.Status.TargetGeneration = target.Obj().GetGeneration()
		// Status.LastEviction is deliberately left untouched here so the eviction
		// stays "unhandled" and the requeue below can scale back down after cooldown.
		//eas.Status.LastEviction = eas.Spec.LastEviction //we could still keep a log here if thats useful
		ready(&eas.Status.Conditions, "Reconciled", "eviction with scale up")
		return ctrl.Result{RequeueAfter: cooldown}, r.Status().Update(ctx, eas)
	}

	//what if we're allowed disruptions >0 and minreplicas == replicas? Could argue that we should mark the eviction as handled
	//BUT maybe PDB is slow to update? so just letting it requeue anyways

	//Cool down time makes sure we're not still getting more evictions
	//we could substantially reduce this if we looked at pods and knew that none remaining (not already evicted) had been an eviction target but that means tracking more data in EvictionAutoScaler
	// or using pod conditions which we're not doing.....yet
	if time.Since(eas.Spec.LastEviction.EvictionTime.Time) < cooldown {
		logger.Info(fmt.Sprintf("Giving %s/%s cooldown of %s after last eviction %s ", target.Obj().GetNamespace(), target.Obj().GetName(), cooldown, eas.Spec.LastEviction.EvictionTime))
		return ctrl.Result{RequeueAfter: cooldown}, nil
	}

	//still at a scaled out state check if we can scale back down
	if target.GetReplicas() > eas.Status.MinReplicas { //would we ever be below min replicas
		//cooldown has passed without further evictions: revert Target to the original state
		target.SetReplicas(eas.Status.MinReplicas)
		target.RemoveAnnotation(EvictionSurgeReplicasAnnotationKey)
		err = r.Update(ctx, target.Obj())
		if err != nil {
			return ctrl.Result{}, err
		}

		// Log the scaling action
		logger.Info(fmt.Sprintf("Reverted %s %s/%s to %d replicas", eas.Spec.TargetKind, target.Obj().GetNamespace(), target.Obj().GetName(), target.GetReplicas()))
		// Record the target's post-update generation so our own change is not
		// treated as an external modification on the next reconcile.
		eas.Status.TargetGeneration = target.Obj().GetGeneration()
		eas.Status.LastEviction = eas.Spec.LastEviction //we could still keep a log here if thats useful
		ready(&eas.Status.Conditions, "Reconciled", "evictions hit cooldown so scaled down")
		return ctrl.Result{}, r.Status().Update(ctx, eas)
	}

	//could get here if a scale up/down was not needed because we never hit allowed disruptions == 0.
	eas.Status.LastEviction = eas.Spec.LastEviction //we could still keep a log here if thats useful
	ready(&eas.Status.Conditions, "Reconciled", "last eviction did not need scaling")
	//should we go retry in case there is also an eviction or just wait till the next eviction
	return ctrl.Result{}, r.Status().Update(ctx, eas)
}

// ready sets the "Ready" condition to true with the given reason and message
// and removes any "Degraded" condition from conditions.
func ready(conditions *[]metav1.Condition, reason string, message string) {
	meta.SetStatusCondition(conditions, metav1.Condition{
		Type:               "Ready",
		Status:             metav1.ConditionTrue,
		Reason:             reason,
		Message:            message,
		LastTransitionTime: metav1.Now(),
	})
	meta.RemoveStatusCondition(conditions, "Degraded")
}

// degraded sets the "Degraded" condition to true with the given reason and
// message.
//
// NOTE(review): unlike ready, this does not touch the opposite condition, so a
// previously set "Ready" condition stays in place — confirm that is intended.
func degraded(conditions *[]metav1.Condition, reason string, message string) {
	meta.SetStatusCondition(conditions, metav1.Condition{
		Type:               "Degraded",
		Status:             metav1.ConditionTrue,
		Reason:             reason,
		Message:            message,
		LastTransitionTime: metav1.Now(),
	})
}

// SetupWithManager registers the reconciler with mgr as the controller for
// EvictionAutoScaler objects. Update events whose old and new objects have the
// same generation are filtered out.
func (r *EvictionAutoScalerReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&myappsv1.EvictionAutoScaler{}).
		WithEventFilter(predicate.Funcs{
			// ignore status updates as we make those.
			UpdateFunc: func(ue event.UpdateEvent) bool {
				return ue.ObjectOld.GetGeneration() != ue.ObjectNew.GetGeneration()
			},
		}).
		Complete(r)
}

// calculateSurge returns the replica count to scale target up to: minrepicas
// plus the target's max surge. An integer surge is added as is; a string surge
// is read as a percentage of minrepicas (an optional trailing "%" is stripped)
// and rounded up.
//
// If the surge cannot be interpreted (non-numeric percentage or an unknown
// IntOrString type) the problem is logged and minrepicas is returned unchanged,
// i.e. no surge is applied, rather than panicking inside the reconcile loop.
//
// TODO Unittest
func calculateSurge(ctx context.Context, target Surger, minrepicas int32) int32 {
	surge := target.GetMaxSurge()
	switch surge.Type {
	case intstr.Int:
		return minrepicas + surge.IntVal
	case intstr.String:
		percentageStr := strings.TrimSuffix(surge.StrVal, "%")
		percentage, err := strconv.Atoi(percentageStr)
		if err != nil {
			//return an error? so we can set degraded?
			log.FromContext(ctx).Error(err, "invalid surge")
			return minrepicas
		}
		return minrepicas + int32(math.Ceil((float64(minrepicas)*float64(percentage))/100.0))
	default:
		// Not reachable with the IntOrString types known today; handled the same
		// way as an unparsable percentage instead of crashing the controller.
		log.FromContext(ctx).Error(fmt.Errorf("unsupported maxSurge type %d", surge.Type), "invalid surge")
		return minrepicas
	}
}