public virtual async Task Dispatch_Async()

in src/MessageConsumer.cs [821:947]


        /// <summary>
        /// Processes a single <see cref="MessageDispatch"/> for this consumer: hands it to
        /// the registered listener, queues it on the unconsumed-message channel, or treats
        /// it as a duplicate delivery.
        /// </summary>
        /// <remarks>
        /// Any exception that escapes the processing below is reported through
        /// <c>session.Connection.OnSessionException</c> instead of being thrown to the caller.
        /// </remarks>
        /// <param name="dispatch">The dispatch to process.</param>
        /// <returns>A task that completes once the dispatch has been processed.</returns>
        public virtual async Task Dispatch_Async(MessageDispatch dispatch)
        {
            // Snapshot the listener so the null check and the later invocation use the same delegate.
            MessageListener listener = this.listener;
            bool dispatchMessage = false;

            try
            {
                ClearMessagesInProgress();
                ClearDeliveredList();

                using(await this.unconsumedMessages.SyncRoot.LockAsync().Await())
                {
                    if(!this.unconsumedMessages.Closed)
                    {
                        if(this.info.Browser || !session.Connection.IsDuplicate(this, dispatch.Message))
                        {
                            if(listener != null && this.unconsumedMessages.Running)
                            {
                                if (RedeliveryExceeded(dispatch))
                                {
                                    await PosionAckAsync(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries).Await();
                                    return;
                                }
                                else
                                {
                                    // Only flag the message here; the listener itself is invoked
                                    // after the SyncRoot lock has been released.
                                    dispatchMessage = true;
                                }
                            }
                            else
                            {
                                if (!this.unconsumedMessages.Running)
                                {
                                    // delayed redelivery, ensure it can be re delivered
                                    session.Connection.RollbackDuplicate(this, dispatch.Message);
                                }
                                this.unconsumedMessages.Enqueue(dispatch);
                                // TODO - Signal message available when we have that event hook.
                            }
                        }
                        else
                        {
                            // deal with duplicate delivery
                            ConsumerId consumerWithPendingTransaction;
                            if (RedeliveryExpectedInCurrentTransaction(dispatch, true))
                            {
                                Tracer.DebugFormat("Consumer[{0}] tracking transacted({1}) redelivery [{2}]",
                                                   ConsumerId, previouslyDeliveredMessages.TransactionId, dispatch.Message);
                                if (TransactedIndividualAck)
                                {
                                    await ImmediateIndividualTransactedAckAsync(dispatch).Await();
                                }
                                else
                                {
                                    await this.session.SendAckAsync(new MessageAck(dispatch, (byte) AckType.DeliveredAck, 1)).Await();
                                }
                            }
                            else if ((consumerWithPendingTransaction = RedeliveryPendingInCompetingTransaction(dispatch)) != null)
                            {
                                // {2} is the competing consumer; the message is {1}.
                                Tracer.WarnFormat("Consumer[{0}] delivering duplicate [{1}], pending transaction completion on ({2}) will rollback",
                                                  ConsumerId, dispatch.Message, consumerWithPendingTransaction);
                                this.session.Connection.RollbackDuplicate(this, dispatch.Message);
                                // NOTE(review): this re-enters Dispatch_Async while the SyncRoot lock is
                                // still held; presumably the async lock is re-entrant - confirm.
                                await Dispatch_Async(dispatch).Await();
                            }
                            else
                            {
                                Tracer.WarnFormat("Consumer[{0}] suppressing duplicate delivery on connection, poison acking: ({1})",
                                                  ConsumerId, dispatch);
                                await PosionAckAsync(dispatch, "Suppressing duplicate delivery on connection, consumer " + ConsumerId).Await();
                            }
                        }
                    }
                }

                if(dispatchMessage)
                {
                    ActiveMQMessage message = CreateActiveMQMessage(dispatch);

                    await BeforeMessageIsConsumedAsync(dispatch).Await();

                    try
                    {
                        // Expired messages skip the listener but still go through the
                        // after-consumed step, with the expired flag set.
                        bool expired = (!IgnoreExpiration && message.IsExpired());

                        if(!expired)
                        {
                            listener(message);
                        }

                        await this.AfterMessageIsConsumedAsync(dispatch, expired).Await();
                    }
                    catch(Exception e)
                    {
                        dispatch.RollbackCause = e;
                        if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
                        {
                            // Schedule redelivery and possible dlq processing
                            await RollbackAsync().Await();
                        }
                        else
                        {
                            // Transacted or Client ack: Deliver the next message.
                            await this.AfterMessageIsConsumedAsync(dispatch, false).Await();
                        }

                        Tracer.ErrorFormat("Consumer[{0}] Exception while processing message: {1}", this.info.ConsumerId, e);

                        // If aborted we stop the abort here and let normal processing resume.
                        // This allows the session to shutdown normally and ack all messages
                        // that have outstanding acks in this consumer.
                        if((Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == ThreadState.AbortRequested)
                        {
                            Thread.ResetAbort();
                        }
                    }
                }

                // Pause briefly after every 1000th dispatch that reaches this point.
                if(++dispatchedCount % 1000 == 0)
                {
                    dispatchedCount = 0;
                    // Asynchronous delay so the pause does not block the executing thread.
                    await Task.Delay(1).Await();
                }
            }
            catch(Exception e)
            {
                this.session.Connection.OnSessionException(this.session, e);
            }
        }