in pkg/controller/broker/broker_controller.go [119:348]
func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
reqLogger.Info("Reconciling Broker.")
// Fetch the Broker instance
broker := &rocketmqv1alpha1.Broker{}
err := r.client.Get(context.TODO(), request.NamespacedName, broker)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
// Return and don't requeue
reqLogger.Info("Broker resource not found. Ignoring since object must be deleted.")
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
reqLogger.Error(err, "Failed to get Broker.")
return reconcile.Result{}, err
}
if broker.Status.Size == 0 {
share.GroupNum = broker.Spec.Size
} else {
share.GroupNum = broker.Status.Size
}
if broker.Spec.NameServers == "" {
// wait for name server ready when create broker cluster if nameServers is omitted
for {
if share.IsNameServersStrInitialized {
break
} else {
log.Info("Broker Waiting for name server ready...")
time.Sleep(time.Duration(cons.WaitForNameServerReadyInSecond) * time.Second)
}
}
} else {
share.NameServersStr = broker.Spec.NameServers
}
if broker.Spec.ClusterMode == "" {
broker.Spec.ClusterMode = "STATIC"
}
if broker.Spec.ClusterMode == "CONTROLLER" && share.ControllerAccessPoint == "" {
log.Info("Broker Waiting for Controller ready...")
return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
}
share.BrokerClusterName = broker.Name
replicaPerGroup := broker.Spec.ReplicaPerGroup
reqLogger.Info("brokerGroupNum=" + strconv.Itoa(share.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup))
for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ {
reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(share.GroupNum))
dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0)
// Check if the statefulSet already exists, if not create a new one
found := &appsv1.StatefulSet{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Master Broker StatefulSet.", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
err = r.client.Create(context.TODO(), dep)
if err != nil {
reqLogger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
}
} else if err != nil {
reqLogger.Error(err, "Failed to get broker master StatefulSet.")
}
for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ {
reqLogger.Info("Check Replica Broker of cluster-" + strconv.Itoa(brokerGroupIndex) + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup))
replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex)
err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Replica Broker StatefulSet.", "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name)
err = r.client.Create(context.TODO(), replicaDep)
if err != nil {
reqLogger.Error(err, "Failed to create new StatefulSet of broker-"+strconv.Itoa(brokerGroupIndex)+"-replica-"+strconv.Itoa(replicaIndex), "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name)
}
} else if err != nil {
reqLogger.Error(err, "Failed to get broker replica StatefulSet.")
}
}
}
// Check for name server scaling
if broker.Spec.AllowRestart {
// The following code will restart all brokers to update NAMESRV_ADDR env
if share.IsNameServersStrUpdated {
for brokerGroupIndex := 0; brokerGroupIndex < broker.Spec.Size; brokerGroupIndex++ {
brokerName := getBrokerName(broker, brokerGroupIndex)
// Update master broker
reqLogger.Info("Update Master Broker NAMESRV_ADDR of " + brokerName)
dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0)
found := &appsv1.StatefulSet{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found)
if err != nil {
reqLogger.Error(err, "Failed to get broker master StatefulSet of "+brokerName)
} else {
found.Spec.Template.Spec.Containers[0].Env[0].Value = share.NameServersStr
err = r.client.Update(context.TODO(), found)
if err != nil {
reqLogger.Error(err, "Failed to update NAMESRV_ADDR of master broker "+brokerName, "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
} else {
reqLogger.Info("Successfully updated NAMESRV_ADDR of master broker "+brokerName, "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
}
time.Sleep(time.Duration(cons.RestartBrokerPodIntervalInSecond) * time.Second)
}
// Update replicas brokers
for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ {
reqLogger.Info("Update Replica Broker NAMESRV_ADDR of " + brokerName + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup))
replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex)
replicaFound := &appsv1.StatefulSet{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, replicaFound)
if err != nil {
reqLogger.Error(err, "Failed to get broker replica StatefulSet of "+brokerName)
} else {
for index := range replicaFound.Spec.Template.Spec.Containers[0].Env {
if cons.EnvNameServiceAddress == replicaFound.Spec.Template.Spec.Containers[0].Env[index].Name {
replicaFound.Spec.Template.Spec.Containers[0].Env[index].Value = share.NameServersStr
break
}
}
err = r.client.Update(context.TODO(), replicaFound)
if err != nil {
reqLogger.Error(err, "Failed to update NAMESRV_ADDR of "+strconv.Itoa(brokerGroupIndex)+"-replica-"+strconv.Itoa(replicaIndex), "StatefulSet.Namespace", replicaFound.Namespace, "StatefulSet.Name", replicaFound.Name)
} else {
reqLogger.Info("Successfully updated NAMESRV_ADDR of "+strconv.Itoa(brokerGroupIndex)+"-replica-"+strconv.Itoa(replicaIndex), "StatefulSet.Namespace", replicaFound.Namespace, "StatefulSet.Name", replicaFound.Name)
}
time.Sleep(time.Duration(cons.RestartBrokerPodIntervalInSecond) * time.Second)
}
}
}
}
share.IsNameServersStrUpdated = false
}
// List the pods for this broker's statefulSet
podList := &corev1.PodList{}
labelSelector := labels.SelectorFromSet(labelsForBroker(broker.Name))
listOps := &client.ListOptions{
Namespace: broker.Namespace,
LabelSelector: labelSelector,
}
err = r.client.List(context.TODO(), podList, listOps)
if err != nil {
reqLogger.Error(err, "Failed to list pods.", "Broker.Namespace", broker.Namespace, "Broker.Name", broker.Name)
return reconcile.Result{}, err
}
podNames := getPodNames(podList.Items)
log.Info("broker.Status.Nodes length = " + strconv.Itoa(len(broker.Status.Nodes)))
log.Info("podNames length = " + strconv.Itoa(len(podNames)))
// Ensure every pod is in ready
for _, pod := range podList.Items {
if !reflect.DeepEqual(pod.Status.Phase, corev1.PodRunning) {
log.Info("pod " + pod.Name + " phase is " + string(pod.Status.Phase) + ", wait for a moment...")
}
if !isReady(pod) {
reqLogger.Info("pod " + pod.Name + " is not ready, wait for a moment...")
return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
}
}
if broker.Status.Size != 0 && broker.Spec.Size > broker.Status.Size {
// Get the metadata including subscriptionGroup.json and topics.json from scale source pod
k8s, err := tool.NewK8sClient()
if err != nil {
log.Error(err, "Error occurred while getting K8s Client")
}
sourcePodName := broker.Spec.ScalePodName
topicsCommand := getCopyMetadataJsonCommand(cons.TopicJsonDir, sourcePodName, broker.Namespace, k8s)
log.Info("topicsCommand: " + topicsCommand)
subscriptionGroupCommand := getCopyMetadataJsonCommand(cons.SubscriptionGroupJsonDir, sourcePodName, broker.Namespace, k8s)
log.Info("subscriptionGroupCommand: " + subscriptionGroupCommand)
MakeConfigDirCommand := "mkdir -p " + cons.StoreConfigDir
ChmodDirCommand := "chmod a+rw " + cons.StoreConfigDir
cmdContent := MakeConfigDirCommand + " && " + ChmodDirCommand
if topicsCommand != "" {
cmdContent = cmdContent + " && " + topicsCommand
}
if subscriptionGroupCommand != "" {
cmdContent = cmdContent + " && " + subscriptionGroupCommand
}
cmd = []string{"/bin/bash", "-c", cmdContent}
}
// Update status.Size if needed
if broker.Spec.Size != broker.Status.Size {
log.Info("broker.Status.Size = " + strconv.Itoa(broker.Status.Size))
log.Info("broker.Spec.Size = " + strconv.Itoa(broker.Spec.Size))
broker.Status.Size = broker.Spec.Size
err = r.client.Status().Update(context.TODO(), broker)
if err != nil {
reqLogger.Error(err, "Failed to update Broker Size status.")
}
}
// Update status.Nodes if needed
if !reflect.DeepEqual(podNames, broker.Status.Nodes) {
broker.Status.Nodes = podNames
err = r.client.Status().Update(context.TODO(), broker)
if err != nil {
reqLogger.Error(err, "Failed to update Broker Nodes status.")
}
}
//podList := &corev1.PodList{}
//labelSelector := labels.SelectorFromSet(labelsForBroker(broker.Name))
//listOps := &client.ListOptions{
// Namespace: broker.Namespace,
// LabelSelector: labelSelector,
//}
//err = r.client.List(context.TODO(), listOps, podList)
//if err != nil {
// reqLogger.Error(err, "Failed to list pods.", "Broker.Namespace", broker.Namespace, "Broker.Name", broker.Name)
// return reconcile.Result{}, err
//}
//podNames := getPodNames(podList.Items)
//
//// Update status.Nodes if needed
//if !reflect.DeepEqual(podNames, broker.Status.Nodes) {
// broker.Status.Nodes = podNames
// err := r.client.Status().Update(context.TODO(), broker)
// if err != nil {
// reqLogger.Error(err, "Failed to update Broker status.")
// return reconcile.Result{}, err
// }
//}
return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
}