in runtime/core/protocol/grpc/producer/producer_processor.go [176:244]
// RequestReplyMessage validates msg, converts it to a CloudEvent with the
// protocol adapter selected by the header's protocol type, issues a
// request through the producer registered for the message's producer group,
// and returns the reply converted back to a SimpleMessage.
//
// On validation, plugin-lookup, conversion or producer-lookup failure it
// returns a SimpleMessage built from the request header and the matching
// status code together with the error. When the request itself fails, or a
// callback reports a failure, it returns a nil message and the error.
//
// NOTE(review): the reply and any callback error are read right after
// ep.Request returns, so this assumes the callbacks are invoked before
// Request returns — confirm against the connector implementation.
func (p *processor) RequestReplyMessage(ctx context.Context, producerMgr ProducerManager, msg *pb.SimpleMessage) (*pb.SimpleMessage, error) {
	var (
		resp *pb.SimpleMessage
		// replyErr carries failures reported by the callbacks. It is kept
		// separate from the value returned by ep.Request so that a nil
		// return from Request cannot overwrite a callback failure.
		replyErr error
		hdr      = msg.Header
	)

	if err := validator.ValidateHeader(hdr); err != nil {
		log.Warnf("invalid header:%v", err)
		return buildPBSimpleMessage(hdr, grpc.EVENTMESH_PROTOCOL_HEADER_ERR), err
	}
	if err := validator.ValidateMessage(msg); err != nil {
		log.Warnf("invalid body:%v", err)
		return buildPBSimpleMessage(hdr, grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
	}

	protocolType := hdr.ProtocolType
	// Use the comma-ok form: a missing or mistyped plugin must surface as
	// "plugin not found" rather than as a type-assertion panic.
	adp, ok := plugin.Get(plugin.Protocol, protocolType).(protocol.Adapter)
	if !ok || adp == nil {
		log.Warnf("protocol plugin not found:%v", protocolType)
		return buildPBSimpleMessage(hdr, grpc.EVENTMESH_Plugin_NotFound_ERR), ErrProtocolPluginNotFound
	}

	cevt, err := adp.ToCloudEvent(&grpc.SimpleMessageWrapper{SimpleMessage: msg})
	if err != nil {
		log.Warnf("transfer to cloud event msg err:%v", err)
		return buildPBSimpleMessage(hdr, grpc.EVENTMESH_Transfer_Protocol_ERR), err
	}

	seqNum := msg.SeqNum
	unidID := msg.UniqueId
	topic := msg.Topic
	producerGroup := msg.ProducerGroup

	// A malformed TTL is reported but does not abort the request; the value
	// returned by StringToDuration is still passed on, as before.
	ttl, ttlErr := StringToDuration(msg.Ttl)
	if ttlErr != nil {
		log.Warnf("invalid ttl:%v, uniqID:%v, err:%v", msg.Ttl, unidID, ttlErr)
	}

	start := time.Now()
	ep, err := producerMgr.GetProducer(producerGroup)
	if err != nil {
		return buildPBSimpleMessage(hdr, grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
	}

	err = ep.Request(
		SendMessageContext{
			Ctx:         ctx,
			Event:       cevt,
			BizSeqNO:    seqNum,
			ProducerAPI: ep,
			CreateTime:  time.Now(),
		},
		&connector.RequestReplyCallback{
			OnSuccess: func(event *ce.Event) {
				log.Infof("message|eventmesh2client|REPLY|RequestReply|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v",
					time.Since(start).Milliseconds(), topic, seqNum, unidID)
				m1, err1 := adp.FromCloudEvent(event)
				if err1 != nil {
					log.Warnf("failed to transfer msg from event, err:%v", err1)
					replyErr = grpc.EVENTMESH_Transfer_Protocol_ERR.ToError()
					return
				}
				// Accept the wrapper by value or by pointer; anything else
				// is reported as a transfer error instead of panicking.
				switch w := m1.(type) {
				case *grpc.SimpleMessageWrapper:
					if w == nil {
						log.Warnf("nil reply message wrapper, uniqID:%v", unidID)
						replyErr = grpc.EVENTMESH_Transfer_Protocol_ERR.ToError()
						return
					}
					resp = w.SimpleMessage
				case grpc.SimpleMessageWrapper:
					resp = w.SimpleMessage
				default:
					log.Warnf("unexpected reply message type:%T", m1)
					replyErr = grpc.EVENTMESH_Transfer_Protocol_ERR.ToError()
				}
			},
			OnError: func(result *connector.ErrorResult) {
				// Log the result handed to the callback; the outer error is
				// not the failure being reported here.
				log.Errorf("message|mq2eventmesh|REPLY|RequestReply|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v|err=%v",
					time.Since(start).Milliseconds(), topic, seqNum, unidID, result)
				replyErr = grpc.EVENTMESH_REQUEST_REPLY_MSG_ERR.ToError()
			},
		},
		ttl)
	if err != nil {
		log.Warnf("failed to request message, uniqID:%v, err:%v", unidID, err)
		return nil, err
	}
	if replyErr != nil {
		return nil, replyErr
	}
	return resp, nil
}