controllers/dsapi_controller.go (174 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package controllers

import (
	"context"
	dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"time"
)

var (
	// apiLogger is the named logger shared by every function in this controller.
	apiLogger = ctrl.Log.WithName("DSApi-controller")
)

// DSApiReconciler reconciles a DSApi object
type DSApiReconciler struct {
	// Client is the embedded controller-runtime client used for all reads and
	// writes against the API server.
	client.Client
	// Scheme is passed to controllerutil.SetControllerReference when the
	// reconciler creates the Service and Deployment for a DSApi.
	Scheme *runtime.Scheme
	// Recorder emits Kubernetes events; it is assigned in SetupWithManager.
	Recorder record.EventRecorder
}

//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // the DSApi object against the actual cluster state, and then // perform operations to make the cluster state reflect the state specified by // the user. // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile func (r *DSApiReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { apiLogger.Info("dmApi start reconcile logic") defer apiLogger.Info("dmApi Reconcile end ---------------------------------------------") cluster := &dsv1alpha1.DSApi{} if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil { if errors.IsNotFound(err) { r.Recorder.Event(cluster, corev1.EventTypeWarning, "dsApi is not Found", "dsApi is not Found") return ctrl.Result{}, nil } return ctrl.Result{}, err } desired := cluster.DeepCopy() // Handler finalizer // examine DeletionTimestamp to determine if object is under deletion if cluster.ObjectMeta.DeletionTimestamp.IsZero() { // The object is not being deleted, so if it does not have our finalizer, // then lets add the finalizer and update the object. This is equivalent // registering our finalizer. 
if !controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) { controllerutil.AddFinalizer(desired, dsv1alpha1.FinalizerName) if err := r.Update(ctx, desired); err != nil { return ctrl.Result{}, err } } } else { // The object is being deleted if controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) { // our finalizer is present, so lets handle any external dependency if err := r.ensureDSApiDeleted(ctx, cluster); err != nil { return ctrl.Result{}, err } // remove our finalizer from the list and update it. controllerutil.RemoveFinalizer(desired, dsv1alpha1.FinalizerName) if err := r.Update(ctx, desired); err != nil { return ctrl.Result{}, err } } // Stop reconciliation as the item is being deleted return ctrl.Result{}, nil } if cluster.Spec.Paused { apiLogger.Info("ds-Api control has been paused: ", "ds-Api-name", cluster.Name) desired.Status.ControlPaused = true if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil { return ctrl.Result{}, err } r.Recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing") return ctrl.Result{}, nil } // 1. 
First time we see the ds-master-cluster, initialize it if cluster.Status.Phase == dsv1alpha1.DsPhaseNone { desired.Status.Phase = dsv1alpha1.DsPhaseCreating apiLogger.Info("phase had been changed from none ---> creating") err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)) return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err } //2 ensure the Api service apiLogger.Info("Ensuring Api service") if err := r.ensureApiService(ctx, cluster); err != nil { return ctrl.Result{Requeue: false}, nil } if requeue, err := r.ensureApiDeployment(ctx, cluster); err != nil { return ctrl.Result{Requeue: false}, err } else { if requeue { return ctrl.Result{RequeueAfter: 3 * time.Second}, nil } } apiLogger.Info("******************************************************") desired.Status.Phase = dsv1alpha1.DsPhaseNone if err := r.Update(ctx, desired); err != nil { return ctrl.Result{}, err } return ctrl.Result{Requeue: false}, nil } // SetupWithManager sets up the controller with the Manager. func (r *DSApiReconciler) SetupWithManager(mgr ctrl.Manager) error { r.Recorder = mgr.GetEventRecorderFor("api-controller") return ctrl.NewControllerManagedBy(mgr). For(&dsv1alpha1.DSApi{}). Owns(&v1.Deployment{}). Owns(&corev1.Service{}). Complete(r) } func (r *DSApiReconciler) ensureDSApiDeleted(ctx context.Context, DSApi *dsv1alpha1.DSApi) error { if err := r.Client.Delete(ctx, DSApi, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil { return err } return nil } func (r *DSApiReconciler) ensureApiService(ctx context.Context, cluster *dsv1alpha1.DSApi) error { // 1. 
Client service service := &corev1.Service{} namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsApiServiceValue} if err := r.Client.Get(ctx, namespacedName, service); err != nil { // Local cache not found apiLogger.Info("api get service error") if apierrors.IsNotFound(err) { service = createApiService(cluster) if err := controllerutil.SetControllerReference(cluster, service, r.Scheme); err != nil { apiLogger.Info("create Api service error") return err } // Remote may already exist, so we will return err, for the next time, this code will not execute if err := r.Client.Create(ctx, service); err != nil { apiLogger.Info("create Api service error1") return err } apiLogger.Info("the Api service had been created") } } return nil } func (r *DSApiReconciler) ensureApiDeployment(ctx context.Context, cluster *dsv1alpha1.DSApi) (bool, error) { deployment := &v1.Deployment{} deploymentNamespaceName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsApiDeploymentValue} if err := r.Client.Get(ctx, deploymentNamespaceName, deployment); err != nil { if apierrors.IsNotFound(err) { deployment = createApiDeployment(cluster) applyDeploymentPolicy(deployment, cluster.Spec.Deployment) } if err := controllerutil.SetControllerReference(cluster, deployment, r.Scheme); err != nil { return false, err } if err := r.Client.Create(ctx, deployment); err == nil { apiLogger.Info("the api deployment had been created") return false, nil } else { return false, err } } else { if r.predicateUpdate(deployment, cluster) { apiLogger.Info("api need to update") err := r.updateApiDeployment(ctx, deployment, cluster) if err != nil { return false, err } return true, nil } else { apiLogger.Info("begin to check deployment ") if IsDeploymentAvailable(deployment) { apiLogger.Info("api deployment is available ") return false, nil } else { return true, nil } } } } //only notice the property of replicas and image and version func (r *DSApiReconciler) 
updateApiDeployment(ctx context.Context, deployment *v1.Deployment, cluster *dsv1alpha1.DSApi) error { deployment.Spec.Replicas = int32Ptr(cluster.Spec.Replicas) deployment.Spec.Template.Spec.Containers[0].Image = ImageName(cluster.Spec.Repository, cluster.Spec.Version) if err := r.Client.Update(ctx, deployment); err != nil { return err } return nil } func (r *DSApiReconciler) predicateUpdate(deployment *v1.Deployment, cluster *dsv1alpha1.DSApi) bool { if *deployment.Spec.Replicas == (cluster.Spec.Replicas) && deployment.Spec.Template.Spec.Containers[0].Image == ImageName(cluster.Spec.Repository, cluster.Spec.Version) { apiLogger.Info("no need update") return false } return true }