func()

in eventmesh-operator/controllers/eventmesh_runtime/runtime_controller.go [111:244]


func (r *RuntimeReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	r.Logger.Info("eventMeshRuntime start reconciling",
		"Namespace", req.Namespace, "Namespace", req.Name)

	eventMeshRuntime := &eventmeshoperatorv1.Runtime{}
	err := r.Client.Get(context.TODO(), req.NamespacedName, eventMeshRuntime)
	if err != nil {
		// If it's a not found exception, it means the cr has been deleted.
		if errors.IsNotFound(err) {
			r.Logger.Info("eventMeshRuntime resource not found. Ignoring since object must be deleted.")
			return reconcile.Result{}, err
		}
		r.Logger.Error(err, "Failed to get eventMeshRuntime")
		return reconcile.Result{}, err
	}
	r.Logger.Info("get eventMeshRuntime object", "name", eventMeshRuntime.Name)

	if eventMeshRuntime.Status.Size == 0 {
		GroupNum = eventMeshRuntime.Spec.Size
	} else {
		GroupNum = eventMeshRuntime.Status.Size
	}

	replicaPerGroup := eventMeshRuntime.Spec.ReplicaPerGroup
	r.Logger.Info("GroupNum=" + strconv.Itoa(GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup))

	for groupIndex := 0; groupIndex < GroupNum; groupIndex++ {
		r.Logger.Info("Check eventMeshRuntime cluster " + strconv.Itoa(groupIndex+1) + "/" + strconv.Itoa(GroupNum))
		runtimeDep := r.getEventMeshRuntimeStatefulSet(eventMeshRuntime, groupIndex, 0)

		// Check if the statefulSet already exists, if not create a new one
		found := &appsv1.StatefulSet{}
		err = r.Client.Get(context.TODO(), types.NamespacedName{
			Name:      runtimeDep.Name,
			Namespace: runtimeDep.Namespace,
		}, found)
		if err != nil && errors.IsNotFound(err) {
			r.Logger.Info("Creating a new eventMeshRuntime StatefulSet.",
				"StatefulSet.Namespace", runtimeDep.Namespace,
				"StatefulSet.Name", runtimeDep.Name)
			err = r.Client.Create(context.TODO(), runtimeDep)
			if err != nil {
				r.Logger.Error(err, "Failed to create new StatefulSet",
					"StatefulSet.Namespace", runtimeDep.Namespace,
					"StatefulSet.Name", runtimeDep.Name)
			}
			time.Sleep(time.Duration(3) * time.Second)
		} else if err != nil {
			r.Logger.Error(err, "Failed to get eventMeshRuntime StatefulSet.")
		}
	}
	if eventMeshRuntime.Spec.AllowRestart {
		for groupIndex := 0; groupIndex < eventMeshRuntime.Spec.Size; groupIndex++ {
			runtimeName := eventMeshRuntime.Name + "-" + strconv.Itoa(groupIndex)
			r.Logger.Info("update eventMeshRuntime", "runtimeName", runtimeName)
			// update
			deployment := r.getEventMeshRuntimeStatefulSet(eventMeshRuntime, groupIndex, 0)
			found := &appsv1.StatefulSet{}
			err = r.Client.Get(context.TODO(), types.NamespacedName{
				Name:      deployment.Name,
				Namespace: deployment.Namespace,
			}, found)
			if err != nil {
				r.Logger.Error(err, "Failed to get eventMeshRuntime StatefulSet.")
			} else {
				err = r.Client.Update(context.TODO(), found)
				if err != nil {
					r.Logger.Error(err, "Failed to update eventMeshRuntime "+runtimeName,
						"StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
				} else {
					r.Logger.Info("Successfully update "+runtimeName,
						"StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
				}
				time.Sleep(time.Duration(1) * time.Second)
			}
		}
	}
	podList := &corev1.PodList{}
	labelSelector := labels.SelectorFromSet(getLabels())
	listOps := &client.ListOptions{
		Namespace:     eventMeshRuntime.Namespace,
		LabelSelector: labelSelector,
	}
	err = r.Client.List(context.TODO(), podList, listOps)
	if err != nil {
		r.Logger.Error(err, "Failed to list pods.",
			"eventMeshRuntime.Namespace", eventMeshRuntime.Namespace, "eventMeshRuntime.Name", eventMeshRuntime.Name)
		return reconcile.Result{}, err
	}

	podNames := getRuntimePodNames(podList.Items)
	r.Logger.Info(fmt.Sprintf("Status.Nodes = %s", eventMeshRuntime.Status.Nodes))
	r.Logger.Info(fmt.Sprintf("podNames = %s", podNames))
	// Ensure every pod is in running phase
	for _, pod := range podList.Items {
		if !reflect.DeepEqual(pod.Status.Phase, corev1.PodRunning) {
			r.Logger.Info("pod " + pod.Name + " phase is " + string(pod.Status.Phase) + ", wait for a moment...")
		}
	}

	if podNames != nil {
		eventMeshRuntime.Status.Nodes = podNames
		r.Logger.Info(fmt.Sprintf("eventMeshRuntime.Status.Nodes = %s", eventMeshRuntime.Status.Nodes))
		// Update status.Size if needed
		if eventMeshRuntime.Spec.Size != eventMeshRuntime.Status.Size {
			r.Logger.Info("eventMeshRuntime.Status.Size = " + strconv.Itoa(eventMeshRuntime.Status.Size))
			r.Logger.Info("eventMeshRuntime.Spec.Size = " + strconv.Itoa(eventMeshRuntime.Spec.Size))
			eventMeshRuntime.Status.Size = eventMeshRuntime.Spec.Size
			err = r.Client.Status().Update(context.TODO(), eventMeshRuntime)
			if err != nil {
				r.Logger.Error(err, "Failed to update eventMeshRuntime Size status.")
			}
		}

		// Update status.Nodes if needed
		if !reflect.DeepEqual(podNames, eventMeshRuntime.Status.Nodes) {
			err = r.Client.Status().Update(context.TODO(), eventMeshRuntime)
			if err != nil {
				r.Logger.Error(err, "Failed to update eventMeshRuntime Nodes status.")
			}
		}
	} else {
		r.Logger.Error(err, "Not found eventmesh runtime pods")
	}

	runningEventMeshRuntimeNum := getRunningRuntimeNum(podList.Items)
	if runningEventMeshRuntimeNum == eventMeshRuntime.Spec.Size {
		share.IsEventMeshRuntimeInitialized = true
	}

	r.Logger.Info("Successful reconciliation!")

	return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(share.RequeueAfterSecond) * time.Second}, nil
}