shardingsphere-operator/pkg/controllers/proxy_controller.go (168 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package controllers import ( "context" "time" "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1" reconcile "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/proxy" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" logger "sigs.k8s.io/controller-runtime/pkg/log" ) const ( //WaitingForReady Time selection reference kubelet restart time WaitingForReady = 10 * time.Second proxyControllerName = "proxy_controller" ) // ProxyReconciler reconciles a ShardingSphereProxy object type ProxyReconciler struct { client.Client Scheme *runtime.Scheme Log logr.Logger } // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=shardingsphereproxies,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=shardingsphereproxies/status,verbs=get;update;patch // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=shardingsphereproxies/finalizers,verbs=update // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=apps,resources=deployments/status,verbs=get // +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=services/status,verbs=get // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete;deletecollection // +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get // +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers/status,verbs=get // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. func (r *ProxyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := r.Log.WithValues(proxyControllerName, req.NamespacedName) rt, err := r.getRuntimeShardingSphereProxy(ctx, req.NamespacedName) if apierrors.IsNotFound(err) { logger.Info("Resource in work queue no longer exists!") return ctrl.Result{}, nil } else if err != nil { logger.Error(err, "Error getting CRD resource") return ctrl.Result{}, err } return r.reconcile(ctx, req, rt) } func (r *ProxyReconciler) getRuntimeShardingSphereProxy(ctx context.Context, namespacedName types.NamespacedName) (*v1alpha1.ShardingSphereProxy, error) { rt := &v1alpha1.ShardingSphereProxy{} err := r.Get(ctx, namespacedName, rt) return rt, err } func (r *ProxyReconciler) reconcile(ctx context.Context, req ctrl.Request, rt *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) { log := logger.FromContext(ctx) if res, err := r.reconcileDeployment(ctx, req.NamespacedName); err != nil { log.Error(err, "Error reconcile Deployment") return res, err } if res, err := r.reconcileService(ctx, req.NamespacedName); err != nil { log.Error(err, "Error reconcile Service") return res, err } if res, err := r.reconcilePodList(ctx, req.Namespace, req.Name); err != nil { log.Error(err, "Error reconcile Pod list") return res, err } if res, err := r.reconcileHPA(ctx, req.NamespacedName); err != nil { log.Error(err, "Error reconcile HPA") return res, err } return ctrl.Result{RequeueAfter: WaitingForReady}, nil } func (r *ProxyReconciler) reconcileDeployment(ctx context.Context, namespacedName types.NamespacedName) (ctrl.Result, error) { proxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName) if err != nil { return ctrl.Result{}, err } deploy := &appsv1.Deployment{} err = r.Get(ctx, namespacedName, deploy) if apierrors.IsNotFound(err) { exp := reconcile.NewDeployment(proxy) if err := r.Create(ctx, exp); err != nil { return ctrl.Result{}, err } } if err != nil { return ctrl.Result{}, err } act := deploy.DeepCopy() exp := reconcile.UpdateDeployment(proxy, act) if err := r.Update(ctx, exp); err != nil { return ctrl.Result{Requeue: true}, err } return ctrl.Result{}, nil } func (r *ProxyReconciler) reconcileHPA(ctx context.Context, namespacedName types.NamespacedName) (ctrl.Result, error) { proxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName) if err != nil { return ctrl.Result{}, err } // Get the HPA hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{} err = r.Get(ctx, namespacedName, hpa) // If the HPA doesn't exist, create it if apierrors.IsNotFound(err) { if proxy.Spec.AutomaticScaling != nil && proxy.Spec.AutomaticScaling.Enable { exp := reconcile.NewHPA(proxy) if err := r.Create(ctx, exp); err != nil { return ctrl.Result{}, err } } return ctrl.Result{}, nil } if err != nil { return ctrl.Result{}, err } // If the HPA exists, but we don't want it, delete it if proxy.Spec.AutomaticScaling == nil || !proxy.Spec.AutomaticScaling.Enable { if err := r.Delete(ctx, hpa); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil } // If the HPA exists and we want it, update it act := hpa.DeepCopy() exp := reconcile.UpdateHPA(proxy, act) if err := r.Update(ctx, exp); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil } func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName types.NamespacedName) (ctrl.Result, error) { ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName) if err != nil { return ctrl.Result{}, err } service := &v1.Service{} err = r.Get(ctx, namespacedName, service) if apierrors.IsNotFound(err) { exp := reconcile.NewService(ssproxy) if err := r.Create(ctx, exp); err != nil { return ctrl.Result{}, err } } if err != nil { return ctrl.Result{}, err } act := service.DeepCopy() exp := reconcile.UpdateService(ssproxy, act) if err := r.Update(ctx, exp); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil } func (r *ProxyReconciler) reconcilePodList(ctx context.Context, namespace, name string) (ctrl.Result, error) { podList := &v1.PodList{} if err := r.List(ctx, podList, client.InNamespace(namespace), client.MatchingLabels(map[string]string{"apps": name})); err != nil { return ctrl.Result{}, err } result := ctrl.Result{} rt, err := r.getRuntimeShardingSphereProxy(ctx, types.NamespacedName{ Namespace: namespace, Name: name, }) if err != nil { return ctrl.Result{}, err } rt.Status = reconcile.ReconcileStatus(podList, rt) if err := r.Status().Update(ctx, rt); err != nil { return result, err } return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *ProxyReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.ShardingSphereProxy{}). Owns(&appsv1.Deployment{}). Owns(&v1.Service{}). Owns(&v1.Pod{}). Complete(r) }