func()

in consumer/push_consumer.go [133:250]


func (pc *pushConsumer) Start() error {
	var err error
	pc.once.Do(func() {
		rlog.Info("the consumer start beginning", map[string]interface{}{
			rlog.LogKeyConsumerGroup: pc.consumerGroup,
			"messageModel":           pc.model,
			"unitMode":               pc.unitMode,
		})
		atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
		err = pc.validate()
		if err != nil {
			rlog.Error("the consumer group option validate fail", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
				rlog.LogKeyUnderlayError: err.Error(),
			})
			err = errors.Wrap(err, "the consumer group option validate fail")
			return
		}

		err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
		if err != nil {
			rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
			})
			err = errors2.ErrCreated
			return
		}

		err = pc.defaultConsumer.start()
		if err != nil {
			return
		}

		retryTopic := internal.GetRetryTopic(pc.consumerGroup)
		pc.crCh[retryTopic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)

		go func() {
			// todo start clean msg expired
			for {
				select {
				case pr := <-pc.prCh:
					go func() {
						pc.pullMessage(&pr)
					}()
				case <-pc.done:
					rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		}()

		go primitive.WithRecover(func() {
			if pc.consumeOrderly {
				return
			}
			time.Sleep(pc.option.ConsumeTimeout)
			pc.cleanExpiredMsg()

			ticker := time.NewTicker(pc.option.ConsumeTimeout)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					pc.cleanExpiredMsg()
				case <-pc.done:
					rlog.Info("push consumer close cleanExpiredMsg listener.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		})

		go primitive.WithRecover(func() {
			// initial lock.
			if !pc.consumeOrderly {
				return
			}

			time.Sleep(1000 * time.Millisecond)
			pc.lockAll()

			lockTicker := time.NewTicker(pc.option.RebalanceLockInterval)
			defer lockTicker.Stop()
			for {
				select {
				case <-lockTicker.C:
					pc.lockAll()
				case <-pc.done:
					rlog.Info("push consumer close tick.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		})
	})

	if err != nil {
		return err
	}

	pc.client.UpdateTopicRouteInfo()
	for k := range pc.subscribedTopic {
		_, exist := pc.topicSubscribeInfoTable.Load(k)
		if !exist {
			pc.Shutdown()
			return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
		}
	}
	pc.client.CheckClientInBroker()
	pc.client.SendHeartbeatToAllBrokerWithLock()
	go pc.client.RebalanceImmediately()

	return err
}