void ActiveMQConsumerKernel::dispatch()

in activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp [1616:1696]


void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch) {

    try {

        clearMessagesInProgress();
        clearDeliveredList();

        synchronized(this->internal->unconsumedMessages.get()) {

            if (!this->internal->unconsumedMessages->isClosed()) {

                if (this->consumerInfo->isBrowser() || !session->getConnection()->isDuplicate(this, dispatch->getMessage())) {

                    synchronized(&this->internal->listenerMutex) {

                        if (this->internal->listener != NULL && this->internal->unconsumedMessages->isRunning()) {
                            if (this->internal->redeliveryExceeded(dispatch)) {
                                internal->posionAck(dispatch,
                                                    "dispatch to " + getConsumerId()->toString() +
                                                    " exceeds redelivery policy limit:" +
                                                    Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries()));
                                return;
                            }
                            Pointer<cms::Message> message = createCMSMessage(dispatch);
                            beforeMessageIsConsumed(dispatch);
                            try {
                                bool expired = isConsumerExpiryCheckEnabled() && dispatch->getMessage()->isExpired();
                                if (!expired) {
                                    this->internal->listener->onMessage(message.get());
                                }
                                afterMessageIsConsumed(dispatch, expired);
                            } catch (RuntimeException& e) {
                                dispatch->setRollbackCause(e);
                                if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session->isIndividualAcknowledge()) {
                                    // Schedule redelivery and possible DLQ processing
                                    rollback();
                                } else {
                                    // Transacted or Client ack: Deliver the next message.
                                    afterMessageIsConsumed(dispatch, false);
                                }
                            }
                        } else {
                            if (!this->internal->unconsumedMessages->isRunning()) {
                                // delayed redelivery, ensure it can be re delivered
                                session->getConnection()->rollbackDuplicate(this, dispatch->getMessage());
                            }
                            this->internal->unconsumedMessages->enqueue(dispatch);
                            if (this->internal->messageAvailableListener != NULL) {
                                this->internal->messageAvailableListener->onMessageAvailable(this);
                            }
                        }
                    }
                } else {
                    // deal with duplicate delivery
                    if (this->internal->redeliveryExpectedInCurrentTransaction(dispatch, true)) {
                        if (this->internal->transactedIndividualAck) {
                            immediateIndividualTransactedAck(dispatch);
                        } else {
                            Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED, 1));
                            internal->session->sendAck(ack);
                        }
                    } else if ((internal->redeliveryPendingInCompetingTransaction(dispatch))) {
                        internal->session->getConnection()->rollbackDuplicate(this, dispatch->getMessage());
                        this->dispatch(dispatch);
                    } else {
                        internal->posionAck(dispatch,
                            std::string("Suppressing duplicate delivery on connection, consumer ") + getConsumerId()->toString());
                    }
                }
            }

            if (++internal->dispatchedCount % 1000 == 0) {
                internal->dispatchedCount = 0;
                Thread::yield();
            }
        }
    }
    AMQ_CATCH_RETHROW(ActiveMQException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
    AMQ_CATCHALL_THROW(ActiveMQException)
}