func()

in controllers/dsworker_controller.go [65:178]


func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	workerLogger.Info("dmWorker start reconcile logic")
	defer workerLogger.Info("dmWorker Reconcile end ---------------------------------------------")

	cluster := &dsv1alpha1.DSWorker{}

	if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
		if errors.IsNotFound(err) {
			r.recorder.Event(cluster, corev1.EventTypeWarning, "dmWorker is not Found", "dmWorker is not Found")
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, err
	}
	desired := cluster.DeepCopy()

	// Handler finalizer
	// examine DeletionTimestamp to determine if object is under deletion
	if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
		// The object is not being deleted, so if it does not have our finalizer,
		// then lets add the finalizer and update the object. This is equivalent
		// registering our finalizer.
		if !controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) {
			controllerutil.AddFinalizer(desired, dsv1alpha1.FinalizerName)
			if err := r.Update(ctx, desired); err != nil {
				return ctrl.Result{}, err
			}
		}
	} else {
		// The object is being deleted
		if controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) {
			// our finalizer is present, so lets handle any external dependency
			if err := r.ensureDSWorkerDeleted(ctx, cluster); err != nil {
				return ctrl.Result{}, err
			}
			// remove our finalizer from the list and update it.
			controllerutil.RemoveFinalizer(desired, dsv1alpha1.FinalizerName)
			if err := r.Update(ctx, desired); err != nil {
				return ctrl.Result{}, err
			}
		}
		return ctrl.Result{}, nil
	}

	// If dsworker-cluster is paused, we do nothing on things changed.
	// Until dsworker-cluster is un-paused, we will reconcile to the dsworker state of that point.
	if cluster.Spec.Paused {
		workerLogger.Info("ds-worker control has been paused: ", "ds-worker-name", cluster.Name)
		desired.Status.ControlPaused = true
		if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
			if apierrors.IsConflict(err) {
				return ctrl.Result{Requeue: true}, nil
			} else {
				masterLogger.Error(err, "unexpected error when worker update status in paused")
				return ctrl.Result{}, err
			}
		}
		r.recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing")
		return ctrl.Result{}, nil
	}

	// 1. First time we see the ds-worker-cluster, initialize it
	if cluster.Status.Phase == dsv1alpha1.DsPhaseNone {
		if desired.Status.Selector == "" {
			selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: LabelForWorkerPod()})
			if err != nil {
				masterLogger.Error(err, "Error retrieving selector labels")
				return reconcile.Result{}, err
			}
			desired.Status.Selector = selector.String()
		}
		desired.Status.Phase = dsv1alpha1.DsPhaseCreating
		workerLogger.Info("phase had been changed from  none ---> creating")
		if err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
			if apierrors.IsConflict(err) {
				return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
			} else {
				masterLogger.Error(err, "unexpected error when worker update status in creating")
				return ctrl.Result{}, err
			}
		}
	}

	// 3. Ensure bootstrapped, we will block here util cluster is up and healthy
	workerLogger.Info("Ensuring cluster members")
	if requeue, err := r.ensureMembers(ctx, cluster); requeue {
		return ctrl.Result{RequeueAfter: 5 * time.Second}, err
	}

	// 4. Ensure cluster scaled
	workerLogger.Info("Ensuring cluster scaled")
	if requeue, err := r.ensureScaled(ctx, cluster); requeue {
		return ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}, err
	}

	// .5 Ensure cluster upgraded
	workerLogger.Info("Ensuring cluster upgraded")
	if requeue, err := r.ensureUpgraded(ctx, cluster); requeue {
		return ctrl.Result{Requeue: true}, err
	}

	desired.Status.Phase = dsv1alpha1.DsPhaseFinished
	if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
		if apierrors.IsConflict(err) {
			return ctrl.Result{Requeue: true}, nil
		} else {
			masterLogger.Error(err, "unexpected error when worker update status in finished")
			return ctrl.Result{}, err
		}
	}

	workerLogger.Info("******************************************************")
	desired.Status.Phase = dsv1alpha1.DsPhaseNone
	return ctrl.Result{Requeue: false}, nil
}