func()

in runtime/core/protocol/grpc/producer/producer_processor.go [246:302]


func (p *processor) BatchPublish(ctx context.Context, producerMgr ProducerManager, msg *pb.BatchMessage) (*pb.Response, error) {
	var (
		err error
		hdr = msg.Header
	)
	if err = validator.ValidateHeader(hdr); err != nil {
		log.Warnf("invalid header:%v", err)
		return buildPBResponse(grpc.EVENTMESH_PROTOCOL_HEADER_ERR), err
	}
	if err = validator.ValidateBatchMessage(msg); err != nil {
		log.Warnf("invalid body:%v", err)
		return buildPBResponse(grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
	}
	protocolType := hdr.ProtocolType
	adp := plugin.Get(plugin.Protocol, protocolType).(protocol.Adapter)
	if adp == nil {
		log.Warnf("protocol plugin not found:%v", protocolType)
		return buildPBResponse(grpc.EVENTMESH_Plugin_NotFound_ERR), ErrProtocolPluginNotFound
	}
	cevts, err := adp.ToCloudEvents(&grpc.BatchMessageWrapper{BatchMessage: msg})
	if err != nil {
		log.Warnf("transfer to cloud event msg err:%v", err)
		return buildPBResponse(grpc.EVENTMESH_Transfer_Protocol_ERR), err
	}
	topic := msg.Topic
	producerGroup := msg.ProducerGroup
	ep, err := producerMgr.GetProducer(producerGroup)
	if err != nil {
		return buildPBResponse(grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
	}
	// TODO use errorgroup instead
	for _, evt := range cevts {
		seqNum := evt.ID()
		uid := defaultIfEmpty(evt.Extensions()[grpc.UNIQUE_ID], "")
		start := time.Now()
		ep.Send(
			SendMessageContext{
				Ctx:         ctx,
				Event:       evt,
				BizSeqNO:    seqNum,
				ProducerAPI: ep,
				CreateTime:  time.Now(),
			},
			&connector.SendCallback{
				OnSuccess: func(result *connector.SendResult) {
					log.Infof("message|eventMesh2mq|REQ|BatchSend|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v",
						time.Since(start).Milliseconds(), topic, seqNum, uid)
				},
				OnError: func(result *connector.ErrorResult) {
					log.Errorf("message|eventMesh2mq|REQ|BatchSend|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v, err:%v",
						time.Since(start).Milliseconds(), topic, seqNum, uid, result.Err)
				},
			},
		)
	}
	return buildPBResponse(grpc.SUCCESS), nil
}