void ActiveMQConnection::close()

in activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp [684:808]


void ActiveMQConnection::close() {

    try {

        if (this->isClosed()) {
            return;
        }

        Exception ex;
        bool hasException = false;

        // If we are running lets stop first.
        if (!this->transportFailed.get()) {
            try {
                this->stop();
            } catch (cms::CMSException& error) {
                if (!hasException) {
                    ex = ActiveMQException(error.clone());
                    hasException = true;
                }
            }
        }

        // Indicates we are on the way out to suppress any exceptions getting
        // passed on from the transport as it goes down.
        this->closing.set(true);

        if (this->config->scheduler != NULL) {
            try {
                this->config->scheduler->stop();
            } catch (Exception& error) {
                if (!hasException) {
                    ex = error;
                    ex.setMark(__FILE__, __LINE__);
                    hasException = true;
                }
            }
        }

        long long lastDeliveredSequenceId = -1;

        // Get the complete list of active sessions.
        try {
            this->config->sessionsLock.writeLock().lock();

            // We need to use a copy since we aren't able to use CopyOnWriteArrayList
            ArrayList<Pointer<ActiveMQSessionKernel> > sessions(this->config->activeSessions);
            std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(sessions.iterator());

            // Dispose of all the Session resources we know are still open.
            while (iter->hasNext()) {
                Pointer<ActiveMQSessionKernel> session = iter->next();
                try {
                    session->dispose();
                    lastDeliveredSequenceId = Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
                } catch (cms::CMSException& ex) {
                }
            }

            this->config->activeSessions.clear();
            this->config->sessionsLock.writeLock().unlock();
        } catch (Exception& error) {
            this->config->sessionsLock.writeLock().unlock();
            if (!hasException) {
                ex = error;
                ex.setMark(__FILE__, __LINE__);
                hasException = true;
            }
        }

        // As TemporaryQueue and TemporaryTopic instances are bound to a connection
        // we should just delete them after the connection is closed to free up memory
        if (this->config->advisoryConsumer != NULL) {
            this->config->advisoryConsumer->dispose();
        }

        ArrayList<Pointer<ActiveMQTempDestination> > tempDests(this->config->activeTempDestinations.values());
        Pointer<Iterator<Pointer<ActiveMQTempDestination> > > iterator(tempDests.iterator());

        try {
            while (iterator->hasNext()) {
                Pointer<ActiveMQTempDestination> dest = iterator->next();
                dest->close();
            }
        } catch (cms::CMSException& error) {
            if (!hasException) {
                ex = ActiveMQException(error.clone());
                hasException = true;
            }
        }

        try {
            if (this->config->executor != NULL) {
                this->config->executor->shutdown();
            }
        } catch (Exception& error) {
            if (!hasException) {
                ex = error;
                ex.setMark(__FILE__, __LINE__);
                hasException = true;
            }
        }

        // Now inform the Broker we are shutting down.
        try {
            this->disconnect(lastDeliveredSequenceId);
        } catch (Exception& error) {
            if (!hasException) {
                ex = error;
                ex.setMark(__FILE__, __LINE__);
                hasException = true;
            }
        }

        // Once current deliveries are done this stops the delivery
        // of any new messages.
        this->started.set(false);
        this->closed.set(true);

        if (hasException) {
            throw ex;
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}