func()

in runtime/core/protocol/grpc/consumer/consumer_mesh.go [86:119]


// Init prepares the two underlying consumers of this consumer group: the
// persistent (clustering) consumer and the broadcast consumer.
//
// When the group has no consumers/topics (ConsumerGroupSize() == 0) nothing is
// initialized, serviceState is left untouched and nil is returned.
//
// Otherwise each consumer is initialized with its own property map and gets an
// event listener registered for its subscription mode. The persistent consumer
// is set up first; if the broadcast consumer then fails to initialize, the
// error is returned as-is, serviceState is not advanced, and the persistent
// consumer is left in its initialized state.
//
// On success serviceState is set to consts.INITED.
func (e *eventMeshConsumer) Init() error {
	// no topics, don't init the consumer
	if e.ConsumerGroupSize() == 0 {
		return nil
	}

	// buildProps assembles the init properties shared by both consumers. The
	// only difference between the two is the "isBroadcast" flag. It is invoked
	// right before each Init call so the global config is read at that moment.
	buildProps := func(broadcast bool) map[string]string {
		isBroadcast := "false"
		if broadcast {
			isBroadcast = "true"
		}
		return map[string]string{
			"isBroadcast":   isBroadcast,
			"consumerGroup": e.ConsumerGroup,
			"eventMeshIDC":  config.GlobalConfig().Common.IDC,
			"instanceName": util.BuildMeshClientID(e.ConsumerGroup,
				config.GlobalConfig().Common.Cluster),
		}
	}

	// Clustering (persistent) consumer.
	if err := e.persistentConsumer.Init(buildProps(false)); err != nil {
		return err
	}
	clusterEventListener := e.createEventListener(pb.Subscription_SubscriptionItem_CLUSTERING)
	e.persistentConsumer.RegisterListener(clusterEventListener)

	// Broadcast consumer: must be flagged as broadcast. The previous code
	// passed "isBroadcast" = "false" here, copied from the clustering setup.
	if err := e.broadcastConsumer.Init(buildProps(true)); err != nil {
		return err
	}
	broadcastEventListener := e.createEventListener(pb.Subscription_SubscriptionItem_BROADCASTING)
	e.broadcastConsumer.RegisterListener(broadcastEventListener)

	e.serviceState = consts.INITED

	log.Infof("init the eventmesh consumer success, group:%v", e.ConsumerGroup)
	return nil
}