in internal/route.go [123:209]
func (s *namesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error) {
s.lockNamesrv.Lock()
defer s.lockNamesrv.Unlock()
var (
routeData *TopicRouteData
err error
)
t := topic
if len(defaultTopic) > 0 {
t = defaultTopic
}
routeData, err = s.queryTopicRouteInfoFromServer(t)
if err != nil {
rlog.Warning("query topic route from server error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
}
if routeData == nil {
rlog.Warning("queryTopicRouteInfoFromServer return nil", map[string]interface{}{
rlog.LogKeyTopic: topic,
})
return nil, false, err
}
if len(defaultTopic) > 0 {
for _, q := range routeData.QueueDataList {
if q.ReadQueueNums > defaultQueueNum {
q.ReadQueueNums = defaultQueueNum
q.WriteQueueNums = defaultQueueNum
}
}
}
oldRouteData, exist := s.routeDataMap.Load(topic)
changed := true
if exist {
changed = s.topicRouteDataIsChange(oldRouteData.(*TopicRouteData), routeData)
}
if changed {
if s.bundleClient != nil {
s.bundleClient.producerMap.Range(func(key, value interface{}) bool {
p := value.(InnerProducer)
updated := changed
if !updated {
updated = p.IsPublishTopicNeedUpdate(topic)
}
if updated {
publishInfo := s.bundleClient.GetNameSrv().(*namesrvs).routeData2PublishInfo(topic, routeData)
publishInfo.HaveTopicRouterInfo = true
p.UpdateTopicPublishInfo(topic, publishInfo)
}
return true
})
s.bundleClient.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
updated := changed
if !updated {
updated = consumer.IsSubscribeTopicNeedUpdate(topic)
}
if updated {
consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, routeData))
}
return true
})
rlog.Info("change the route for clients", nil)
}
s.routeDataMap.Store(topic, routeData)
rlog.Info("the topic route info changed", map[string]interface{}{
rlog.LogKeyTopic: topic,
rlog.LogKeyValueChangedFrom: oldRouteData,
rlog.LogKeyValueChangedTo: routeData.String(),
})
for _, brokerData := range routeData.BrokerDataList {
s.brokerAddressesMap.Store(brokerData.BrokerName, brokerData)
}
}
return routeData.clone(), changed, nil
}