private async Task DeliverNextPendingAsync()

in src/NMS.AMQP/NmsMessageConsumer.cs [345:440]


        /// <summary>
        /// Takes at most one envelope from the consumer's message queue and hands it to the
        /// registered listener(s), acknowledging it according to the acknowledgement mode.
        /// </summary>
        /// <remarks>
        /// <para>
        /// Nothing happens unless the session is started, the consumer is started and
        /// <c>HasMessageListener()</c> returns true. The consumer state is checked once before
        /// and once after <c>syncRoot</c> is acquired.
        /// </para>
        /// <para>
        /// Envelopes for which <c>IsMessageExpired</c> or <c>IsRedeliveryExceeded</c> returns true
        /// are passed to <c>DoAckExpiredAsync</c> and are not delivered to a listener.
        /// </para>
        /// <para>
        /// An exception thrown by a listener is logged and not propagated. In auto-acknowledge and
        /// dups-ok modes the envelope is then passed to <c>DoAckReleasedAsync</c> instead of
        /// <c>DoAckConsumedAsync</c>. Any other exception raised while holding the lock is forwarded
        /// to <c>Session.Connection.OnAsyncException</c> rather than being rethrown.
        /// </para>
        /// </remarks>
        /// <param name="cancellationToken">Token passed through to the asynchronous listener.</param>
        private async Task DeliverNextPendingAsync(CancellationToken cancellationToken)
        {
            if (Tracer.IsDebugEnabled)
            {
                Tracer.Debug($"{Info.Id} is about to deliver next pending message.");
            }

            // Cheap pre-check so the lock is not taken when there is nothing to deliver to.
            if (!Session.IsStarted || !this.started || !HasMessageListener())
            {
                return;
            }

            using (await syncRoot.LockAsync().Await())
            {
                try
                {
                    // The consumer state may have changed while waiting for the lock.
                    if (!this.started || !HasMessageListener())
                    {
                        return;
                    }

                    var envelope = messageQueue.DequeueNoWait();
                    if (envelope == null)
                    {
                        if (Tracer.IsDebugEnabled)
                        {
                            Tracer.Debug("No message available for delivery.");
                        }

                        return;
                    }

                    if (IsMessageExpired(envelope))
                    {
                        if (Tracer.IsDebugEnabled)
                        {
                            Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
                        }

                        await DoAckExpiredAsync(envelope).Await();
                        return;
                    }

                    if (IsRedeliveryExceeded(envelope))
                    {
                        if (Tracer.IsDebugEnabled)
                        {
                            Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
                        }

                        // TODO: Apply redelivery policy
                        await DoAckExpiredAsync(envelope).Await();
                        return;
                    }

                    bool deliveryFailed = false;
                    bool autoAckOrDupsOk = acknowledgementMode == AcknowledgementMode.AutoAcknowledge || acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge;

                    // The pre-delivery acknowledgement is sent before the listener runs.
                    if (autoAckOrDupsOk)
                    {
                        await DoAckDeliveredAsync(envelope).Await();
                    }
                    else
                    {
                        await AckFromReceiveAsync(envelope).Await();
                    }

                    try
                    {
                        // Each listener receives its own copy of the message.
                        Listener?.Invoke(envelope.Message.Copy());
                        if (AsyncListener != null)
                        {
                            await AsyncListener.Invoke(envelope.Message.Copy(), cancellationToken).Await();
                        }
                    }
                    catch (Exception listenerError)
                    {
                        // A failing listener must not propagate, but the failure should not
                        // disappear silently either: record it before flagging the delivery.
                        if (Tracer.IsWarnEnabled)
                        {
                            Tracer.Warn($"{Info.Id} message listener threw an exception during delivery: {listenerError}");
                        }

                        deliveryFailed = true;
                    }

                    // Only auto-ack / dups-ok modes settle the message here.
                    if (autoAckOrDupsOk)
                    {
                        if (!deliveryFailed)
                        {
                            await DoAckConsumedAsync(envelope).Await();
                        }
                        else
                        {
                            await DoAckReleasedAsync(envelope).Await();
                        }
                    }
                }
                catch (Exception e)
                {
                    // TODO - There are two cases when we can get an error here:
                    // 1) error returned from the attempted ACK that was sent
                    // 2) error while attempting to copy the incoming message.
                    //
                    // We need to decide how to respond to these, but definitely we cannot
                    // let this error propagate as it could take down the SessionDispatcher

                    // Allow the error handler to close the existing session/connection.
                    using (Session.ExcludeCheckIsOnDeliveryExecutionFlow())
                    {
                        Session.Connection.OnAsyncException(e);
                    }
                }
            }
        }