func()

in runtime/core/protocol/grpc/producer/producer_processor.go [116:174]


func (p *processor) ReplyMessage(ctx context.Context, producerMgr ProducerManager, emiter emitter.EventEmitter, msg *pb.SimpleMessage) error {
	hdr := msg.Header
	if err := validator.ValidateHeader(hdr); err != nil {
		log.Warnf("invalid header:%v", err)
		emiter.SendStreamResp(hdr, grpc.EVENTMESH_PROTOCOL_HEADER_ERR)
		return err
	}
	if err := validator.ValidateMessage(msg); err != nil {
		log.Warnf("invalid body:%v", err)
		emiter.SendStreamResp(hdr, grpc.EVENTMESH_PROTOCOL_BODY_ERR)
		return err
	}
	seqNum := msg.SeqNum
	uniqID := msg.UniqueId
	producerGroup := msg.ProducerGroup
	mqCluster := defaultIfEmpty(msg.Properties[consts.PROPERTY_MESSAGE_CLUSTER], "defaultCluster")
	replyTopic := mqCluster + "_" + consts.RR_REPLY_TOPIC
	msg.Topic = replyTopic
	protocolType := hdr.ProtocolType
	adp := plugin.Get(plugin.Protocol, protocolType).(protocol.Adapter)
	if adp == nil {
		log.Warnf("protocol plugin not found:%v", protocolType)
		emiter.SendStreamResp(hdr, grpc.EVENTMESH_Plugin_NotFound_ERR)
		return ErrProtocolPluginNotFound
	}
	cevt, err := adp.ToCloudEvent(&grpc.SimpleMessageWrapper{SimpleMessage: msg})
	if err != nil {
		log.Warnf("transfer to cloud event msg err:%v", err)
		emiter.SendStreamResp(hdr, grpc.EVENTMESH_Transfer_Protocol_ERR)
		return err
	}
	emProducer, err := producerMgr.GetProducer(producerGroup)
	if err != nil {
		log.Warnf("no eventmesh producer found, err:%v, group:%v", err, producerGroup)
		emiter.SendStreamResp(hdr, grpc.EVENTMESH_Producer_Group_NotFound_ERR)
		return err
	}
	start := time.Now()
	return emProducer.Reply(
		SendMessageContext{
			Ctx:         ctx,
			Event:       cevt,
			BizSeqNO:    seqNum,
			ProducerAPI: emProducer,
			CreateTime:  time.Now(),
		},
		&connector.SendCallback{
			OnSuccess: func(result *connector.SendResult) {
				log.Infof("message|mq2eventmesh|REPLY|ReplyToServer|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v",
					time.Since(start).Milliseconds(), replyTopic, seqNum, uniqID)
			},
			OnError: func(result *connector.ErrorResult) {
				emiter.SendStreamResp(hdr, grpc.EVENTMESH_REPLY_MSG_ERR)
				log.Errorf("message|mq2eventmesh|REPLY|ReplyToServer|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v",
					time.Since(start).Milliseconds(), replyTopic, seqNum, uniqID, result.Err)
			},
		},
	)
}