in pkg/controller/nameservice/nameservice_controller.go [167:260]
// updateNameServiceStatus synchronizes instance.Status.NameServers with the
// host IPs of the pods selected by labelsForNameService(instance.Name), and
// publishes the resulting address list through the share package.
//
// When the sorted host IP list differs from the recorded status, the status
// subresource is updated. If both the previous and the new address lists are
// longer than cons.MinIpListLength, the admin tool is additionally invoked to
// push the new name server address list to the broker cluster.
//
// It returns a Result with Requeue set, together with the error, when listing
// pods, updating the status, or running the admin tool fails. On success it
// requests a requeue after cons.RequeueIntervalInSecond seconds if requeue is
// true, and returns an empty Result otherwise.
func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha1.NameService, request reconcile.Request, requeue bool) (reconcile.Result, error) {
	reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
	reqLogger.Info("Check the NameServers status")

	// List the pods for this nameService's statefulSet
	podList := &corev1.PodList{}
	labelSelector := labels.SelectorFromSet(labelsForNameService(instance.Name))
	listOps := &client.ListOptions{
		Namespace:     instance.Namespace,
		LabelSelector: labelSelector,
	}
	if err := r.client.List(context.TODO(), podList, listOps); err != nil {
		reqLogger.Error(err, "Failed to list pods.", "NameService.Namespace", instance.Namespace, "NameService.Name", instance.Name)
		return reconcile.Result{Requeue: true}, err
	}

	// joinAddrs renders every IP as "<ip>:9876;" and concatenates the results,
	// so a non-empty list always ends with a trailing ";".
	joinAddrs := func(ips []string) string {
		joined := ""
		for _, ip := range ips {
			joined = joined + ip + ":9876;"
		}
		return joined
	}

	hostIps := getNameServers(podList.Items)
	// Sort both sides so the comparison below is independent of ordering.
	sort.Strings(hostIps)
	sort.Strings(instance.Status.NameServers)
	nameServerListStr := joinAddrs(hostIps)

	// Update status.NameServers if needed
	if !reflect.DeepEqual(hostIps, instance.Status.NameServers) {
		oldNameServerListStr := joinAddrs(instance.Status.NameServers)

		// Drop the trailing ";" only when there is one. With no host IPs the
		// list is empty and an unconditional [:len-1] slice would panic.
		trimmed := nameServerListStr
		if n := len(trimmed); n > 0 {
			trimmed = trimmed[:n-1]
		}
		share.NameServersStr = trimmed
		reqLogger.Info("share.NameServersStr:" + share.NameServersStr)

		if len(oldNameServerListStr) <= cons.MinIpListLength {
			// No usable previous list: fall back to the new one.
			oldNameServerListStr = share.NameServersStr
		} else if len(share.NameServersStr) > cons.MinIpListLength {
			oldNameServerListStr = oldNameServerListStr[:len(oldNameServerListStr)-1]
			share.IsNameServersStrUpdated = true
		}
		reqLogger.Info("oldNameServerListStr:" + oldNameServerListStr)

		// Update the NameServers status with the host ips
		instance.Status.NameServers = hostIps
		if err := r.client.Status().Update(context.TODO(), instance); err != nil {
			reqLogger.Error(err, "Failed to update NameServers status of NameService.")
			return reconcile.Result{Requeue: true}, err
		}
		// Log success only after the update has actually succeeded.
		reqLogger.Info("Updated the NameServers status with the host IP")

		// use admin tool to update broker config
		if share.IsNameServersStrUpdated && (len(oldNameServerListStr) > cons.MinIpListLength) && (len(share.NameServersStr) > cons.MinIpListLength) {
			mqAdmin := cons.AdminToolDir
			subCmd := cons.UpdateBrokerConfig
			key := cons.ParamNameServiceAddress
			reqLogger.Info("share.GroupNum=broker.Spec.Size=" + strconv.Itoa(share.GroupNum))
			clusterName := share.BrokerClusterName
			reqLogger.Info("Updating config " + key + " of cluster" + clusterName)
			// command is only a human-readable rendering for the logs; the
			// process itself is started with an explicit argument vector.
			command := mqAdmin + " " + subCmd + " -c " + clusterName + " -k " + key + " -n " + oldNameServerListStr + " -v " + share.NameServersStr
			cmd := exec.Command("sh", mqAdmin, subCmd, "-c", clusterName, "-k", key, "-n", oldNameServerListStr, "-v", share.NameServersStr)
			output, err := cmd.Output()
			if err != nil {
				reqLogger.Error(err, "Update Broker config "+key+" failed of cluster "+clusterName+", command: "+command)
				return reconcile.Result{Requeue: true}, err
			}
			reqLogger.Info("Successfully updated Broker config " + key + " of cluster " + clusterName + ", command: " + command + ", with output: " + string(output))
		}
	}

	// Print NameServers IP
	for i, value := range instance.Status.NameServers {
		reqLogger.Info("NameServers IP " + strconv.Itoa(i) + ": " + value)
	}

	runningNameServerNum := getRunningNameServersNum(podList.Items)
	if runningNameServerNum == instance.Spec.Size {
		share.IsNameServersStrInitialized = true
		// NOTE(review): unlike the assignment in the update branch, this value
		// keeps the trailing ";" — confirm consumers tolerate both forms.
		share.NameServersStr = nameServerListStr // reassign if operator restarts
	}

	reqLogger.Info("Share variables", "GroupNum", share.GroupNum,
		"NameServersStr", share.NameServersStr, "IsNameServersStrUpdated", share.IsNameServersStrUpdated,
		"IsNameServersStrInitialized", share.IsNameServersStrInitialized, "BrokerClusterName", share.BrokerClusterName)

	if requeue {
		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
	}
	return reconcile.Result{}, nil
}