in activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp [684:808]
void ActiveMQConnection::close() {
try {
if (this->isClosed()) {
return;
}
Exception ex;
bool hasException = false;
// If we are running lets stop first.
if (!this->transportFailed.get()) {
try {
this->stop();
} catch (cms::CMSException& error) {
if (!hasException) {
ex = ActiveMQException(error.clone());
hasException = true;
}
}
}
// Indicates we are on the way out to suppress any exceptions getting
// passed on from the transport as it goes down.
this->closing.set(true);
if (this->config->scheduler != NULL) {
try {
this->config->scheduler->stop();
} catch (Exception& error) {
if (!hasException) {
ex = error;
ex.setMark(__FILE__, __LINE__);
hasException = true;
}
}
}
long long lastDeliveredSequenceId = -1;
// Get the complete list of active sessions.
try {
this->config->sessionsLock.writeLock().lock();
// We need to use a copy since we aren't able to use CopyOnWriteArrayList
ArrayList<Pointer<ActiveMQSessionKernel> > sessions(this->config->activeSessions);
std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(sessions.iterator());
// Dispose of all the Session resources we know are still open.
while (iter->hasNext()) {
Pointer<ActiveMQSessionKernel> session = iter->next();
try {
session->dispose();
lastDeliveredSequenceId = Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
} catch (cms::CMSException& ex) {
}
}
this->config->activeSessions.clear();
this->config->sessionsLock.writeLock().unlock();
} catch (Exception& error) {
this->config->sessionsLock.writeLock().unlock();
if (!hasException) {
ex = error;
ex.setMark(__FILE__, __LINE__);
hasException = true;
}
}
// As TemporaryQueue and TemporaryTopic instances are bound to a connection
// we should just delete them after the connection is closed to free up memory
if (this->config->advisoryConsumer != NULL) {
this->config->advisoryConsumer->dispose();
}
ArrayList<Pointer<ActiveMQTempDestination> > tempDests(this->config->activeTempDestinations.values());
Pointer<Iterator<Pointer<ActiveMQTempDestination> > > iterator(tempDests.iterator());
try {
while (iterator->hasNext()) {
Pointer<ActiveMQTempDestination> dest = iterator->next();
dest->close();
}
} catch (cms::CMSException& error) {
if (!hasException) {
ex = ActiveMQException(error.clone());
hasException = true;
}
}
try {
if (this->config->executor != NULL) {
this->config->executor->shutdown();
}
} catch (Exception& error) {
if (!hasException) {
ex = error;
ex.setMark(__FILE__, __LINE__);
hasException = true;
}
}
// Now inform the Broker we are shutting down.
try {
this->disconnect(lastDeliveredSequenceId);
} catch (Exception& error) {
if (!hasException) {
ex = error;
ex.setMark(__FILE__, __LINE__);
hasException = true;
}
}
// Once current deliveries are done this stops the delivery
// of any new messages.
this->started.set(false);
this->closed.set(true);
if (hasException) {
throw ex;
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}