func()

in internal/route.go [123:209]


// UpdateTopicRouteInfoWithDefault fetches route data from the name server and,
// when it differs from the cached entry for topic, refreshes the caches and
// notifies the registered clients.
//
// The query is issued for defaultTopic when it is non-empty, otherwise for
// topic; in both cases the result is cached and propagated under topic. When
// defaultTopic is non-empty, every queue whose ReadQueueNums exceeds
// defaultQueueNum has its read and write queue counts set to defaultQueueNum.
//
// It returns a clone of the fetched route data and whether it was considered
// changed (always true when nothing was cached for topic). When the query
// yields no route data it returns (nil, false, err), where err is whatever the
// query returned and may be nil. When route data is returned, the error result
// is always nil, even if the query also reported an error (which is only
// logged).
//
// The whole call runs while holding s.lockNamesrv.
func (s *namesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error) {
	s.lockNamesrv.Lock()
	defer s.lockNamesrv.Unlock()

	// Query by the default topic when one is supplied; everything below still
	// keys caches and client updates by topic.
	queryTopic := topic
	if len(defaultTopic) > 0 {
		queryTopic = defaultTopic
	}

	routeData, err := s.queryTopicRouteInfoFromServer(queryTopic)
	if err != nil {
		rlog.Warning("query topic route from server error", map[string]interface{}{
			rlog.LogKeyUnderlayError: err,
		})
	}

	if routeData == nil {
		rlog.Warning("queryTopicRouteInfoFromServer return nil", map[string]interface{}{
			rlog.LogKeyTopic: topic,
		})
		return nil, false, err
	}

	if len(defaultTopic) > 0 {
		// NOTE(review): this caps the queue counts in routeData only if
		// QueueDataList holds pointers — confirm against the type definition.
		for _, q := range routeData.QueueDataList {
			if q.ReadQueueNums > defaultQueueNum {
				q.ReadQueueNums = defaultQueueNum
				q.WriteQueueNums = defaultQueueNum
			}
		}
	}

	// A topic with no cached route is always treated as changed.
	oldRouteData, exist := s.routeDataMap.Load(topic)
	changed := true
	if exist {
		changed = s.topicRouteDataIsChange(oldRouteData.(*TopicRouteData), routeData)
	}

	if !changed {
		return routeData.clone(), false, nil
	}

	if s.bundleClient != nil {
		// NOTE(review): clients are refreshed unconditionally here. Per-client
		// IsPublishTopicNeedUpdate / IsSubscribeTopicNeedUpdate checks are not
		// consulted because this point is only reached when the route changed;
		// if clients should also be refreshed when the route is unchanged but
		// their own info is stale, that needs a separate code path.
		s.bundleClient.producerMap.Range(func(key, value interface{}) bool {
			p := value.(InnerProducer)
			// Build a fresh publish info per producer so none is shared.
			publishInfo := s.bundleClient.GetNameSrv().(*namesrvs).routeData2PublishInfo(topic, routeData)
			publishInfo.HaveTopicRouterInfo = true
			p.UpdateTopicPublishInfo(topic, publishInfo)
			return true
		})
		s.bundleClient.consumerMap.Range(func(key, value interface{}) bool {
			consumer := value.(InnerConsumer)
			consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, routeData))
			return true
		})
		rlog.Info("change the route for clients", nil)
	}

	s.routeDataMap.Store(topic, routeData)
	rlog.Info("the topic route info changed", map[string]interface{}{
		rlog.LogKeyTopic:            topic,
		rlog.LogKeyValueChangedFrom: oldRouteData,
		rlog.LogKeyValueChangedTo:   routeData.String(),
	})
	// Index every broker of the new route by its broker name.
	for _, brokerData := range routeData.BrokerDataList {
		s.brokerAddressesMap.Store(brokerData.BrokerName, brokerData)
	}

	return routeData.clone(), true, nil
}