in src/NMS.AMQP/NmsMessageConsumer.cs [476:542]
private async Task<T> ReceiveInternalBaseAsync<T>(int timeout, Func<InboundMessageDispatch, Task<T>> func)
{
try
{
long deadline = 0;
if (timeout > 0)
{
deadline = GetDeadline(timeout);
}
while (true)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("Trying to dequeue next message.");
}
InboundMessageDispatch envelope = await messageQueue.DequeueAsync(timeout).Await();
if (failureCause != null)
throw NMSExceptionSupport.Create(failureCause);
if (envelope == null)
return default;
if (IsMessageExpired(envelope))
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
}
await DoAckExpiredAsync(envelope).Await();
if (timeout > 0)
timeout = (int) Math.Max(deadline - DateTime.UtcNow.Ticks / 10_000L, 0);
}
else if (IsRedeliveryExceeded(envelope))
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
}
// TODO: Apply redelivery policy
await DoAckExpiredAsync(envelope).Await();
}
else
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug($"{Info.Id} received message {envelope.Message.NMSMessageId}.");
}
return await func.Invoke(envelope);
}
}
}
catch (NMSException)
{
throw;
}
catch (Exception ex)
{
throw ExceptionSupport.Wrap(ex, "Receive failed");
}
}