in src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs [121:157]
/// <summary>
/// Encodes the message carried by <paramref name="envelope"/> and sends it,
/// enlisting it in the session's current transaction state when one is available.
/// </summary>
/// <remarks>
/// Nothing is sent when the message facade is not an <c>AmqpNmsMessageFacade</c>,
/// or when the session is transacted and its transaction has already failed.
/// Fire-and-forget envelopes go through <c>SendSync</c>; all others are awaited
/// through <c>SendAsync</c>.
/// </remarks>
/// <param name="envelope">The outbound dispatch holding the message and its send options.</param>
/// <returns>A task that completes once the send has been handed off or finished.</returns>
/// <exception cref="Exception">
/// Any failure raised while sending is rethrown as the exception produced by
/// <c>ExceptionSupport.Wrap</c>.
/// </exception>
public async Task Send(OutboundMessageDispatch envelope)
{
    if (envelope.Message.Facade is AmqpNmsMessageFacade facade)
    {
        AmqpCodec.EncodeMessage(facade);
        global::Amqp.Message message = facade.Message;
        try
        {
            // If the transaction has failed due to remote termination etc. then we just
            // indicate the send has succeeded until a new transaction is started.
            if (session.IsTransacted && session.IsTransactionFailed)
            {
                return;
            }

            var transactionalState = session.TransactionContext?.GetTxnEnrolledState();
            if (envelope.FireAndForget)
            {
                SendSync(message, transactionalState);
                return;
            }

            await SendAsync(message, transactionalState).Await();
        }
        catch (AmqpException amqpEx)
        {
            string messageId = AmqpMessageIdHelper.ToMessageIdString(message.Properties?.GetMessageId());
            throw ExceptionSupport.Wrap(amqpEx, $"Failure to send message {messageId} on Producer {info.Id}");
        }
        catch (Exception ex)
        {
            // Supply the values as format arguments instead of pre-interpolating them into
            // the format string: ErrorFormat takes a composite format string, so a '{' or '}'
            // in the exception text could otherwise make the logging call itself throw
            // and hide the original failure.
            Tracer.ErrorFormat(
                "Encountered Error on sending message from Producer {0}. Message: {1}. Stack : {2}.",
                info.Id, ex.Message, ex.StackTrace);
            throw ExceptionSupport.Wrap(ex);
        }
    }
}