controllers/dsworker_controller.go

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package controllers

import (
    "context"
    "time"

    corev1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/tools/record"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"

    dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
)

// DSWorkerReconciler reconciles a DSWorker object.
type DSWorkerReconciler struct {
    client.Client
    Scheme   *runtime.Scheme
    recorder record.EventRecorder
}

var (
    workerLogger = ctrl.Log.WithName("DSWorker-controller")
)

//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;create;delete;list;watch
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
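// Note: podMemberSet, allMembersHealth, LabelForWorkerPod, and newDSWorkerPod
// are helpers defined elsewhere in this package; their implementations are not
// shown in this file. As a rough sketch of the assumed shape (an assumption,
// not the actual implementation), the member set is likely derived by listing
// the pods that carry the worker labels:
//
//	pods := &corev1.PodList{}
//	err := r.Client.List(ctx, pods,
//		client.InNamespace(cluster.Namespace),
//		client.MatchingLabels(LabelForWorkerPod()))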
// Reconcile is part of the main Kubernetes reconciliation loop, which aims to
// move the current state of the cluster closer to the desired state. It
// compares the state specified by the DSWorker object against the actual
// cluster state, and then performs operations to make the cluster state
// reflect the state specified by the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    workerLogger.Info("dsWorker start reconcile logic")
    defer workerLogger.Info("dsWorker Reconcile end ---------------------------------------------")

    cluster := &dsv1alpha1.DSWorker{}
    if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
        if apierrors.IsNotFound(err) {
            r.recorder.Event(cluster, corev1.EventTypeWarning, "dsWorker not found", "dsWorker not found")
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }
    desired := cluster.DeepCopy()

    // Handle the finalizer: examine DeletionTimestamp to determine whether the
    // object is under deletion.
    if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
        // The object is not being deleted, so if it does not have our
        // finalizer, add the finalizer and update the object. This is
        // equivalent to registering our finalizer.
        if !controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) {
            controllerutil.AddFinalizer(desired, dsv1alpha1.FinalizerName)
            if err := r.Update(ctx, desired); err != nil {
                return ctrl.Result{}, err
            }
        }
    } else {
        // The object is being deleted.
        if controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) {
            // Our finalizer is present, so handle any external dependency.
            if err := r.ensureDSWorkerDeleted(ctx, cluster); err != nil {
                return ctrl.Result{}, err
            }
            // Remove our finalizer from the list and update the object.
            controllerutil.RemoveFinalizer(desired, dsv1alpha1.FinalizerName)
            if err := r.Update(ctx, desired); err != nil {
                return ctrl.Result{}, err
            }
        }
        return ctrl.Result{}, nil
    }

    // If the ds-worker cluster is paused, we do nothing when things change.
    // Until it is un-paused, we reconcile to the ds-worker state of that point.
    if cluster.Spec.Paused {
        workerLogger.Info("ds-worker control has been paused: ", "ds-worker-name", cluster.Name)
        desired.Status.ControlPaused = true
        if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
            if apierrors.IsConflict(err) {
                return ctrl.Result{Requeue: true}, nil
            }
            workerLogger.Error(err, "unexpected error when worker update status in paused")
            return ctrl.Result{}, err
        }
        r.recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing")
        return ctrl.Result{}, nil
    }

    // 1. The first time we see the ds-worker cluster, initialize it.
    if cluster.Status.Phase == dsv1alpha1.DsPhaseNone {
        if desired.Status.Selector == "" {
            selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: LabelForWorkerPod()})
            if err != nil {
                workerLogger.Error(err, "Error retrieving selector labels")
                return reconcile.Result{}, err
            }
            desired.Status.Selector = selector.String()
        }
        desired.Status.Phase = dsv1alpha1.DsPhaseCreating
        workerLogger.Info("phase had been changed from none ---> creating")
        if err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
            if apierrors.IsConflict(err) {
                return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
            }
            workerLogger.Error(err, "unexpected error when worker update status in creating")
            return ctrl.Result{}, err
        }
    }

    // 2. Ensure the cluster is bootstrapped; we block here until the cluster
    // is up and healthy.
    workerLogger.Info("Ensuring cluster members")
    if requeue, err := r.ensureMembers(ctx, cluster); requeue {
        return ctrl.Result{RequeueAfter: 5 * time.Second}, err
    }

    // 3. Ensure the cluster is scaled to the desired replica count.
    workerLogger.Info("Ensuring cluster scaled")
    if requeue, err := r.ensureScaled(ctx, cluster); requeue {
        return ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}, err
    }

    // 4. Ensure the cluster is upgraded to the desired version.
    workerLogger.Info("Ensuring cluster upgraded")
    if requeue, err := r.ensureUpgraded(ctx, cluster); requeue {
        return ctrl.Result{Requeue: true}, err
    }

    desired.Status.Phase = dsv1alpha1.DsPhaseFinished
    if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
        if apierrors.IsConflict(err) {
            return ctrl.Result{Requeue: true}, nil
        }
        workerLogger.Error(err, "unexpected error when worker update status in finished")
        return ctrl.Result{}, err
    }

    workerLogger.Info("******************************************************")
    desired.Status.Phase = dsv1alpha1.DsPhaseNone
    return ctrl.Result{Requeue: false}, nil
}
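// FinalizerName is referenced from the api/v1alpha1 package; its actual value
// is defined there. A typical declaration (an assumption, shown only for
// illustration) would be:
//
//	const FinalizerName = "ds.apache.dolphinscheduler.dev/finalizer"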
// SetupWithManager sets up the controller with the Manager.
func (r *DSWorkerReconciler) SetupWithManager(mgr ctrl.Manager) error {
    r.recorder = mgr.GetEventRecorderFor("worker-controller")
    return ctrl.NewControllerManagedBy(mgr).
        For(&dsv1alpha1.DSWorker{}).
        Owns(&corev1.Pod{}).
        Complete(r)
}

// ensureMembers reports whether the reconcile loop should requeue because not
// all existing worker pods are healthy yet.
func (r *DSWorkerReconciler) ensureMembers(ctx context.Context, cluster *dsv1alpha1.DSWorker) (bool, error) {
    pms, err := r.podMemberSet(ctx, cluster)
    if err != nil {
        return true, err
    }
    if len(pms) > 0 {
        return !allMembersHealth(pms), nil
    }
    return false, nil
}

// ensureScaled compares the current member set with the desired replica count
// and creates or deletes at most one member per reconcile pass.
func (r *DSWorkerReconciler) ensureScaled(ctx context.Context, cluster *dsv1alpha1.DSWorker) (bool, error) {
    // Get the current members of this cluster.
    ms, err := r.podMemberSet(ctx, cluster)
    if err != nil {
        return true, err
    }
    workerLogger.Info("before scale", "podMemberSet", len(ms), "replicas", cluster.Spec.Replicas)

    // Scale up.
    if len(ms) < cluster.Spec.Replicas {
        err = r.createMember(ctx, cluster)
        if err != nil {
            r.recorder.Event(cluster, corev1.EventTypeWarning, "cannot create the new ds-worker pod", "the ds-worker pod creation failed")
            return true, err
        }
        return true, nil
    }

    // Scale down.
    if len(ms) > cluster.Spec.Replicas {
        pod := &corev1.Pod{}
        member := ms.PickOne()
        pod.SetName(member.Name)
        pod.SetNamespace(member.Namespace)
        err = r.deleteMember(ctx, pod, cluster)
        if err != nil {
            return true, err
        }
        return true, nil
    }
    return false, nil
}

// ensureUpgraded deletes one member running an outdated version so that it is
// recreated with the desired version on a later pass.
func (r *DSWorkerReconciler) ensureUpgraded(ctx context.Context, cluster *dsv1alpha1.DSWorker) (bool, error) {
    ms, err := r.podMemberSet(ctx, cluster)
    if err != nil {
        return false, err
    }
    for _, memset := range ms {
        if memset.Version != cluster.Spec.Version {
            pod := &corev1.Pod{}
            pod.SetName(memset.Name)
            pod.SetNamespace(memset.Namespace)
            if err := r.deleteMember(ctx, pod, cluster); err != nil {
                return false, err
            }
            return true, nil
        }
    }
    return false, nil
}

// ensureDSWorkerDeleted deletes the cluster object itself, orphaning its
// dependents, so the finalizer can complete.
func (r *DSWorkerReconciler) ensureDSWorkerDeleted(ctx context.Context, cluster *dsv1alpha1.DSWorker) error {
    return r.Client.Delete(ctx, cluster, client.PropagationPolicy(metav1.DeletePropagationOrphan))
}

// createMember builds and creates a new worker pod for the cluster.
func (r *DSWorkerReconciler) createMember(ctx context.Context, cluster *dsv1alpha1.DSWorker) error {
    workerLogger.Info("Starting add new member to cluster", "cluster", cluster.Name)
    defer workerLogger.Info("End add new member to cluster", "cluster", cluster.Name)

    // Build the new pod.
    pod, err := r.newDSWorkerPod(ctx, cluster)
    if err != nil {
        return err
    }

    // Create the pod; tolerate it already existing.
    if err = r.Client.Create(ctx, pod); err != nil && !apierrors.IsAlreadyExists(err) {
        return err
    }
    desired := cluster.DeepCopy()
    desired.Spec.Replicas += 1
    if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
        return err
    }
    return nil
}

// deleteMember deletes the given worker pod, tolerating it being gone already.
func (r *DSWorkerReconciler) deleteMember(ctx context.Context, pod *corev1.Pod, cluster *dsv1alpha1.DSWorker) error {
    workerLogger.Info("begin delete pod", "pod name", pod.Name)
    if err := r.Client.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
        return err
    }
    desired := cluster.DeepCopy()
    desired.Spec.Replicas -= 1
    if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
        return err
    }
    return nil
}
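// The sketch below shows how this reconciler is typically wired into a
// controller-runtime manager. It is illustrative only: the real entry point
// lives in main.go, and the scheme registration shown here is an assumption
// based on standard kubebuilder scaffolding.
//
//	scheme := runtime.NewScheme()
//	utilruntime.Must(dsv1alpha1.AddToScheme(scheme)) // utilruntime: k8s.io/apimachinery/pkg/util/runtime
//	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
//	if err != nil {
//		// handle error
//	}
//	if err := (&DSWorkerReconciler{
//		Client: mgr.GetClient(),
//		Scheme: mgr.GetScheme(),
//	}).SetupWithManager(mgr); err != nil {
//		// handle error
//	}
//	_ = mgr.Start(ctrl.SetupSignalHandler())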