in consumer/pull_consumer.go [226:287]
// Start brings the pull consumer online.
//
// The one-time part, guarded by pc.once, validates the consumer options,
// registers the consumer with the client under pc.consumerGroup, calls
// pc.start, stores internal.StateRunning into pc.state and launches a
// goroutine that hands every request received on pc.prCh to pc.pullMessage
// (each in its own goroutine) until pc.done is signalled. Any failure in that
// part is returned immediately and the remaining steps are skipped.
//
// After the one-time part, Start calls UpdateTopicRouteInfo on the client and
// requires pc.topic to be present in pc.topicSubscribeInfoTable. If it is
// missing, the consumer is shut down and an error is returned. Otherwise
// Start calls CheckClientInBroker and SendHeartbeatToAllBrokerWithLock and
// kicks off RebalanceImmediately in a separate goroutine.
//
// NOTE(review): assuming pc.once is a sync.Once, a second call to Start skips
// the one-time part and therefore does not observe an error produced by the
// first call — confirm callers never retry Start on the same instance.
func (pc *defaultPullConsumer) Start() error {
	var err error
	pc.once.Do(func() {
		err = pc.validate()
		if err != nil {
			rlog.Error("the consumer group option validate fail", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
				rlog.LogKeyUnderlayError: err.Error(),
			})
			err = errors.Wrap(err, "the consumer group option validate fail")
			return
		}

		err = pc.defaultConsumer.client.RegisterConsumer(pc.consumerGroup, pc)
		if err != nil {
			// The caller only receives the ErrCreated sentinel, so the
			// underlying cause is recorded here before it is replaced.
			rlog.Error("defaultPullConsumer the consumer group has been created, specify another one", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
				rlog.LogKeyUnderlayError: err.Error(),
			})
			err = errors2.ErrCreated
			return
		}

		err = pc.start()
		if err != nil {
			return
		}

		atomic.StoreInt32(&pc.state, int32(internal.StateRunning))

		// Dispatch loop: every pull request gets its own goroutine so one
		// pull does not hold up the next; the loop ends when pc.done fires.
		go func() {
			for {
				select {
				case pr := <-pc.prCh:
					go pc.pullMessage(&pr)
				case <-pc.done:
					rlog.Info("defaultPullConsumer close PullRequest listener.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		}()
	})
	if err != nil {
		return err
	}

	pc.client.UpdateTopicRouteInfo()
	if _, exist := pc.topicSubscribeInfoTable.Load(pc.topic); !exist {
		// A failing Shutdown is only logged; the error reported to the
		// caller is always the missing-route error below.
		if shutdownErr := pc.Shutdown(); shutdownErr != nil {
			rlog.Error("defaultPullConsumer.Shutdown . route info not found, it may not exist", map[string]interface{}{
				rlog.LogKeyTopic:         pc.topic,
				rlog.LogKeyUnderlayError: shutdownErr.Error(),
			})
		}
		return fmt.Errorf("the topic=%s route info not found, it may not exist", pc.topic)
	}

	pc.client.CheckClientInBroker()
	pc.client.SendHeartbeatToAllBrokerWithLock()
	go pc.client.RebalanceImmediately()
	return nil
}