in src/NMS.AMQP/NmsMessageConsumer.cs [322:413]
/// <summary>
/// Attempts to deliver a single pending message to the registered <c>Listener</c>.
/// </summary>
/// <remarks>
/// At most one envelope is taken from <c>messageQueue</c> (without waiting) per call.
/// Expired messages and messages whose redelivery count is exceeded are not handed to
/// the listener; both are passed to <c>DoAckExpiredAsync</c> instead. For every other
/// message an acknowledgement is sent before the listener runs, and in
/// AutoAcknowledge / DupsOkAcknowledge modes a second one (consumed or released) is sent
/// depending on whether the listener completed without throwing.
/// Exceptions raised by the listener are logged and never propagate; any other exception
/// raised while the lock is held is forwarded to <c>Session.Connection.OnAsyncException</c>.
/// </remarks>
/// <returns>A task that completes once the delivery attempt has finished.</returns>
private async Task DeliverNextPendingAsync()
{
    if (Tracer.IsDebugEnabled)
    {
        Tracer.Debug($"{Info.Id} is about to deliver next pending message.");
    }

    // Pre-check before taking the lock: nothing to do unless both the session and
    // this consumer are started and a listener is registered.
    if (!Session.IsStarted || !started || Listener == null)
    {
        return;
    }

    using (await syncRoot.LockAsync().Await())
    {
        try
        {
            // Re-check under the lock; the state may have changed while waiting for it.
            if (!started || Listener == null)
            {
                return;
            }

            var envelope = messageQueue.DequeueNoWait();
            if (envelope == null)
            {
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("No message available for delivery.");
                }

                return;
            }

            if (IsMessageExpired(envelope))
            {
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
                }

                await DoAckExpiredAsync(envelope).Await();
            }
            else if (IsRedeliveryExceeded(envelope))
            {
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
                }

                // TODO: Apply redelivery policy
                await DoAckExpiredAsync(envelope).Await();
            }
            else
            {
                bool autoAckOrDupsOk = acknowledgementMode == AcknowledgementMode.AutoAcknowledge
                                       || acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge;

                // The "delivered" style acknowledgement is sent before the listener is invoked.
                if (autoAckOrDupsOk)
                {
                    await DoAckDeliveredAsync(envelope).Await();
                }
                else
                {
                    await AckFromReceiveAsync(envelope).Await();
                }

                bool deliveryFailed = false;
                try
                {
                    // The listener receives a copy of the message, not the queued instance.
                    Listener.Invoke(envelope.Message.Copy());
                }
                catch (Exception listenerError)
                {
                    // A failing listener (or a failing Copy()) must not break the dispatcher,
                    // but it must not vanish silently either: record it so the failure is
                    // diagnosable, then fall through to the release path below.
                    deliveryFailed = true;
                    if (Tracer.IsWarnEnabled)
                    {
                        Tracer.Warn($"{Info.Id} message listener failed while processing message {envelope.Message.NMSMessageId}: {listenerError}");
                    }
                }

                // Only the auto-ack / dups-ok modes settle the message here; in the other
                // modes no further acknowledgement is sent from this method.
                if (autoAckOrDupsOk)
                {
                    if (deliveryFailed)
                    {
                        await DoAckReleasedAsync(envelope).Await();
                    }
                    else
                    {
                        await DoAckConsumedAsync(envelope).Await();
                    }
                }
            }
        }
        catch (Exception e)
        {
            // TODO - There are two cases when we can get an error here:
            // 1) error returned from the attempted ACK that was sent
            // 2) error while attempting to copy the incoming message.
            //    NOTE(review): Copy() is invoked inside the inner try above, so its
            //    failures are handled there and treated as a failed delivery.
            //
            // We need to decide how to respond to these, but we definitely cannot let
            // this error propagate, as it could take down the SessionDispatcher.
            // The exclusion scope is presumably what allows the error handler to close
            // the existing session/connection from this flow - confirm against Session.
            using (Session.ExcludeCheckIsOnDeliveryExecutionFlow())
            {
                Session.Connection.OnAsyncException(e);
            }
        }
    }
}