private async Task ReceiveInternalBaseAsync()

in src/NMS.AMQP/NmsMessageConsumer.cs [476:542]


        private async Task<T> ReceiveInternalBaseAsync<T>(int timeout, Func<InboundMessageDispatch, Task<T>> func)
        {
            try
            {
                long deadline = 0;
                if (timeout > 0)
                {
                    deadline = GetDeadline(timeout);
                }

                while (true)
                {
                    if (Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("Trying to dequeue next message.");
                    }

                    InboundMessageDispatch envelope = await messageQueue.DequeueAsync(timeout).Await();

                    if (failureCause != null)
                        throw NMSExceptionSupport.Create(failureCause);

                    if (envelope == null)
                        return default;

                    if (IsMessageExpired(envelope))
                    {
                        if (Tracer.IsDebugEnabled)
                        {
                            Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
                        }

                        await DoAckExpiredAsync(envelope).Await();

                        if (timeout > 0)
                            timeout = (int) Math.Max(deadline - DateTime.UtcNow.Ticks / 10_000L, 0);
                    }
                    else if (IsRedeliveryExceeded(envelope))
                    {
                        if (Tracer.IsDebugEnabled)
                        {
                            Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
                        }

                        // TODO: Apply redelivery policy
                        await DoAckExpiredAsync(envelope).Await();
                    }
                    else
                    {
                        if (Tracer.IsDebugEnabled)
                        {
                            Tracer.Debug($"{Info.Id} received message {envelope.Message.NMSMessageId}.");
                        }

                        return await func.Invoke(envelope);
                    }
                }
            }
            catch (NMSException)
            {
                throw;
            }
            catch (Exception ex)
            {
                throw ExceptionSupport.Wrap(ex, "Receive failed");
            }
        }