internal async Task ShutdownAsync()

in src/MessageConsumer.cs [505:579]


        /// <summary>
        /// Runs the shutdown sequence for this consumer. Returns immediately
        /// when the unconsumed-message channel is already closed.
        /// </summary>
        /// <remarks>
        /// The steps run in this order:
        /// <list type="number">
        /// <item><description>For a non-transacted session, call <c>DeliverAcks</c> and, when
        /// <c>IsAutoAcknowledgeBatch</c> is set, await <c>AcknowledgeAsync</c>.</description></item>
        /// <item><description>Shut down the executor (waiting up to one minute) and drop the reference to it.</description></item>
        /// <item><description>Cancel the optimized-ack task on the session scheduler, if one exists.</description></item>
        /// <item><description>For client/individual acknowledge sessions on a non-browser consumer, call
        /// <c>RollbackDuplicate</c> for every message in a snapshot of the delivered list.</description></item>
        /// <item><description>For a non-transacted session, clear the delivered list.</description></item>
        /// <item><description>Remove this consumer from the session, close the unconsumed channel, drain it,
        /// and (for a non-browser consumer) call <c>RollbackDuplicate</c> for each drained dispatch.</description></item>
        /// </list>
        /// </remarks>
        /// <returns>A task that completes once the sequence above has finished.</returns>
        internal async Task ShutdownAsync()
        {
            // Nothing to do when the channel has already been closed.
            if (this.unconsumedMessages.Closed)
            {
                return;
            }

            if (Tracer.IsDebugEnabled)
            {
                Tracer.DebugFormat("Shutdown of Consumer[{0}] started.", ConsumerId);
            }

            // Send out any acks that are still pending before closing;
            // this step is skipped for transacted sessions.
            if (!this.session.IsTransacted)
            {
                DeliverAcks();
                if (this.IsAutoAcknowledgeBatch)
                {
                    await AcknowledgeAsync().Await();
                }
            }

            // Stop the executor, giving it up to one minute to terminate.
            if (this.executor != null)
            {
                this.executor.Shutdown();
                this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
                this.executor = null;
            }

            if (this.optimizedAckTask != null)
            {
                this.session.Scheduler.Cancel(this.optimizedAckTask);
            }

            // Roll back duplicates that aren't acknowledged (explicit-ack modes,
            // non-browser consumers only).
            bool explicitAckMode = this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge;
            if (explicitAckMode && !this.info.Browser)
            {
                // Copy under the lock, then iterate the copy outside of it.
                LinkedList<MessageDispatch> deliveredSnapshot;
                using (await this.deliveredMessagesLock.LockAsync().Await())
                {
                    deliveredSnapshot = new LinkedList<MessageDispatch>(this.deliveredMessages);
                }

                foreach (MessageDispatch delivered in deliveredSnapshot)
                {
                    this.session.Connection.RollbackDuplicate(this, delivered.Message);
                }

                deliveredSnapshot.Clear();
            }

            if (!this.session.IsTransacted)
            {
                using (await this.deliveredMessagesLock.LockAsync().Await())
                {
                    deliveredMessages.Clear();
                }
            }

            this.session.RemoveConsumer(this);
            this.unconsumedMessages.Close();

            MessageDispatch[] drained = unconsumedMessages.RemoveAll();
            if (!this.info.Browser)
            {
                for (int index = 0; index < drained.Length; index++)
                {
                    // ensure we don't filter this as a duplicate
                    session.Connection.RollbackDuplicate(this, drained[index].Message);
                }
            }

            if (Tracer.IsDebugEnabled)
            {
                Tracer.DebugFormat("Shutdown of Consumer[{0}] completed.", ConsumerId);
            }
        }