in runtime/core/protocol/grpc/consumer/consumer_processor.go [244:302]
func (p *processor) ReplyMessage(ctx context.Context, producerMgr producer.ProducerManager, emiter emitter.EventEmitter, msg *pb.SimpleMessage) error {
hdr := msg.Header
if err := validator.ValidateHeader(hdr); err != nil {
log.Warnf("invalid header:%v", err)
emiter.SendStreamResp(hdr, grpc.EVENTMESH_PROTOCOL_HEADER_ERR)
return err
}
if err := validator.ValidateMessage(msg); err != nil {
log.Warnf("invalid body:%v", err)
emiter.SendStreamResp(hdr, grpc.EVENTMESH_PROTOCOL_BODY_ERR)
return err
}
seqNum := msg.SeqNum
uniqID := msg.UniqueId
producerGroup := msg.ProducerGroup
mqCluster := defaultIfEmpty(msg.Properties[consts.PROPERTY_MESSAGE_CLUSTER], "defaultCluster")
replyTopic := mqCluster + "_" + consts.RR_REPLY_TOPIC
msg.Topic = replyTopic
protocolType := hdr.ProtocolType
adp := plugin.Get(plugin.Protocol, protocolType).(protocol.Adapter)
if adp == nil {
log.Warnf("protocol plugin not found:%v", protocolType)
emiter.SendStreamResp(hdr, grpc.EVENTMESH_Plugin_NotFound_ERR)
return ErrProtocolPluginNotFound
}
cevt, err := adp.ToCloudEvent(&grpc.SimpleMessageWrapper{SimpleMessage: msg})
if err != nil {
log.Warnf("transfer to cloud event msg err:%v", err)
emiter.SendStreamResp(hdr, grpc.EVENTMESH_Transfer_Protocol_ERR)
return err
}
emProducer, err := producerMgr.GetProducer(producerGroup)
if err != nil {
log.Warnf("no eventmesh producer found, err:%v, group:%v", err, producerGroup)
emiter.SendStreamResp(hdr, grpc.EVENTMESH_Producer_Group_NotFound_ERR)
return err
}
start := time.Now()
return emProducer.Reply(
producer.SendMessageContext{
Ctx: ctx,
Event: cevt,
BizSeqNO: seqNum,
ProducerAPI: emProducer,
CreateTime: time.Now(),
},
&connector.SendCallback{
OnSuccess: func(result *connector.SendResult) {
log.Infof("message|mq2eventmesh|REPLY|ReplyToServer|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v",
time.Since(start).Milliseconds(), replyTopic, seqNum, uniqID)
},
OnError: func(result *connector.ErrorResult) {
emiter.SendStreamResp(hdr, grpc.EVENTMESH_REPLY_MSG_ERR)
log.Errorf("message|mq2eventmesh|REPLY|ReplyToServer|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v",
time.Since(start).Milliseconds(), replyTopic, seqNum, uniqID, result.Err)
},
},
)
}