func()

in internal/protocol/meshmessage.go [39:78]


// Publish sends content to the given EventMesh topic over gRPC.
//
// A gRPC client is built from the current global configuration on every
// call and released via closeEventMeshClient before the method returns.
// Each message is given freshly generated UUIDs for UniqueId and SeqNum,
// and properties is attached to the message as-is.
//
// ctx is forwarded to the underlying publish RPC. A nil ctx is tolerated
// and treated as context.Background().
//
// An error is returned when the client cannot be created, when the RPC
// itself fails, or when the response code is anything other than "0".
func (m *MeshMessage) Publish(ctx context.Context, topic string, content string, properties map[string]string) error {
	// Earlier versions ignored ctx entirely, so callers may legitimately be
	// passing nil; keep those callers working instead of panicking in gRPC.
	if ctx == nil {
		ctx = context.Background()
	}

	eventmeshCfg := config.Get()
	cfg := &conf.GRPCConfig{
		Host:         eventmeshCfg.EventMesh.Host,
		Port:         eventmeshCfg.EventMesh.GRPC.Port,
		ENV:          eventmeshCfg.EventMesh.Env,
		IDC:          eventmeshCfg.EventMesh.IDC,
		SYS:          eventmeshCfg.EventMesh.Sys,
		Username:     eventmeshCfg.EventMesh.UserName,
		Password:     eventmeshCfg.EventMesh.Password,
		ProtocolType: pgrpc.EventmeshMessage,
		ProducerConfig: conf.ProducerConfig{
			ProducerGroup: eventmeshCfg.EventMesh.ProducerGroup,
		},
	}

	client, err := pgrpc.New(cfg)
	if err != nil {
		return err
	}
	defer closeEventMeshClient(client)

	message := &eventmesh.SimpleMessage{
		Header:        pgrpc.CreateHeader(cfg),
		ProducerGroup: eventmeshCfg.EventMesh.ProducerGroup,
		Topic:         topic,
		Content:       content,
		Ttl:           gconv.String(eventmeshCfg.EventMesh.TTL),
		UniqueId:      uuid.New().String(),
		SeqNum:        uuid.New().String(),
		Properties:    properties,
	}

	// Use the caller's context (not context.Background()) so cancellation
	// and deadlines set upstream actually apply to the publish call.
	resp, err := client.Publish(ctx, message)
	if err != nil {
		return err
	}
	log.Get(constants.LogSchedule).Debugf("publish event result: %v", resp.String())

	// The code checks for the string "0"; any other response code is
	// surfaced to the caller as an error.
	if resp.RespCode != "0" {
		return fmt.Errorf("eventmesh publish message error: [code]%v[msg]%v", resp.RespCode, resp.RespMsg)
	}
	return nil
}