func()

in pkg/controller/broker/broker_controller.go [119:348]


func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
	reqLogger.Info("Reconciling Broker.")

	// Fetch the Broker instance
	broker := &rocketmqv1alpha1.Broker{}
	err := r.client.Get(context.TODO(), request.NamespacedName, broker)
	if err != nil {
		if errors.IsNotFound(err) {
			// Request object not found, could have been deleted after reconcile request.
			// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
			// Return and don't requeue
			reqLogger.Info("Broker resource not found. Ignoring since object must be deleted.")
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		reqLogger.Error(err, "Failed to get Broker.")
		return reconcile.Result{}, err
	}

	if broker.Status.Size == 0 {
		share.GroupNum = broker.Spec.Size
	} else {
		share.GroupNum = broker.Status.Size
	}

	if broker.Spec.NameServers == "" {
		// wait for name server ready when create broker cluster if nameServers is omitted
		for {
			if share.IsNameServersStrInitialized {
				break
			} else {
				log.Info("Broker Waiting for name server ready...")
				time.Sleep(time.Duration(cons.WaitForNameServerReadyInSecond) * time.Second)
			}
		}
	} else {
		share.NameServersStr = broker.Spec.NameServers
	}

	if broker.Spec.ClusterMode == "" {
		broker.Spec.ClusterMode = "STATIC"
	}

	if broker.Spec.ClusterMode == "CONTROLLER" && share.ControllerAccessPoint == "" {
		log.Info("Broker Waiting for Controller ready...")
		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
	}

	share.BrokerClusterName = broker.Name
	replicaPerGroup := broker.Spec.ReplicaPerGroup
	reqLogger.Info("brokerGroupNum=" + strconv.Itoa(share.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup))
	for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ {
		reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(share.GroupNum))
		dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0)
		// Check if the statefulSet already exists, if not create a new one
		found := &appsv1.StatefulSet{}
		err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found)
		if err != nil && errors.IsNotFound(err) {
			reqLogger.Info("Creating a new Master Broker StatefulSet.", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
			err = r.client.Create(context.TODO(), dep)
			if err != nil {
				reqLogger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
			}
		} else if err != nil {
			reqLogger.Error(err, "Failed to get broker master StatefulSet.")
		}

		for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ {
			reqLogger.Info("Check Replica Broker of cluster-" + strconv.Itoa(brokerGroupIndex) + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup))
			replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex)
			err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, found)
			if err != nil && errors.IsNotFound(err) {
				reqLogger.Info("Creating a new Replica Broker StatefulSet.", "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name)
				err = r.client.Create(context.TODO(), replicaDep)
				if err != nil {
					reqLogger.Error(err, "Failed to create new StatefulSet of broker-"+strconv.Itoa(brokerGroupIndex)+"-replica-"+strconv.Itoa(replicaIndex), "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name)
				}
			} else if err != nil {
				reqLogger.Error(err, "Failed to get broker replica StatefulSet.")
			}
		}
	}

	// Check for name server scaling
	if broker.Spec.AllowRestart {
		// The following code will restart all brokers to update NAMESRV_ADDR env
		if share.IsNameServersStrUpdated {
			for brokerGroupIndex := 0; brokerGroupIndex < broker.Spec.Size; brokerGroupIndex++ {
				brokerName := getBrokerName(broker, brokerGroupIndex)
				// Update master broker
				reqLogger.Info("Update Master Broker NAMESRV_ADDR of " + brokerName)
				dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0)
				found := &appsv1.StatefulSet{}
				err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found)
				if err != nil {
					reqLogger.Error(err, "Failed to get broker master StatefulSet of "+brokerName)
				} else {
					found.Spec.Template.Spec.Containers[0].Env[0].Value = share.NameServersStr
					err = r.client.Update(context.TODO(), found)
					if err != nil {
						reqLogger.Error(err, "Failed to update NAMESRV_ADDR of master broker "+brokerName, "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
					} else {
						reqLogger.Info("Successfully updated NAMESRV_ADDR of master broker "+brokerName, "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
					}
					time.Sleep(time.Duration(cons.RestartBrokerPodIntervalInSecond) * time.Second)
				}
				// Update replicas brokers
				for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ {
					reqLogger.Info("Update Replica Broker NAMESRV_ADDR of " + brokerName + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup))
					replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex)
					replicaFound := &appsv1.StatefulSet{}
					err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, replicaFound)
					if err != nil {
						reqLogger.Error(err, "Failed to get broker replica StatefulSet of "+brokerName)
					} else {
						for index := range replicaFound.Spec.Template.Spec.Containers[0].Env {
							if cons.EnvNameServiceAddress == replicaFound.Spec.Template.Spec.Containers[0].Env[index].Name {
								replicaFound.Spec.Template.Spec.Containers[0].Env[index].Value = share.NameServersStr
								break
							}
						}
						err = r.client.Update(context.TODO(), replicaFound)
						if err != nil {
							reqLogger.Error(err, "Failed to update NAMESRV_ADDR of "+strconv.Itoa(brokerGroupIndex)+"-replica-"+strconv.Itoa(replicaIndex), "StatefulSet.Namespace", replicaFound.Namespace, "StatefulSet.Name", replicaFound.Name)
						} else {
							reqLogger.Info("Successfully updated NAMESRV_ADDR of "+strconv.Itoa(brokerGroupIndex)+"-replica-"+strconv.Itoa(replicaIndex), "StatefulSet.Namespace", replicaFound.Namespace, "StatefulSet.Name", replicaFound.Name)
						}
						time.Sleep(time.Duration(cons.RestartBrokerPodIntervalInSecond) * time.Second)
					}
				}
			}
		}
		share.IsNameServersStrUpdated = false
	}

	// List the pods for this broker's statefulSet
	podList := &corev1.PodList{}
	labelSelector := labels.SelectorFromSet(labelsForBroker(broker.Name))
	listOps := &client.ListOptions{
		Namespace:     broker.Namespace,
		LabelSelector: labelSelector,
	}
	err = r.client.List(context.TODO(), podList, listOps)
	if err != nil {
		reqLogger.Error(err, "Failed to list pods.", "Broker.Namespace", broker.Namespace, "Broker.Name", broker.Name)
		return reconcile.Result{}, err
	}
	podNames := getPodNames(podList.Items)
	log.Info("broker.Status.Nodes length = " + strconv.Itoa(len(broker.Status.Nodes)))
	log.Info("podNames length = " + strconv.Itoa(len(podNames)))
	// Ensure every pod is in ready
	for _, pod := range podList.Items {
		if !reflect.DeepEqual(pod.Status.Phase, corev1.PodRunning) {
			log.Info("pod " + pod.Name + " phase is " + string(pod.Status.Phase) + ", wait for a moment...")
		}
		if !isReady(pod) {
			reqLogger.Info("pod " + pod.Name + " is not ready, wait for a moment...")
			return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
		}
	}

	if broker.Status.Size != 0 && broker.Spec.Size > broker.Status.Size {
		// Get the metadata including subscriptionGroup.json and topics.json from scale source pod
		k8s, err := tool.NewK8sClient()
		if err != nil {
			log.Error(err, "Error occurred while getting K8s Client")
		}
		sourcePodName := broker.Spec.ScalePodName
		topicsCommand := getCopyMetadataJsonCommand(cons.TopicJsonDir, sourcePodName, broker.Namespace, k8s)
		log.Info("topicsCommand: " + topicsCommand)
		subscriptionGroupCommand := getCopyMetadataJsonCommand(cons.SubscriptionGroupJsonDir, sourcePodName, broker.Namespace, k8s)
		log.Info("subscriptionGroupCommand: " + subscriptionGroupCommand)
		MakeConfigDirCommand := "mkdir -p " + cons.StoreConfigDir
		ChmodDirCommand := "chmod a+rw " + cons.StoreConfigDir
		cmdContent := MakeConfigDirCommand + " && " + ChmodDirCommand
		if topicsCommand != "" {
			cmdContent = cmdContent + " && " + topicsCommand
		}
		if subscriptionGroupCommand != "" {
			cmdContent = cmdContent + " && " + subscriptionGroupCommand
		}
		cmd = []string{"/bin/bash", "-c", cmdContent}
	}

	// Update status.Size if needed
	if broker.Spec.Size != broker.Status.Size {
		log.Info("broker.Status.Size = " + strconv.Itoa(broker.Status.Size))
		log.Info("broker.Spec.Size = " + strconv.Itoa(broker.Spec.Size))
		broker.Status.Size = broker.Spec.Size
		err = r.client.Status().Update(context.TODO(), broker)
		if err != nil {
			reqLogger.Error(err, "Failed to update Broker Size status.")
		}
	}

	// Update status.Nodes if needed
	if !reflect.DeepEqual(podNames, broker.Status.Nodes) {
		broker.Status.Nodes = podNames
		err = r.client.Status().Update(context.TODO(), broker)
		if err != nil {
			reqLogger.Error(err, "Failed to update Broker Nodes status.")
		}
	}

	//podList := &corev1.PodList{}
	//labelSelector := labels.SelectorFromSet(labelsForBroker(broker.Name))
	//listOps := &client.ListOptions{
	//	Namespace:     broker.Namespace,
	//	LabelSelector: labelSelector,
	//}
	//err = r.client.List(context.TODO(), listOps, podList)
	//if err != nil {
	//	reqLogger.Error(err, "Failed to list pods.", "Broker.Namespace", broker.Namespace, "Broker.Name", broker.Name)
	//	return reconcile.Result{}, err
	//}
	//podNames := getPodNames(podList.Items)
	//
	//// Update status.Nodes if needed
	//if !reflect.DeepEqual(podNames, broker.Status.Nodes) {
	//	broker.Status.Nodes = podNames
	//	err := r.client.Status().Update(context.TODO(), broker)
	//	if err != nil {
	//		reqLogger.Error(err, "Failed to update Broker status.")
	//		return reconcile.Result{}, err
	//	}
	//}

	return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
}