func()

in golang/client.go [493:550]


// startUp boots the client. It creates and starts a client manager, registers
// this client with it, fetches the message queues of every topic in
// cli.initTopics, and finally schedules a route refresh every 30 seconds via
// ticker.Tick.
//
// It returns an error as soon as the message queues of one initial topic
// cannot be fetched. At that point the client manager has already been started
// and assigned to cli.clientManager; the periodic refresh has not been
// scheduled.
func (cli *defaultClient) startUp() error {
	cli.log.Infof("begin to start the rocketmq client")
	cm := NewDefaultClientManager()
	cm.startUp()
	cm.RegisterClient(cli)
	cli.clientManager = cm

	for _, topic := range cli.initTopics {
		if _, err := cli.getMessageQueues(context.Background(), topic); err != nil {
			// %w keeps the cause inspectable with errors.Is / errors.As.
			return fmt.Errorf("failed to get topic route data result from remote during client startup, clientId=%s, topics=%v, err=%w", cli.clientID, cli.initTopics, err)
		}
	}

	// refreshRoutes re-queries the route of every topic held in cli.router and,
	// when the route changed, updates the router entry and the load balancer
	// cache of the concrete client implementation.
	refreshRoutes := func() {
		cli.router.Range(func(k, v interface{}) bool {
			topic := k.(string)
			newRoute, err := cli.queryRoute(context.TODO(), topic, cli.opts.timeout)
			if err != nil {
				// The result is meaningless when the query failed: keep the
				// cached route and carry on with the next topic.
				cli.log.Errorf("scheduled queryRoute err=%v", err)
				return true
			}
			if newRoute == nil && v != nil {
				cli.log.Info("newRoute is nil, but oldRoute is not. do not update")
				return true
			}

			var oldRoute []*v2.MessageQueue
			if v != nil {
				oldRoute = v.([]*v2.MessageQueue)
			}
			if routeEqual(oldRoute, newRoute) {
				return true
			}

			cli.router.Store(k, newRoute)
			switch impl := cli.clientImpl.(type) {
			case *defaultProducer:
				existing, ok := impl.publishingRouteDataResultCache.Load(topic)
				if ok {
					impl.publishingRouteDataResultCache.Store(topic, existing.(PublishingLoadBalancer).CopyAndUpdate(newRoute))
					break
				}
				plb, err := NewPublishingLoadBalancer(newRoute)
				if err != nil {
					// Best effort: the cache stays empty for this topic, but
					// the failure must not go unnoticed.
					cli.log.Errorf("failed to create publishing load balancer, topic=%s, err=%v", topic, err)
					break
				}
				impl.publishingRouteDataResultCache.Store(topic, plb)
			case *defaultSimpleConsumer:
				existing, ok := impl.subTopicRouteDataResultCache.Load(topic)
				if ok {
					impl.subTopicRouteDataResultCache.Store(topic, existing.(SubscriptionLoadBalancer).CopyAndUpdate(newRoute))
					break
				}
				slb, err := NewSubscriptionLoadBalancer(newRoute)
				if err != nil {
					// Best effort, same as the producer branch.
					cli.log.Errorf("failed to create subscription load balancer, topic=%s, err=%v", topic, err)
					break
				}
				impl.subTopicRouteDataResultCache.Store(topic, slb)
			}
			// Always continue: one topic must not stop the refresh of the others.
			return true
		})
	}
	// cli.done is handed to ticker.Tick, presumably as the stop signal for the
	// periodic refresh; confirm against the ticker package.
	ticker.Tick(refreshRoutes, time.Second*30, cli.done)
	return nil
}