func()

in pkg/controller/controller/dledger_controller.go [113:230]


// Reconcile moves the cluster towards the state described by the Controller
// custom resource identified by request. In order, it:
//
//  1. fetches the Controller; if it no longer exists the reconcile ends
//     without error and without requeue;
//  2. creates the headless Service (named by tool.BuildHeadlessSvcResourceName)
//     when it is missing;
//  3. creates the StatefulSet returned by getControllerStatefulSet when it is
//     missing;
//  4. lists the pods selected by labelsForController and mirrors Spec.Size and
//     the pod names into the Controller status (best effort);
//  5. creates the Service (named by tool.BuildSvcResourceName) when it is
//     missing and stores "<service name>:9878" in share.ControllerAccessPoint.
//
// ctx is forwarded to every API call so that cancellation of the reconcile is
// honoured. Any failure to read or create the Services or the StatefulSet, or
// to list the pods, is returned to the caller. On success the request is
// requeued after cons.RequeueIntervalInSecond seconds.
func (r *ReconcileController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
	reqLogger.Info("Reconciling Controller.")

	// Fetch the Controller instance.
	controller := &rocketmqv1alpha1.Controller{}
	if err := r.client.Get(ctx, request.NamespacedName, controller); err != nil {
		if errors.IsNotFound(err) {
			// Request object not found, could have been deleted after reconcile request.
			// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
			// Return and don't requeue.
			reqLogger.Info("Controller resource not found. Ignoring since object must be deleted.")
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		reqLogger.Error(err, "Failed to get Controller.")
		return reconcile.Result{RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, err
	}

	// Make sure the headless Service exists.
	headlessSvcKey := types.NamespacedName{Name: tool.BuildHeadlessSvcResourceName(request.Name), Namespace: request.Namespace}
	if err := r.client.Get(ctx, headlessSvcKey, &corev1.Service{}); err != nil {
		if !errors.IsNotFound(err) {
			reqLogger.Error(err, "Failed to get controller headless svc")
			return reconcile.Result{}, err
		}
		if err = r.client.Create(ctx, r.generateHeadlessSvc(controller)); err != nil {
			reqLogger.Error(err, "Failed to create controller headless svc")
			return reconcile.Result{}, err
		}
		reqLogger.Info("Successfully create controller headless svc")
	}

	// Make sure the StatefulSet exists. Errors are returned instead of being
	// swallowed: continuing would mark Status.Size as converged and report a
	// successful reconcile although the workload could not be created.
	sts := r.getControllerStatefulSet(controller)
	found := &appsv1.StatefulSet{}
	err := r.client.Get(ctx, types.NamespacedName{Name: sts.Name, Namespace: sts.Namespace}, found)
	switch {
	case err == nil:
		// StatefulSet already present; nothing to create.
	case errors.IsNotFound(err):
		reqLogger.Info("Creating a new Controller StatefulSet.", "StatefulSet.Namespace", sts.Namespace, "StatefulSet.Name", sts.Name)
		if err = r.client.Create(ctx, sts); err != nil {
			reqLogger.Error(err, "Failed to create new Controller StatefulSet", "StatefulSet.Namespace", sts.Namespace, "StatefulSet.Name", sts.Name)
			return reconcile.Result{}, err
		}
	default:
		reqLogger.Error(err, "Failed to get Controller StatefulSet.")
		return reconcile.Result{}, err
	}

	// List the pods for this controller's StatefulSet.
	podList := &corev1.PodList{}
	listOps := &client.ListOptions{
		Namespace:     controller.Namespace,
		LabelSelector: labels.SelectorFromSet(labelsForController(controller.Name)),
	}
	if err = r.client.List(ctx, podList, listOps); err != nil {
		reqLogger.Error(err, "Failed to list pods.", "Controller.Namespace", controller.Namespace, "Controller.Name", controller.Name)
		return reconcile.Result{}, err
	}
	podNames := getPodNames(podList.Items)
	reqLogger.Info("controller.Status.Nodes length = " + strconv.Itoa(len(controller.Status.Nodes)))
	reqLogger.Info("podNames length = " + strconv.Itoa(len(podNames)))

	// Report pods that are not running yet. This is informational only; the
	// reconcile does not block on it.
	for i := range podList.Items {
		pod := &podList.Items[i]
		if pod.Status.Phase != corev1.PodRunning {
			reqLogger.Info("pod " + pod.Name + " phase is " + string(pod.Status.Phase) + ", wait for a moment...")
		}
	}

	// Status updates are best effort: a failure is logged and the periodic
	// requeue at the end of this function gives the next pass a chance to
	// retry, so the remaining steps are not skipped.

	// Update status.Size if needed.
	if controller.Spec.Size != controller.Status.Size {
		reqLogger.Info("controller.Status.Size = " + strconv.Itoa(controller.Status.Size))
		reqLogger.Info("controller.Spec.Size = " + strconv.Itoa(controller.Spec.Size))
		controller.Status.Size = controller.Spec.Size
		if err = r.client.Status().Update(ctx, controller); err != nil {
			reqLogger.Error(err, "Failed to update Controller Size status.")
		}
	}

	// Update status.Nodes if needed.
	if !reflect.DeepEqual(podNames, controller.Status.Nodes) {
		controller.Status.Nodes = podNames
		if err = r.client.Status().Update(ctx, controller); err != nil {
			reqLogger.Error(err, "Failed to update Controller Nodes status.")
		}
	}

	// Make sure the Service exists.
	controllerSvcName := tool.BuildSvcResourceName(request.Name)
	controllerSvcKey := types.NamespacedName{Name: controllerSvcName, Namespace: request.Namespace}
	if err = r.client.Get(ctx, controllerSvcKey, &corev1.Service{}); err != nil {
		if !errors.IsNotFound(err) {
			reqLogger.Error(err, "Failed to get controller svc")
			return reconcile.Result{}, err
		}
		if err = r.client.Create(ctx, r.generateSvc(controller)); err != nil {
			reqLogger.Error(err, "Failed to create controller svc")
			return reconcile.Result{}, err
		}
		reqLogger.Info("Successfully create controller svc")
	}

	// Publish the Service address through the shared package-level variable.
	// NOTE(review): port 9878 is hard-coded here; presumably it must match the
	// port exposed by generateSvc — confirm.
	share.ControllerAccessPoint = controllerSvcName + ":9878"

	return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
}