in eventmesh-operator/controllers/eventmesh_runtime/runtime_controller.go [111:244]
func (r *RuntimeReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
r.Logger.Info("eventMeshRuntime start reconciling",
"Namespace", req.Namespace, "Namespace", req.Name)
eventMeshRuntime := &eventmeshoperatorv1.Runtime{}
err := r.Client.Get(context.TODO(), req.NamespacedName, eventMeshRuntime)
if err != nil {
// If it's a not found exception, it means the cr has been deleted.
if errors.IsNotFound(err) {
r.Logger.Info("eventMeshRuntime resource not found. Ignoring since object must be deleted.")
return reconcile.Result{}, err
}
r.Logger.Error(err, "Failed to get eventMeshRuntime")
return reconcile.Result{}, err
}
r.Logger.Info("get eventMeshRuntime object", "name", eventMeshRuntime.Name)
if eventMeshRuntime.Status.Size == 0 {
GroupNum = eventMeshRuntime.Spec.Size
} else {
GroupNum = eventMeshRuntime.Status.Size
}
replicaPerGroup := eventMeshRuntime.Spec.ReplicaPerGroup
r.Logger.Info("GroupNum=" + strconv.Itoa(GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup))
for groupIndex := 0; groupIndex < GroupNum; groupIndex++ {
r.Logger.Info("Check eventMeshRuntime cluster " + strconv.Itoa(groupIndex+1) + "/" + strconv.Itoa(GroupNum))
runtimeDep := r.getEventMeshRuntimeStatefulSet(eventMeshRuntime, groupIndex, 0)
// Check if the statefulSet already exists, if not create a new one
found := &appsv1.StatefulSet{}
err = r.Client.Get(context.TODO(), types.NamespacedName{
Name: runtimeDep.Name,
Namespace: runtimeDep.Namespace,
}, found)
if err != nil && errors.IsNotFound(err) {
r.Logger.Info("Creating a new eventMeshRuntime StatefulSet.",
"StatefulSet.Namespace", runtimeDep.Namespace,
"StatefulSet.Name", runtimeDep.Name)
err = r.Client.Create(context.TODO(), runtimeDep)
if err != nil {
r.Logger.Error(err, "Failed to create new StatefulSet",
"StatefulSet.Namespace", runtimeDep.Namespace,
"StatefulSet.Name", runtimeDep.Name)
}
time.Sleep(time.Duration(3) * time.Second)
} else if err != nil {
r.Logger.Error(err, "Failed to get eventMeshRuntime StatefulSet.")
}
}
if eventMeshRuntime.Spec.AllowRestart {
for groupIndex := 0; groupIndex < eventMeshRuntime.Spec.Size; groupIndex++ {
runtimeName := eventMeshRuntime.Name + "-" + strconv.Itoa(groupIndex)
r.Logger.Info("update eventMeshRuntime", "runtimeName", runtimeName)
// update
deployment := r.getEventMeshRuntimeStatefulSet(eventMeshRuntime, groupIndex, 0)
found := &appsv1.StatefulSet{}
err = r.Client.Get(context.TODO(), types.NamespacedName{
Name: deployment.Name,
Namespace: deployment.Namespace,
}, found)
if err != nil {
r.Logger.Error(err, "Failed to get eventMeshRuntime StatefulSet.")
} else {
err = r.Client.Update(context.TODO(), found)
if err != nil {
r.Logger.Error(err, "Failed to update eventMeshRuntime "+runtimeName,
"StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
} else {
r.Logger.Info("Successfully update "+runtimeName,
"StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
}
time.Sleep(time.Duration(1) * time.Second)
}
}
}
podList := &corev1.PodList{}
labelSelector := labels.SelectorFromSet(getLabels())
listOps := &client.ListOptions{
Namespace: eventMeshRuntime.Namespace,
LabelSelector: labelSelector,
}
err = r.Client.List(context.TODO(), podList, listOps)
if err != nil {
r.Logger.Error(err, "Failed to list pods.",
"eventMeshRuntime.Namespace", eventMeshRuntime.Namespace, "eventMeshRuntime.Name", eventMeshRuntime.Name)
return reconcile.Result{}, err
}
podNames := getRuntimePodNames(podList.Items)
r.Logger.Info(fmt.Sprintf("Status.Nodes = %s", eventMeshRuntime.Status.Nodes))
r.Logger.Info(fmt.Sprintf("podNames = %s", podNames))
// Ensure every pod is in running phase
for _, pod := range podList.Items {
if !reflect.DeepEqual(pod.Status.Phase, corev1.PodRunning) {
r.Logger.Info("pod " + pod.Name + " phase is " + string(pod.Status.Phase) + ", wait for a moment...")
}
}
if podNames != nil {
eventMeshRuntime.Status.Nodes = podNames
r.Logger.Info(fmt.Sprintf("eventMeshRuntime.Status.Nodes = %s", eventMeshRuntime.Status.Nodes))
// Update status.Size if needed
if eventMeshRuntime.Spec.Size != eventMeshRuntime.Status.Size {
r.Logger.Info("eventMeshRuntime.Status.Size = " + strconv.Itoa(eventMeshRuntime.Status.Size))
r.Logger.Info("eventMeshRuntime.Spec.Size = " + strconv.Itoa(eventMeshRuntime.Spec.Size))
eventMeshRuntime.Status.Size = eventMeshRuntime.Spec.Size
err = r.Client.Status().Update(context.TODO(), eventMeshRuntime)
if err != nil {
r.Logger.Error(err, "Failed to update eventMeshRuntime Size status.")
}
}
// Update status.Nodes if needed
if !reflect.DeepEqual(podNames, eventMeshRuntime.Status.Nodes) {
err = r.Client.Status().Update(context.TODO(), eventMeshRuntime)
if err != nil {
r.Logger.Error(err, "Failed to update eventMeshRuntime Nodes status.")
}
}
} else {
r.Logger.Error(err, "Not found eventmesh runtime pods")
}
runningEventMeshRuntimeNum := getRunningRuntimeNum(podList.Items)
if runningEventMeshRuntimeNum == eventMeshRuntime.Spec.Size {
share.IsEventMeshRuntimeInitialized = true
}
r.Logger.Info("Successful reconciliation!")
return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(share.RequeueAfterSecond) * time.Second}, nil
}