in src/NMS.AMQP/NmsMessageConsumer.cs [345:440]
private async Task DeliverNextPendingAsync(CancellationToken cancellationToken)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug($"{Info.Id} is about to deliver next pending message.");
}
if (Session.IsStarted && this.started && HasMessageListener())
{
using(await syncRoot.LockAsync().Await())
{
try
{
if (this.started && HasMessageListener())
{
var envelope = messageQueue.DequeueNoWait();
if (envelope == null)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("No message available for delivery.");
}
return;
}
if (IsMessageExpired(envelope))
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
}
await DoAckExpiredAsync(envelope).Await();
}
else if (IsRedeliveryExceeded(envelope))
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
}
// TODO: Apply redelivery policy
await DoAckExpiredAsync(envelope).Await();
}
else
{
bool deliveryFailed = false;
bool autoAckOrDupsOk = acknowledgementMode == AcknowledgementMode.AutoAcknowledge || acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge;
if (autoAckOrDupsOk)
await DoAckDeliveredAsync(envelope).Await();
else
await AckFromReceiveAsync(envelope).Await();
try
{
Listener?.Invoke(envelope.Message.Copy());
if (AsyncListener != null)
{
await AsyncListener.Invoke(envelope.Message.Copy(), cancellationToken).Await();
}
}
catch (Exception)
{
deliveryFailed = true;
}
if (autoAckOrDupsOk)
{
if (!deliveryFailed)
await DoAckConsumedAsync(envelope).Await();
else
await DoAckReleasedAsync(envelope).Await();
}
}
}
}
catch (Exception e)
{
// TODO - There are two cases when we can get an error here:
// 1) error returned from the attempted ACK that was sent
// 2) error while attempting to copy the incoming message.
//
// We need to decide how to respond to these, but definitely we cannot
// let this error propagate as it could take down the SessionDispatcher
// To let close the existing session/connection in error handler
using (Session.ExcludeCheckIsOnDeliveryExecutionFlow())
{
Session.Connection.OnAsyncException(e);
}
}
}
}
}