in runtime/core/protocol/grpc/consumer/consumer_mesh.go [121:150]
func (e *eventMeshConsumer) Start() error {
// no topics, don't start the consumer
if e.ConsumerGroupSize() == 0 {
return nil
}
e.consumerGroupTopicConfig.Range(func(key, value any) bool {
topic := key.(string)
opt := value.(ConsumerGroupTopicOption).SubscriptionMode()
switch opt {
case pb.Subscription_SubscriptionItem_CLUSTERING:
e.persistentConsumer.Subscribe(topic)
case pb.Subscription_SubscriptionItem_BROADCASTING:
e.broadcastConsumer.Subscribe(topic)
default:
log.Warnf("un support sub mode:%v", opt)
}
return true
})
if err := e.broadcastConsumer.Start(); err != nil {
return err
}
if err := e.persistentConsumer.Start(); err != nil {
return err
}
e.serviceState = consts.RUNNING
return nil
}