func()

in consumer/pull_consumer.go [226:287]


// Start brings the pull consumer online and reports the first error it hits.
//
// The setup guarded by pc.once validates the consumer options, registers the
// consumer with the client under its group, calls pc.start, stores
// internal.StateRunning into pc.state and launches a background goroutine
// that hands every request received on pc.prCh to pc.pullMessage (one
// goroutine per request) until pc.done fires. Assuming pc.once is a
// sync.Once, that setup runs on the first call only.
//
// When the setup did not fail, the topic route info is refreshed. If no
// subscribe info is stored for pc.topic afterwards, the consumer is shut down
// and an error is returned. Otherwise the client is checked against the
// broker, a heartbeat is sent to all brokers and a rebalance is triggered on
// a separate goroutine.
func (pc *defaultPullConsumer) Start() error {
	// bootstrap holds the one-time setup; it returns the error that Start
	// should surface, or nil when the consumer is up and running.
	bootstrap := func() error {
		if invalid := pc.validate(); invalid != nil {
			rlog.Error("the consumer group option validate fail", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
				rlog.LogKeyUnderlayError: invalid.Error(),
			})
			return errors.Wrap(invalid, "the consumer group option validate fail")
		}

		if regErr := pc.defaultConsumer.client.RegisterConsumer(pc.consumerGroup, pc); regErr != nil {
			rlog.Error("defaultPullConsumer the consumer group has been created, specify another one", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
			})
			// The registration error itself is not propagated; callers
			// always receive ErrCreated for this failure.
			return errors2.ErrCreated
		}

		if startErr := pc.start(); startErr != nil {
			return startErr
		}

		atomic.StoreInt32(&pc.state, int32(internal.StateRunning))

		// Dispatcher: fan each incoming pull request out to its own
		// goroutine and stop once pc.done is signalled.
		go func() {
			for {
				select {
				case <-pc.done:
					rlog.Info("defaultPullConsumer close PullRequest listener.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				case req := <-pc.prCh:
					// req is a fresh variable per received request, so
					// handing out its address to the worker is safe.
					go pc.pullMessage(&req)
				}
			}
		}()
		return nil
	}

	var setupErr error
	pc.once.Do(func() {
		setupErr = bootstrap()
	})
	if setupErr != nil {
		return setupErr
	}

	pc.client.UpdateTopicRouteInfo()
	if _, known := pc.topicSubscribeInfoTable.Load(pc.topic); !known {
		// Without subscribe info for the topic the consumer cannot work:
		// tear it down and report the missing route. A failure of the
		// shutdown itself is only logged.
		if stopErr := pc.Shutdown(); stopErr != nil {
			rlog.Error("defaultPullConsumer.Shutdown . route info not found, it may not exist", map[string]interface{}{
				rlog.LogKeyTopic:         pc.topic,
				rlog.LogKeyUnderlayError: stopErr,
			})
		}
		return fmt.Errorf("the topic=%s route info not found, it may not exist", pc.topic)
	}

	pc.client.CheckClientInBroker()
	pc.client.SendHeartbeatToAllBrokerWithLock()
	go pc.client.RebalanceImmediately()

	return nil
}