shardingsphere-operator/pkg/controllers/auto_scaler_controller.go (128 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package controllers import ( "context" "reflect" "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1" "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes" reconcile "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/autoscaler" "github.com/go-logr/logr" autoscalingv2beta2 "k8s.io/api/autoscaling/v2" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" autoscalingv1 "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) const ( autoScalerControllerName = "autoscaler-controller" ) // AutoScalerReconciler is a controller for the shardingsphere cluster type AutoScalerReconciler struct { client.Client Scheme *runtime.Scheme Log logr.Logger Builder reconcile.Builder Resources kubernetes.Resources } // SetupWithManager sets up the controller with the Manager func (r *AutoScalerReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.AutoScaler{}). Owns(&autoscalingv2beta2.HorizontalPodAutoscaler{}). Complete(r) } // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=computenodes,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=autoscaling/v2,resources=horizontalpodautoscaler,verbs=get;list;watch;create;update;patch;delete // Reconcile handles main function of this controller func (r *AutoScalerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := r.Log.WithValues(autoScalerControllerName, req.NamespacedName) as := &v1alpha1.AutoScaler{} if err := r.Get(ctx, req.NamespacedName, as); err != nil { if apierrors.IsNotFound(err) { return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil } logger.Error(err, "Failed to get the autoscaler") return ctrl.Result{Requeue: true}, err } if err := r.reconcileAutoScaler(ctx, as); err != nil { logger.Error(err, "Failed to reconcile autoscaler") } return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil } func (r *AutoScalerReconciler) reconcileAutoScaler(ctx context.Context, as *v1alpha1.AutoScaler) error { gvk := as.GroupVersionKind() for i := range as.Spec.PolicyGroup { pg := as.Spec.PolicyGroup[i] if pg.Provider == "KubernetesHPA" && pg.Horizontal != nil { if err := r.reconcileHPA(ctx, &as.ObjectMeta, gvk, &pg); err != nil { return err } } if pg.Provider == "KubernetesVPA" && pg.Vertical != nil { if err := r.reconcileVPA(ctx, &as.ObjectMeta, gvk, &pg); err != nil { return err } } } return nil } func (r *AutoScalerReconciler) reconcileHPA(ctx context.Context, meta *metav1.ObjectMeta, gvk schema.GroupVersionKind, policy *v1alpha1.ScalingPolicy) error { hpa, err := r.getHPAByNamespacedName(ctx, types.NamespacedName{Namespace: meta.Namespace, Name: meta.Name}) if err != nil { return err } if hpa != nil { return r.updateHPA(ctx, meta, gvk, policy, hpa) } return r.createHPA(ctx, meta, gvk, policy) } func (r *AutoScalerReconciler) getHPAByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*autoscalingv2beta2.HorizontalPodAutoscaler, error) { return r.Resources.HPA().GetByNamespacedName(ctx, namespacedName) } // nolint:dupl func (r *AutoScalerReconciler) updateHPA(ctx context.Context, meta *metav1.ObjectMeta, gvk schema.GroupVersionKind, policy *v1alpha1.ScalingPolicy, hpa *autoscalingv2beta2.HorizontalPodAutoscaler) error { exp := r.Builder.BuildHorizontalPodAutoScaler(ctx, meta, gvk, policy) exp.ObjectMeta = hpa.ObjectMeta exp.Labels = hpa.Labels exp.Annotations = hpa.Annotations if !reflect.DeepEqual(hpa.Spec, exp.Spec) { return r.Resources.HPA().Update(ctx, hpa) } return nil } func (r *AutoScalerReconciler) createHPA(ctx context.Context, meta *metav1.ObjectMeta, gvk schema.GroupVersionKind, policy *v1alpha1.ScalingPolicy) error { hpa := r.Builder.BuildHorizontalPodAutoScaler(ctx, meta, gvk, policy) err := r.Resources.HPA().Create(ctx, hpa) if err != nil && apierrors.IsAlreadyExists(err) || err == nil { return nil } return err } func (r *AutoScalerReconciler) reconcileVPA(ctx context.Context, meta *metav1.ObjectMeta, gvk schema.GroupVersionKind, policy *v1alpha1.ScalingPolicy) error { vpa, err := r.getVPAByNamespacedName(ctx, types.NamespacedName{Namespace: meta.Namespace, Name: meta.Name}) if err != nil { return err } if vpa != nil { return r.updateVPA(ctx, meta, gvk, policy, vpa) } return r.createVPA(ctx, meta, gvk, policy) } func (r *AutoScalerReconciler) getVPAByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*autoscalingv1.VerticalPodAutoscaler, error) { return r.Resources.VPA().GetByNamespacedName(ctx, namespacedName) } // nolint:dupl func (r *AutoScalerReconciler) updateVPA(ctx context.Context, meta *metav1.ObjectMeta, gvk schema.GroupVersionKind, policy *v1alpha1.ScalingPolicy, vpa *autoscalingv1.VerticalPodAutoscaler) error { exp := r.Builder.BuildVerticalPodAutoscaler(ctx, meta, gvk, policy) exp.ObjectMeta = vpa.ObjectMeta exp.Labels = vpa.Labels exp.Annotations = vpa.Annotations if !reflect.DeepEqual(vpa.Spec, exp.Spec) { return r.Resources.VPA().Update(ctx, vpa) } return nil } // nolint:dupl func (r *AutoScalerReconciler) createVPA(ctx context.Context, meta *metav1.ObjectMeta, gvk schema.GroupVersionKind, policy *v1alpha1.ScalingPolicy) error { vpa := r.Builder.BuildVerticalPodAutoscaler(ctx, meta, gvk, policy) err := r.Resources.VPA().Create(ctx, vpa) if err != nil && apierrors.IsAlreadyExists(err) || err == nil { return nil } return err }