void ActiveMQConsumerKernel::dispose()

in activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp [908:1002]


void ActiveMQConsumerKernel::dispose() {

    try {

        if (!this->isClosed()) {

            if (!session->isTransacted()) {
                deliverAcks();
                if (isAutoAcknowledgeBatch()) {
                    acknowledge();
                }
            }

            this->internal->started.set(false);

            if (this->internal->executor != NULL) {
                this->internal->executor->shutdown();
                this->internal->executor->awaitTermination(60, TimeUnit::SECONDS);
                this->internal->executor.reset(NULL);
            }

            if (this->internal->optimizedAckTask != NULL) {
                this->session->getScheduler()->cancel(this->internal->optimizedAckTask);
                this->internal->optimizedAckTask = NULL;
            }

            if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {
                if (!this->consumerInfo->isBrowser()) {
                    // roll back duplicates that aren't acknowledged
                    ArrayList< Pointer<MessageDispatch> > tmp;
                    synchronized(&this->internal->deliveredMessages) {
                        tmp.copy(this->internal->deliveredMessages);
                    }
                    Pointer< Iterator<Pointer<MessageDispatch> > > iter(tmp.iterator());
                    while (iter->hasNext()) {
                        Pointer<MessageDispatch> msg = iter->next();
                        this->session->getConnection()->rollbackDuplicate(this, msg->getMessage());
                    }
                    tmp.clear();
                }
            }

            // Identifies any errors encountered during shutdown.
            bool haveException = false;
            ActiveMQException error;

            if (!this->internal->session->isTransacted()) {
                // For IndividualAck Mode we need to unlink the ack handler to remove a
                // cyclic reference to the MessageDispatch that brought the message to us.
                synchronized(&internal->deliveredMessages) {
                    if (this->session->isIndividualAcknowledge()) {
                        std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(this->internal->deliveredMessages.iterator());
                        while (iter->hasNext()) {
                            iter->next()->getMessage()->setAckHandler(Pointer<ActiveMQAckHandler>());
                        }
                    }
                    this->internal->deliveredMessages.clear();
                }
            }

            // Stop and Wakeup all sync consumers.
            this->internal->unconsumedMessages->close();

            // Remove this Consumer from the Connections set of Dispatchers
            Pointer<ActiveMQConsumerKernel> consumer(this);
            try {
                this->session->removeConsumer(consumer);
            } catch (Exception& e) {
                consumer.release();
                throw;
            }
            consumer.release();

            // Ensure these are filtered as duplicates.
            std::vector< Pointer<MessageDispatch> > list = this->internal->unconsumedMessages->removeAll();
            if (!this->consumerInfo->isBrowser()) {
                std::vector< Pointer<MessageDispatch> >::const_iterator iter = list.begin();

                for (; iter != list.end(); ++iter) {
                    Pointer<MessageDispatch> md = *iter;
                    this->session->getConnection()->rollbackDuplicate(this, md->getMessage());
                }
            }

            // If we encountered an error, propagate it.
            if (haveException) {
                error.setMark(__FILE__, __LINE__);
                throw error;
            }
        }
    }
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}