in activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp [1616:1696]
// Processes one MessageDispatch handed to this consumer.
//
// After calling clearMessagesInProgress() and clearDeliveredList(), all work is
// done while holding the lock on the unconsumed-messages channel:
//
//   * If the channel is closed the dispatch is ignored.
//   * If the message is flagged as a duplicate (browsers are never checked), it
//     is either acked immediately, re-dispatched after rollbackDuplicate(), or
//     poison-acked, depending on the transaction-related checks below.
//   * Otherwise, under the listener mutex, the message is delivered straight to
//     the registered listener when one is set and the channel is running; if
//     not, it is enqueued and the message-available listener (if any) is told.
//
// Every 1000th pass through the locked section resets the dispatch counter and
// yields the thread. Note that the early return taken when the redelivery limit
// is exceeded leaves the function before that counter is touched.
//
// NOTE(review): the trailing AMQ_CATCH_* macros presumably rethrow / convert
// whatever escapes into ActiveMQException - confirm against their definitions.
void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch) {

    try {

        clearMessagesInProgress();
        clearDeliveredList();

        synchronized(this->internal->unconsumedMessages.get()) {

            if (!this->internal->unconsumedMessages->isClosed()) {

                // The connection's duplicate check is consulted only for
                // non-browser consumers; browsers always take the normal path.
                const bool duplicate = !this->consumerInfo->isBrowser() &&
                    session->getConnection()->isDuplicate(this, dispatch->getMessage());

                if (duplicate) {

                    // deal with duplicate delivery
                    if (this->internal->redeliveryExpectedInCurrentTransaction(dispatch, true)) {

                        if (!this->internal->transactedIndividualAck) {
                            Pointer<MessageAck> ack(
                                new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED, 1));
                            internal->session->sendAck(ack);
                        } else {
                            immediateIndividualTransactedAck(dispatch);
                        }

                    } else if (this->internal->redeliveryPendingInCompetingTransaction(dispatch)) {

                        // Undo the duplicate marking, then run the dispatch again
                        // from the top for the same message.
                        internal->session->getConnection()->rollbackDuplicate(this, dispatch->getMessage());
                        this->dispatch(dispatch);

                    } else {

                        this->internal->posionAck(dispatch,
                            std::string("Suppressing duplicate delivery on connection, consumer ") + getConsumerId()->toString());
                    }

                } else {

                    synchronized(&this->internal->listenerMutex) {

                        const bool deliverNow = this->internal->listener != NULL &&
                                                this->internal->unconsumedMessages->isRunning();

                        if (!deliverNow) {

                            if (!this->internal->unconsumedMessages->isRunning()) {
                                // delayed redelivery, ensure it can be redelivered
                                session->getConnection()->rollbackDuplicate(this, dispatch->getMessage());
                            }

                            this->internal->unconsumedMessages->enqueue(dispatch);

                            if (this->internal->messageAvailableListener != NULL) {
                                this->internal->messageAvailableListener->onMessageAvailable(this);
                            }

                        } else {

                            if (this->internal->redeliveryExceeded(dispatch)) {
                                const std::string reason =
                                    "dispatch to " + getConsumerId()->toString() +
                                    " exceeds redelivery policy limit:" +
                                    Integer::toString(this->internal->redeliveryPolicy->getMaximumRedeliveries());
                                this->internal->posionAck(dispatch, reason);
                                return;
                            }

                            Pointer<cms::Message> message = createCMSMessage(dispatch);
                            beforeMessageIsConsumed(dispatch);

                            try {

                                // Expired messages are not shown to the listener but
                                // still go through the after-consumed step.
                                const bool expired = isConsumerExpiryCheckEnabled() &&
                                                     dispatch->getMessage()->isExpired();
                                if (!expired) {
                                    this->internal->listener->onMessage(message.get());
                                }
                                afterMessageIsConsumed(dispatch, expired);

                            } catch (RuntimeException& e) {

                                dispatch->setRollbackCause(e);

                                const bool autoOrIndividualAck = isAutoAcknowledgeBatch() ||
                                                                 isAutoAcknowledgeEach() ||
                                                                 session->isIndividualAcknowledge();

                                if (autoOrIndividualAck) {
                                    // Schedule redelivery and possible DLQ processing
                                    rollback();
                                } else {
                                    // Transacted or Client ack: Deliver the next message.
                                    afterMessageIsConsumed(dispatch, false);
                                }
                            }
                        }
                    }
                }
            }

            // Periodically give other threads a chance to run.
            if (++this->internal->dispatchedCount % 1000 == 0) {
                this->internal->dispatchedCount = 0;
                Thread::yield();
            }
        }
    }
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}