in consumer/push_consumer.go [133:250]
// Start brings the push consumer up and returns the first error encountered.
//
// The one-time initialisation (guarded by pc.once) validates the options,
// registers the consumer group with the client, starts the embedded default
// consumer and launches the background goroutines:
//   - a dispatcher that hands every pull request received on pc.prCh to
//     pc.pullMessage in its own goroutine;
//   - for concurrent (non-orderly) consumption, a periodic cleanExpiredMsg;
//   - for orderly consumption, a periodic lockAll.
//
// Every background goroutine returns once pc.done fires.
//
// After initialisation the topic routes are refreshed; if any subscribed topic
// has no route info the consumer is shut down and an error is returned.
// Otherwise the client is checked against the broker, a heartbeat is sent and
// a rebalance is triggered asynchronously.
//
// NOTE(review): on a second call the once-guarded part is skipped (err stays
// nil) but the route check / heartbeat / rebalance part runs again — confirm
// that this is intended.
func (pc *pushConsumer) Start() error {
	var err error
	pc.once.Do(func() {
		rlog.Info("the consumer start beginning", map[string]interface{}{
			rlog.LogKeyConsumerGroup: pc.consumerGroup,
			"messageModel":           pc.model,
			"unitMode":               pc.unitMode,
		})
		// Record the failed state up front so that every early return below
		// leaves the consumer marked as "start failed". Presumably a later step
		// outside this function moves it to a running state — TODO confirm.
		atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))

		err = pc.validate()
		if err != nil {
			rlog.Error("the consumer group option validate fail", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
				rlog.LogKeyUnderlayError: err.Error(),
			})
			err = errors.Wrap(err, "the consumer group option validate fail")
			return
		}

		err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
		if err != nil {
			// The caller only sees ErrCreated, so keep the underlying cause in
			// the log instead of dropping it.
			rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
				rlog.LogKeyUnderlayError: err.Error(),
			})
			err = errors2.ErrCreated
			return
		}

		err = pc.defaultConsumer.start()
		if err != nil {
			return
		}

		retryTopic := internal.GetRetryTopic(pc.consumerGroup)
		pc.crCh[retryTopic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)

		// waitOrDone blocks for d and reports true, or reports false as soon as
		// pc.done fires first. Unlike time.Sleep it lets the background
		// goroutines stop promptly instead of outliving the consumer.
		waitOrDone := func(d time.Duration) bool {
			timer := time.NewTimer(d)
			defer timer.Stop()
			select {
			case <-timer.C:
				return true
			case <-pc.done:
				return false
			}
		}

		// Pull-request dispatcher. Wrapped in WithRecover like the other
		// background goroutines so a panic while pulling cannot take the whole
		// process down.
		go primitive.WithRecover(func() {
			// todo start clean msg expired
			for {
				select {
				case pr := <-pc.prCh:
					// pr is a fresh variable for every received request, so it
					// is safe to capture it in the goroutine.
					go primitive.WithRecover(func() {
						pc.pullMessage(&pr)
					})
				case <-pc.done:
					rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		})

		// Periodic clean-up of expired messages; skipped for orderly consumption.
		go primitive.WithRecover(func() {
			if pc.consumeOrderly {
				return
			}
			// The first clean-up runs one ConsumeTimeout after start.
			if !waitOrDone(pc.option.ConsumeTimeout) {
				rlog.Info("push consumer close cleanExpiredMsg listener.", map[string]interface{}{
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				return
			}
			pc.cleanExpiredMsg()

			ticker := time.NewTicker(pc.option.ConsumeTimeout)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					pc.cleanExpiredMsg()
				case <-pc.done:
					rlog.Info("push consumer close cleanExpiredMsg listener.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		})

		// Periodic lockAll; only used for orderly consumption.
		go primitive.WithRecover(func() {
			// initial lock.
			if !pc.consumeOrderly {
				return
			}
			// The initial lock is taken one second after start.
			if !waitOrDone(1000 * time.Millisecond) {
				rlog.Info("push consumer close tick.", map[string]interface{}{
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				return
			}
			pc.lockAll()

			lockTicker := time.NewTicker(pc.option.RebalanceLockInterval)
			defer lockTicker.Stop()
			for {
				select {
				case <-lockTicker.C:
					pc.lockAll()
				case <-pc.done:
					rlog.Info("push consumer close tick.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		})
	})
	if err != nil {
		return err
	}

	pc.client.UpdateTopicRouteInfo()
	// Every subscribed topic must have an entry in topicSubscribeInfoTable
	// after the route refresh above; otherwise give up and shut down.
	for k := range pc.subscribedTopic {
		_, exist := pc.topicSubscribeInfoTable.Load(k)
		if !exist {
			pc.Shutdown()
			return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
		}
	}
	pc.client.CheckClientInBroker()
	pc.client.SendHeartbeatToAllBrokerWithLock()
	go pc.client.RebalanceImmediately()
	return nil
}