in golang/client.go [493:550]
// startUp creates and starts a client manager, registers this client with it,
// eagerly fetches the message queues of every topic in cli.initTopics, and then
// hands a route-refresh callback to ticker.Tick together with a 30-second
// interval and cli.done.
//
// It returns an error when the message queues of any initial topic cannot be
// fetched. The underlying error is wrapped, so callers can inspect it with
// errors.Is / errors.As.
func (cli *defaultClient) startUp() error {
	cli.log.Infof("begin to start the rocketmq client")
	cm := NewDefaultClientManager()
	cm.startUp()
	cm.RegisterClient(cli)
	cli.clientManager = cm

	// NOTE(review): when the loop below fails, the client manager started above
	// is left running and registered — confirm that the caller tears it down.
	for _, topic := range cli.initTopics {
		if _, err := cli.getMessageQueues(context.Background(), topic); err != nil {
			return fmt.Errorf("failed to get topic route data result from remote during client startup, clientId=%s, topics=%v, err=%w", cli.clientID, cli.initTopics, err)
		}
	}

	// refreshRoutes re-queries the route of every topic held in cli.router and,
	// when the route changed, stores it and rebuilds the load balancer cached by
	// the concrete client implementation.
	refreshRoutes := func() {
		cli.router.Range(func(k, v interface{}) bool {
			topic := k.(string)
			newRoute, err := cli.queryRoute(context.TODO(), topic, cli.opts.timeout)
			if err != nil {
				// The result of a failed query is not meaningful: keep whatever is
				// cached for this topic and continue with the next one.
				cli.log.Errorf("scheduled queryRoute err=%v", err)
				return true
			}
			if newRoute == nil && v != nil {
				cli.log.Info("newRoute is nil, but oldRoute is not. do not update")
				return true
			}

			var oldRoute []*v2.MessageQueue
			if v != nil {
				oldRoute = v.([]*v2.MessageQueue)
			}
			if routeEqual(oldRoute, newRoute) {
				return true
			}

			cli.router.Store(k, newRoute)
			switch impl := cli.clientImpl.(type) {
			case *defaultProducer:
				existing, ok := impl.publishingRouteDataResultCache.Load(topic)
				if ok {
					impl.publishingRouteDataResultCache.Store(topic, existing.(PublishingLoadBalancer).CopyAndUpdate(newRoute))
					break
				}
				plb, err := NewPublishingLoadBalancer(newRoute)
				if err != nil {
					// Surface the failure instead of dropping it; the cache simply
					// stays without an entry for this topic.
					cli.log.Errorf("failed to create publishing load balancer, topic=%s, err=%v", topic, err)
					break
				}
				impl.publishingRouteDataResultCache.Store(topic, plb)
			case *defaultSimpleConsumer:
				existing, ok := impl.subTopicRouteDataResultCache.Load(topic)
				if ok {
					impl.subTopicRouteDataResultCache.Store(topic, existing.(SubscriptionLoadBalancer).CopyAndUpdate(newRoute))
					break
				}
				slb, err := NewSubscriptionLoadBalancer(newRoute)
				if err != nil {
					// Surface the failure instead of dropping it; the cache simply
					// stays without an entry for this topic.
					cli.log.Errorf("failed to create subscription load balancer, topic=%s, err=%v", topic, err)
					break
				}
				impl.subTopicRouteDataResultCache.Store(topic, slb)
			}
			// Always continue the iteration so every topic gets refreshed.
			return true
		})
	}
	ticker.Tick(refreshRoutes, time.Second*30, cli.done)
	return nil
}