private async Task DequeueAsync()

in src/MessageConsumer.cs [1018:1122]


        private async Task<MessageDispatch> DequeueAsync(TimeSpan timeout)
        {
            DateTime deadline = DateTime.Now;

            if(timeout > TimeSpan.Zero)
            {
                deadline += timeout;
            }

            while(true)
            {
                MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);

                // Grab a single date/time for calculations to avoid timing errors.
                DateTime dispatchTime = DateTime.Now;

                if(dispatch == null)
                {
                    if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
                    {
                        if(dispatchTime > deadline)
                        {
                            // Out of time.
                            timeout = TimeSpan.Zero;
                        }
                        else
                        {
                            // Adjust the timeout to the remaining time.
                            timeout = deadline - dispatchTime;
                        }
                    }
                    else
                    {
                        // Informs the caller of an error in the event that an async exception
                        // took down the parent connection.
                        if(this.failureError != null)
                        {
                            throw NMSExceptionSupport.Create(this.failureError);
                        }

                        return null;
                    }
                }
                else if(dispatch.Message == null)
                {
                    return null;
                }
                else if(ConsumeExpiredMessage(dispatch))
                {
                    Tracer.DebugFormat("Consumer[{0}] received expired message: {1}",
                                       ConsumerId, dispatch.Message.MessageId);

                    await BeforeMessageIsConsumedAsync(dispatch).Await();
                    await AfterMessageIsConsumedAsync(dispatch, true).Await();
                    // Refresh the dispatch time
                    dispatchTime = DateTime.Now;

                    if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
                    {
                        if(dispatchTime > deadline)
                        {
                            // Out of time.
                            timeout = TimeSpan.Zero;
                        }
                        else
                        {
                            // Adjust the timeout to the remaining time.
                            timeout = deadline - dispatchTime;
                        }
                    }

                    SendPullRequest((long) timeout.TotalMilliseconds);
                }
                else if (RedeliveryExceeded(dispatch))
                {
                    Tracer.DebugFormat("Consumer[{0}] received with excessive redelivered: {1}",
                                       ConsumerId, dispatch);
                    await PosionAckAsync(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery " +
                                                   "policy limit:" + redeliveryPolicy.MaximumRedeliveries).Await();

                    // Refresh the dispatch time
                    dispatchTime = DateTime.Now;

                    if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
                    {
                        if(dispatchTime > deadline)
                        {
                            // Out of time.
                            timeout = TimeSpan.Zero;
                        }
                        else
                        {
                            // Adjust the timeout to the remaining time.
                            timeout = deadline - dispatchTime;
                        }
                    }

                    SendPullRequest((long) timeout.TotalMilliseconds);
                }
                else
                {
                    return dispatch;
                }
            }
        }