in runtime/core/protocol/grpc/producer/producer_processor.go [246:302]
func (p *processor) BatchPublish(ctx context.Context, producerMgr ProducerManager, msg *pb.BatchMessage) (*pb.Response, error) {
var (
err error
hdr = msg.Header
)
if err = validator.ValidateHeader(hdr); err != nil {
log.Warnf("invalid header:%v", err)
return buildPBResponse(grpc.EVENTMESH_PROTOCOL_HEADER_ERR), err
}
if err = validator.ValidateBatchMessage(msg); err != nil {
log.Warnf("invalid body:%v", err)
return buildPBResponse(grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
}
protocolType := hdr.ProtocolType
adp := plugin.Get(plugin.Protocol, protocolType).(protocol.Adapter)
if adp == nil {
log.Warnf("protocol plugin not found:%v", protocolType)
return buildPBResponse(grpc.EVENTMESH_Plugin_NotFound_ERR), ErrProtocolPluginNotFound
}
cevts, err := adp.ToCloudEvents(&grpc.BatchMessageWrapper{BatchMessage: msg})
if err != nil {
log.Warnf("transfer to cloud event msg err:%v", err)
return buildPBResponse(grpc.EVENTMESH_Transfer_Protocol_ERR), err
}
topic := msg.Topic
producerGroup := msg.ProducerGroup
ep, err := producerMgr.GetProducer(producerGroup)
if err != nil {
return buildPBResponse(grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
}
// TODO use errorgroup instead
for _, evt := range cevts {
seqNum := evt.ID()
uid := defaultIfEmpty(evt.Extensions()[grpc.UNIQUE_ID], "")
start := time.Now()
ep.Send(
SendMessageContext{
Ctx: ctx,
Event: evt,
BizSeqNO: seqNum,
ProducerAPI: ep,
CreateTime: time.Now(),
},
&connector.SendCallback{
OnSuccess: func(result *connector.SendResult) {
log.Infof("message|eventMesh2mq|REQ|BatchSend|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v",
time.Since(start).Milliseconds(), topic, seqNum, uid)
},
OnError: func(result *connector.ErrorResult) {
log.Errorf("message|eventMesh2mq|REQ|BatchSend|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v, err:%v",
time.Since(start).Milliseconds(), topic, seqNum, uid, result.Err)
},
},
)
}
return buildPBResponse(grpc.SUCCESS), nil
}