in activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp [908:1002]
void ActiveMQConsumerKernel::dispose() {
try {
if (!this->isClosed()) {
if (!session->isTransacted()) {
deliverAcks();
if (isAutoAcknowledgeBatch()) {
acknowledge();
}
}
this->internal->started.set(false);
if (this->internal->executor != NULL) {
this->internal->executor->shutdown();
this->internal->executor->awaitTermination(60, TimeUnit::SECONDS);
this->internal->executor.reset(NULL);
}
if (this->internal->optimizedAckTask != NULL) {
this->session->getScheduler()->cancel(this->internal->optimizedAckTask);
this->internal->optimizedAckTask = NULL;
}
if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {
if (!this->consumerInfo->isBrowser()) {
// roll back duplicates that aren't acknowledged
ArrayList< Pointer<MessageDispatch> > tmp;
synchronized(&this->internal->deliveredMessages) {
tmp.copy(this->internal->deliveredMessages);
}
Pointer< Iterator<Pointer<MessageDispatch> > > iter(tmp.iterator());
while (iter->hasNext()) {
Pointer<MessageDispatch> msg = iter->next();
this->session->getConnection()->rollbackDuplicate(this, msg->getMessage());
}
tmp.clear();
}
}
// Identifies any errors encountered during shutdown.
bool haveException = false;
ActiveMQException error;
if (!this->internal->session->isTransacted()) {
// For IndividualAck Mode we need to unlink the ack handler to remove a
// cyclic reference to the MessageDispatch that brought the message to us.
synchronized(&internal->deliveredMessages) {
if (this->session->isIndividualAcknowledge()) {
std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(this->internal->deliveredMessages.iterator());
while (iter->hasNext()) {
iter->next()->getMessage()->setAckHandler(Pointer<ActiveMQAckHandler>());
}
}
this->internal->deliveredMessages.clear();
}
}
// Stop and Wakeup all sync consumers.
this->internal->unconsumedMessages->close();
// Remove this Consumer from the Connections set of Dispatchers
Pointer<ActiveMQConsumerKernel> consumer(this);
try {
this->session->removeConsumer(consumer);
} catch (Exception& e) {
consumer.release();
throw;
}
consumer.release();
// Ensure these are filtered as duplicates.
std::vector< Pointer<MessageDispatch> > list = this->internal->unconsumedMessages->removeAll();
if (!this->consumerInfo->isBrowser()) {
std::vector< Pointer<MessageDispatch> >::const_iterator iter = list.begin();
for (; iter != list.end(); ++iter) {
Pointer<MessageDispatch> md = *iter;
this->session->getConnection()->rollbackDuplicate(this, md->getMessage());
}
}
// If we encountered an error, propagate it.
if (haveException) {
error.setMark(__FILE__, __LINE__);
throw error;
}
}
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
AMQ_CATCHALL_THROW(ActiveMQException)
}