in internal/route.go [470:529]
// routeData2PublishInfo builds the TopicPublishInfo for topic from data.
//
// If data.OrderTopicConf is non-empty it is parsed as a ';'-separated list of
// "brokerName:queueCount" entries; one MessageQueue is created per queue id in
// [0, queueCount) for every entry, OrderTopic is set to true, and the result is
// returned without consulting QueueDataList. Entries that have no ':' separator
// or whose count is not a valid integer are skipped instead of panicking.
//
// Otherwise data.QueueDataList is sorted by BrokerName in ascending order and,
// for each queue data entry that passes queueIsWriteable and whose broker has a
// non-empty address under MasterId, one MessageQueue is created per queue id in
// [0, WriteQueueNums).
//
// Note that the sort is performed in place, so the order of
// data.QueueDataList as seen by the caller is changed.
func (s *namesrvs) routeData2PublishInfo(topic string, data *TopicRouteData) *TopicPublishInfo {
	publishInfo := &TopicPublishInfo{
		RouteData:  data,
		OrderTopic: false,
	}

	if data.OrderTopicConf != "" {
		for _, broker := range strings.Split(data.OrderTopicConf, ";") {
			item := strings.Split(broker, ":")
			// strings.Split keeps empty fields, so a trailing ';' or an entry
			// without ':' yields fewer than two parts; indexing item[1] would
			// panic on such input.
			if len(item) < 2 {
				continue
			}
			nums, err := strconv.Atoi(item[1])
			if err != nil {
				// A non-numeric queue count contributes no queues.
				continue
			}
			for i := 0; i < nums; i++ {
				publishInfo.MqList = append(publishInfo.MqList, &primitive.MessageQueue{
					Topic:      topic,
					BrokerName: item[0],
					QueueId:    i,
				})
			}
		}
		publishInfo.OrderTopic = true
		return publishInfo
	}

	// qds aliases data.QueueDataList, so this reorders the caller's slice.
	qds := data.QueueDataList
	sort.SliceStable(qds, func(i, j int) bool {
		// Ascending by broker name.
		return strings.Compare(qds[i].BrokerName, qds[j].BrokerName) < 0
	})

	for _, qd := range qds {
		if !queueIsWriteable(qd.Perm) {
			continue
		}

		// Find the broker data that matches this queue data by name.
		var bData *BrokerData
		for _, bd := range data.BrokerDataList {
			if bd.BrokerName == qd.BrokerName {
				bData = bd
				break
			}
		}

		// Skip queues whose broker is unknown or has no address under MasterId.
		if bData == nil || bData.BrokerAddresses[MasterId] == "" {
			continue
		}

		for i := 0; i < qd.WriteQueueNums; i++ {
			publishInfo.MqList = append(publishInfo.MqList, &primitive.MessageQueue{
				Topic:      topic,
				BrokerName: qd.BrokerName,
				QueueId:    i,
			})
		}
	}
	return publishInfo
}