func()

in runtime/core/protocol/grpc/consumer/consumer_mesh.go [121:150]


func (e *eventMeshConsumer) Start() error {
	// no topics, don't start the consumer
	if e.ConsumerGroupSize() == 0 {
		return nil
	}

	e.consumerGroupTopicConfig.Range(func(key, value any) bool {
		topic := key.(string)
		opt := value.(ConsumerGroupTopicOption).SubscriptionMode()
		switch opt {
		case pb.Subscription_SubscriptionItem_CLUSTERING:
			e.persistentConsumer.Subscribe(topic)
		case pb.Subscription_SubscriptionItem_BROADCASTING:
			e.broadcastConsumer.Subscribe(topic)
		default:
			log.Warnf("un support sub mode:%v", opt)
		}
		return true
	})

	if err := e.broadcastConsumer.Start(); err != nil {
		return err
	}
	if err := e.persistentConsumer.Start(); err != nil {
		return err
	}

	e.serviceState = consts.RUNNING
	return nil
}