in src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs [164:196]
/// <summary>
/// Sends <paramref name="message"/> through the sender link, awaits the resulting
/// outcome, and converts any outcome other than "accepted" into an exception.
/// </summary>
/// <param name="message">The AMQP message to transmit.</param>
/// <param name="deliveryState">Delivery state handed to the sender link together with the message.</param>
/// <exception cref="TimeoutException">
/// Rethrown when the send times out, after the send has been cancelled on the link.
/// </exception>
private async Task SendAsync(global::Amqp.Message message, DeliveryState deliveryState)
{
    Outcome outcome = null;
    try
    {
        // SendTimeout being NmsConnectionInfo.INFINITE is handled in SendAsync AmqpSendTask
        var sendTimeout = session.Connection.Provider.SendTimeout;
        outcome = await senderLink.SendAsync(message, deliveryState, sendTimeout).Await();
    }
    catch (TimeoutException)
    {
        // Cancel the send on the link before letting the timeout propagate.
        senderLink.Cancel(message);
        throw;
    }

    var outcomeCode = outcome.Descriptor.Code;

    // Accepted: the send succeeded, nothing to report.
    if (outcomeCode == MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code)
    {
        return;
    }

    // Rejected: surface the error carried by the outcome itself.
    if (outcomeCode == MessageSupport.REJECTED_INSTANCE.Descriptor.Code)
    {
        var rejected = (Rejected) outcome;
        throw ExceptionSupport.GetException(rejected.Error, $"Message {message.Properties.GetMessageId()} rejected");
    }

    // Released: no error is read from the outcome here, so a MessageReleased error is created.
    if (outcomeCode == MessageSupport.RELEASED_INSTANCE.Descriptor.Code)
    {
        var releasedError = new Error(ErrorCode.MessageReleased);
        throw ExceptionSupport.GetException(releasedError, $"Message {message.Properties.GetMessageId()} released");
    }

    // Any other outcome is reported as an internal error described by the outcome's string form.
    var internalError = new Error(ErrorCode.InternalError);
    throw ExceptionSupport.GetException(internalError, outcome.ToString());
}