in runtime/core/protocol/grpc/consumer/consumer_mesh.go [86:119]
func (e *eventMeshConsumer) Init() error {
// no topics, don't init the consumer
if e.ConsumerGroupSize() == 0 {
return nil
}
persistProps := make(map[string]string)
persistProps["isBroadcast"] = "false"
persistProps["consumerGroup"] = e.ConsumerGroup
persistProps["eventMeshIDC"] = config.GlobalConfig().Common.IDC
persistProps["instanceName"] = util.BuildMeshClientID(e.ConsumerGroup,
config.GlobalConfig().Common.Cluster)
if err := e.persistentConsumer.Init(persistProps); err != nil {
return err
}
clusterEventListener := e.createEventListener(pb.Subscription_SubscriptionItem_CLUSTERING)
e.persistentConsumer.RegisterListener(clusterEventListener)
broadcastProps := make(map[string]string)
broadcastProps["isBroadcast"] = "false"
broadcastProps["consumerGroup"] = e.ConsumerGroup
broadcastProps["eventMeshIDC"] = config.GlobalConfig().Common.IDC
broadcastProps["instanceName"] = util.BuildMeshClientID(e.ConsumerGroup,
config.GlobalConfig().Common.Cluster)
if err := e.broadcastConsumer.Init(broadcastProps); err != nil {
return err
}
broadcastEventListener := e.createEventListener(pb.Subscription_SubscriptionItem_BROADCASTING)
e.broadcastConsumer.RegisterListener(broadcastEventListener)
e.serviceState = consts.INITED
log.Infof("init the eventmesh consumer success, group:%v", e.ConsumerGroup)
return nil
}