func()

in runtime/core/protocol/grpc/producer/producer_processor.go [176:244]


// RequestReplyMessage sends msg as a request through the producer registered
// for msg.ProducerGroup and returns the reply converted back to a SimpleMessage.
//
// The header and body are validated first, then the protocol adapter named by
// the header's ProtocolType converts the message to a cloud event. The event is
// handed to the producer's Request together with success/error callbacks and
// the TTL parsed from msg.Ttl.
//
// When validation, adapter lookup, conversion, or producer lookup fails, the
// returned message is built by buildPBSimpleMessage from the header and the
// matching status code, and the error is non-nil. When the request itself or
// one of its callbacks fails, the returned message is nil and the error is
// non-nil.
//
// NOTE(review): this function reads the callback results right after Request
// returns, so it assumes Request invokes the callbacks before returning —
// confirm against the connector implementation.
func (p *processor) RequestReplyMessage(ctx context.Context, producerMgr ProducerManager, msg *pb.SimpleMessage) (*pb.SimpleMessage, error) {
	var (
		err error
		hdr = msg.Header
	)
	if err = validator.ValidateHeader(hdr); err != nil {
		log.Warnf("invalid header:%v", err)
		return buildPBSimpleMessage(hdr, grpc.EVENTMESH_PROTOCOL_HEADER_ERR), err
	}
	if err = validator.ValidateMessage(msg); err != nil {
		log.Warnf("invalid body:%v", err)
		return buildPBSimpleMessage(hdr, grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
	}

	protocolType := hdr.ProtocolType
	// Comma-ok assertion: a missing plugin (nil) or a plugin of another type
	// must yield the "not found" response instead of a panic.
	adp, ok := plugin.Get(plugin.Protocol, protocolType).(protocol.Adapter)
	if !ok || adp == nil {
		log.Warnf("protocol plugin not found:%v", protocolType)
		return buildPBSimpleMessage(hdr, grpc.EVENTMESH_Plugin_NotFound_ERR), ErrProtocolPluginNotFound
	}
	cevt, err := adp.ToCloudEvent(&grpc.SimpleMessageWrapper{SimpleMessage: msg})
	if err != nil {
		log.Warnf("transfer to cloud event msg err:%v", err)
		return buildPBSimpleMessage(hdr, grpc.EVENTMESH_Transfer_Protocol_ERR), err
	}

	seqNum := msg.SeqNum
	uniqID := msg.UniqueId
	topic := msg.Topic
	// A TTL that fails to parse does not abort the request; the value returned
	// by StringToDuration is still used, but the failure is no longer silent.
	ttl, ttlErr := StringToDuration(msg.Ttl)
	if ttlErr != nil {
		log.Warnf("invalid ttl:%v, uniqID:%v, err:%v", msg.Ttl, uniqID, ttlErr)
	}
	start := time.Now()
	ep, err := producerMgr.GetProducer(msg.ProducerGroup)
	if err != nil {
		return buildPBSimpleMessage(hdr, grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
	}

	// Callback outcomes are kept in their own variables. Sharing err with the
	// Request return value would let a nil return overwrite a callback error.
	var (
		resp  *pb.SimpleMessage
		cbErr error
	)
	err = ep.Request(
		SendMessageContext{
			Ctx:         ctx,
			Event:       cevt,
			BizSeqNO:    seqNum,
			ProducerAPI: ep,
			CreateTime:  time.Now(),
		},
		&connector.RequestReplyCallback{
			OnSuccess: func(event *ce.Event) {
				log.Infof("message|eventmesh2client|REPLY|RequestReply|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v",
					time.Since(start).Milliseconds(), topic, seqNum, uniqID)

				reply, convErr := adp.FromCloudEvent(event)
				if convErr != nil {
					log.Warnf("failed to transfer msg from event, err:%v", convErr)
					cbErr = grpc.EVENTMESH_Transfer_Protocol_ERR.ToError()
					return
				}
				// Accept the wrapper by value or by pointer; anything else is
				// reported as a transfer error rather than panicking.
				switch w := reply.(type) {
				case grpc.SimpleMessageWrapper:
					resp = w.SimpleMessage
				case *grpc.SimpleMessageWrapper:
					if w == nil {
						log.Warnf("failed to transfer msg from event, err:%v", "nil message wrapper")
						cbErr = grpc.EVENTMESH_Transfer_Protocol_ERR.ToError()
						return
					}
					resp = w.SimpleMessage
				default:
					log.Warnf("failed to transfer msg from event, unexpected type:%T", reply)
					cbErr = grpc.EVENTMESH_Transfer_Protocol_ERR.ToError()
				}
			},
			OnError: func(result *connector.ErrorResult) {
				log.Errorf("message|mq2eventmesh|REPLY|RequestReply|send2MQCost=%vms|topic=%v|bizSeqNo=%v|uniqueId=%v|err=%v",
					time.Since(start).Milliseconds(), topic, seqNum, uniqID, result)
				cbErr = grpc.EVENTMESH_REQUEST_REPLY_MSG_ERR.ToError()
			},
		},
		ttl)
	if err != nil {
		log.Warnf("failed to request message, uniqID:%v, err:%v", uniqID, err)
		return nil, err
	}
	if cbErr != nil {
		return nil, cbErr
	}
	return resp, nil
}