in runtime/core/protocol/grpc/producer/producer_processor.go [58:114]
func (p *processor) AsyncMessage(ctx context.Context, producerMgr ProducerManager, msg *pb.SimpleMessage) (*pb.Response, error) {
hdr := msg.Header
if err := validator.ValidateHeader(hdr); err != nil {
log.Warnf("invalid header:%v", err)
return buildPBResponse(grpc.EVENTMESH_PROTOCOL_HEADER_ERR), err
}
if err := validator.ValidateMessage(msg); err != nil {
log.Warnf("invalid body:%v", err)
return buildPBResponse(grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
}
// TODO no ack check, add rate limiter
seqNum := msg.SeqNum
uid := msg.UniqueId
topic := msg.Topic
pg := msg.ProducerGroup
start := time.Now()
protocolType := hdr.ProtocolType
adp := plugin.Get(plugin.Protocol, protocolType).(protocol.Adapter)
if adp == nil {
log.Warnf("protocol plugin not found:%v", protocolType)
return buildPBResponse(grpc.EVENTMESH_Plugin_NotFound_ERR), ErrProtocolPluginNotFound
}
cevt, err := adp.ToCloudEvent(&grpc.SimpleMessageWrapper{SimpleMessage: msg})
if err != nil {
return buildPBResponse(grpc.EVENTMESH_Transfer_Protocol_ERR), err
}
ep, err := producerMgr.GetProducer(pg)
if err != nil {
return buildPBResponse(grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
}
var code *grpc.StatusCode
if err = ep.Send(
SendMessageContext{
Ctx: ctx,
Event: cevt,
BizSeqNO: seqNum,
ProducerAPI: ep,
CreateTime: time.Now(),
},
&connector.SendCallback{
OnSuccess: func(result *connector.SendResult) {
code = grpc.SUCCESS
log.Infof("message|eventMesh2mq|REQ|ASYNC|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v",
time.Since(start).Milliseconds(), topic, seqNum, uid)
},
OnError: func(result *connector.ErrorResult) {
code = grpc.EVENTMESH_SEND_ASYNC_MSG_ERR
log.Errorf("message|eventMesh2mq|REQ|ASYNC|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v, err:%v",
time.Since(start).Milliseconds(), topic, seqNum, uid, result.Err)
},
},
); err != nil {
log.Warnf("send message to mq err:%v", err)
}
return buildPBResponse(code), nil
}