internal async Task RollbackAsync()

in src/MessageConsumer.cs [1442:1592]


        internal async Task RollbackAsync()
        {
            ClearDeliveredList();
            using(await this.unconsumedMessages.SyncRoot.LockAsync().Await())
            {
                if (this.optimizeAcknowledge)
                {
                    // remove messages read but not acked at the broker yet through optimizeAcknowledge
                    if (!this.info.Browser)
                    {
                        using(await this.deliveredMessagesLock.LockAsync().Await())
                        {
                            for (int i = 0; (i < this.deliveredMessages.Count) && (i < ackCounter); i++)
                            {
                                // ensure we don't filter this as a duplicate
                                MessageDispatch dispatch = this.deliveredMessages.Last.Value;
                                this.deliveredMessages.RemoveLast();
                                session.Connection.RollbackDuplicate(this, dispatch.Message);
                            }
                        }
                    }
                }
                
                using(await this.deliveredMessagesLock.LockAsync().Await())
                {
                    RollbackPreviouslyDeliveredAndNotRedelivered();
                    if(this.deliveredMessages.Count == 0)
                    {
                        Tracer.DebugFormat("Consumer[{0}] Rolled Back with no dispatched Messages", ConsumerId);
                        return;
                    }

                    // Only increase the redelivery delay after the first redelivery..
                    MessageDispatch lastMd = this.deliveredMessages.First.Value;
                    int currentRedeliveryCount = lastMd.Message.RedeliveryCounter;

                    redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);

                    MessageId firstMsgId = this.deliveredMessages.Last.Value.Message.MessageId;

                    foreach(MessageDispatch dispatch in this.deliveredMessages)
                    {
                        // Allow the message to update its internal to reflect a Rollback.
                        dispatch.Message.OnMessageRollback();
                        // ensure we don't filter this as a duplicate
                        session.Connection.RollbackDuplicate(this, dispatch.Message);
                    }

                    if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
                       lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
                    {
                        // We need to NACK the messages so that they get sent to the DLQ.
                        MessageAck ack = new MessageAck(lastMd, (byte) AckType.PoisonAck, deliveredMessages.Count);

                        Tracer.DebugFormat("Consumer[{0}] Poison Ack of {1} messages aft max redeliveries: {2}",
                                           ConsumerId, this.deliveredMessages.Count, this.redeliveryPolicy.MaximumRedeliveries);

                        BrokerError poisonCause = new BrokerError();
                        poisonCause.ExceptionClass = "javax.jms.JMSException";
                        poisonCause.Message = "Exceeded RedeliveryPolicy limit: " + RedeliveryPolicy.MaximumRedeliveries;

                        if (lastMd.RollbackCause != null)
                        {
                            BrokerError cause = new BrokerError();
                            cause.ExceptionClass = "javax.jms.JMSException";
                            cause.Message = lastMd.RollbackCause.Message;
                            poisonCause.Cause = cause;
                            poisonCause.Message = poisonCause.Message + " cause: " + lastMd.RollbackCause.Message;
                        }

                        ack.FirstMessageId = firstMsgId;
                        ack.PoisonCause = poisonCause;

                        await this.session.SendAckAsync(ack).Await();

                        // Adjust the window size.
                        additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.deliveredMessages.Count);

                        this.redeliveryDelay = 0;
                        this.deliveredCounter -= this.deliveredMessages.Count;
                        this.deliveredMessages.Clear();
                    }
                    else
                    {
                        // We only send a RedeliveryAck after the first redelivery
                        if(currentRedeliveryCount > 0)
                        {
                            MessageAck ack = new MessageAck(lastMd, (byte) AckType.RedeliveredAck, deliveredMessages.Count);
                            ack.FirstMessageId = firstMsgId;
                            await this.session.SendAckAsync(ack, true).Await();
                        }

                        if (this.nonBlockingRedelivery)
                        {
                            if(redeliveryDelay == 0)
                            {
                                redeliveryDelay = RedeliveryPolicy.InitialRedeliveryDelay;
                            }

                            Tracer.DebugFormat("Consumer[{0}] Rolled Back, Re-enque {1} messages in Non-Blocking mode, delay: {2}",
                                               ConsumerId, this.deliveredMessages.Count, redeliveryDelay);

                            List<MessageDispatch> pendingRedeliveries =
                                new List<MessageDispatch>(this.deliveredMessages);
                            pendingRedeliveries.Reverse();

                            this.deliveredCounter -= this.deliveredMessages.Count;
                            this.deliveredMessages.Clear();

                            this.session.Scheduler.ExecuteAfterDelay(
                                NonBlockingRedeliveryCallback,
                                pendingRedeliveries,
                                TimeSpan.FromMilliseconds(redeliveryDelay));
                        }
                        else
                        {
                            // stop the delivery of messages.
                            this.unconsumedMessages.Stop();

                            Tracer.DebugFormat("Consumer {0} Rolled Back, Re-enque {1} messages",
                                               ConsumerId, this.deliveredMessages.Count);

                            foreach(MessageDispatch dispatch in this.deliveredMessages)
                            {
                                this.unconsumedMessages.EnqueueFirst(dispatch);
                            }

                            this.deliveredCounter -= this.deliveredMessages.Count;
                            this.deliveredMessages.Clear();

                            if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
                            {
                                DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
                                ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
                            }
                            else
                            {
                                Start();
                            }
                        }
                    }
                }
            }

            // Only redispatch if there's an async listener otherwise a synchronous
            // consumer will pull them from the local queue.
            if(this.listener != null)
            {
                this.session.Redispatch(this, this.unconsumedMessages);
            }
        }