func()

in runtime/core/protocol/grpc/producer/producer_processor.go [58:114]


func (p *processor) AsyncMessage(ctx context.Context, producerMgr ProducerManager, msg *pb.SimpleMessage) (*pb.Response, error) {
	hdr := msg.Header
	if err := validator.ValidateHeader(hdr); err != nil {
		log.Warnf("invalid header:%v", err)
		return buildPBResponse(grpc.EVENTMESH_PROTOCOL_HEADER_ERR), err
	}
	if err := validator.ValidateMessage(msg); err != nil {
		log.Warnf("invalid body:%v", err)
		return buildPBResponse(grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
	}

	// TODO no ack check, add rate limiter
	seqNum := msg.SeqNum
	uid := msg.UniqueId
	topic := msg.Topic
	pg := msg.ProducerGroup
	start := time.Now()
	protocolType := hdr.ProtocolType
	adp := plugin.Get(plugin.Protocol, protocolType).(protocol.Adapter)
	if adp == nil {
		log.Warnf("protocol plugin not found:%v", protocolType)
		return buildPBResponse(grpc.EVENTMESH_Plugin_NotFound_ERR), ErrProtocolPluginNotFound
	}
	cevt, err := adp.ToCloudEvent(&grpc.SimpleMessageWrapper{SimpleMessage: msg})
	if err != nil {
		return buildPBResponse(grpc.EVENTMESH_Transfer_Protocol_ERR), err
	}
	ep, err := producerMgr.GetProducer(pg)
	if err != nil {
		return buildPBResponse(grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
	}
	var code *grpc.StatusCode
	if err = ep.Send(
		SendMessageContext{
			Ctx:         ctx,
			Event:       cevt,
			BizSeqNO:    seqNum,
			ProducerAPI: ep,
			CreateTime:  time.Now(),
		},
		&connector.SendCallback{
			OnSuccess: func(result *connector.SendResult) {
				code = grpc.SUCCESS
				log.Infof("message|eventMesh2mq|REQ|ASYNC|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v",
					time.Since(start).Milliseconds(), topic, seqNum, uid)
			},
			OnError: func(result *connector.ErrorResult) {
				code = grpc.EVENTMESH_SEND_ASYNC_MSG_ERR
				log.Errorf("message|eventMesh2mq|REQ|ASYNC|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v, err:%v",
					time.Since(start).Milliseconds(), topic, seqNum, uid, result.Err)
			},
		},
	); err != nil {
		log.Warnf("send message to mq err:%v", err)
	}
	return buildPBResponse(code), nil
}