void ActiveMQConsumerKernel::rollback()

in activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp [1490:1613]


/**
 * Rolls back the messages currently held in this consumer's delivered list so
 * that they are either redelivered or reported to the broker as poison.
 *
 * Sequence, all performed while holding the unconsumedMessages monitor and
 * (nested inside it) the deliveredMessages monitor:
 *
 *  1. clearDeliveredList() is invoked before any lock is taken.
 *  2. When optimizeAcknowledge is enabled and this consumer is not a browser,
 *     entries are removed from the tail of deliveredMessages and handed to
 *     Connection::rollbackDuplicate() so they are not filtered as duplicates.
 *  3. rollbackPreviouslyDeliveredAndNotRedelivered() runs; if nothing remains
 *     in deliveredMessages the method returns immediately (the listener
 *     redispatch at the end of the method is skipped in that case).
 *  4. redeliveryDelay is set to the policy's initial delay when the head
 *     message has never been redelivered, otherwise to the policy's next delay.
 *  5. Every delivered message has its redelivery counter incremented and is
 *     passed to Connection::rollbackDuplicate().
 *  6. If the policy has a maximum and the head message now exceeds it, a
 *     POISON ack covering the whole delivered list is sent and the list is
 *     dropped. Otherwise a REDELIVERED ack is sent (only when this is not the
 *     first rollback of the head message) and the messages are rescheduled,
 *     either through a NonBlockingRedeliveryTask or by pushing them back onto
 *     unconsumedMessages and (re)starting the consumer, optionally after
 *     redeliveryDelay.
 *  7. After the locks are released, if a listener is set, the session is asked
 *     to redispatch the unconsumed messages.
 */
void ActiveMQConsumerKernel::rollback() {

    clearDeliveredList();
    synchronized(this->internal->unconsumedMessages.get()) {
        if (this->internal->optimizeAcknowledge) {
            // remove messages read but not acknowledged at the broker yet through optimizeAcknowledge
            if (!this->consumerInfo->isBrowser()) {
                synchronized(&this->internal->deliveredMessages) {
                    // NOTE(review): the bound deliveredMessages.size() is re-read each pass and
                    // presumably shrinks with every removeLast() while i grows, so this loop can
                    // stop before ackCounter entries have been removed - confirm that is intended.
                    for (int i = 0; (i < this->internal->deliveredMessages.size()) &&
                                    (i < this->internal->ackCounter); i++) {
                        // ensure we don't filter this as a duplicate
                        Pointer<MessageDispatch> md = this->internal->deliveredMessages.removeLast();
                        session->getConnection()->rollbackDuplicate(this, md->getMessage());
                    }
                }
            }
        }

        synchronized(&this->internal->deliveredMessages) {
            this->internal->rollbackPreviouslyDeliveredAndNotRedelivered();
            if (this->internal->deliveredMessages.isEmpty()) {
                // Nothing left to roll back; note that this also bypasses the
                // listener redispatch at the bottom of the method.
                return;
            }

            // Only increase the redelivery delay after the first redelivery..
            // The head of the list is used as the reference message for the delay
            // decision, the max-redelivery check and both acks below.
            Pointer<MessageDispatch> lastMsg = this->internal->deliveredMessages.getFirst();
            // Snapshot taken before the counters are incremented in the loop below.
            const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
            if (currentRedeliveryCount > 0) {
                this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay(internal->redeliveryDelay);
            } else {
                this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
            }

            // The tail of the list supplies the "first message id" of the ack range.
            Pointer<MessageId> firstMsgId = this->internal->deliveredMessages.getLast()->getMessage()->getMessageId();

            Pointer<Iterator<Pointer<MessageDispatch> > > iter(internal->deliveredMessages.iterator());
            while (iter->hasNext()) {
                Pointer<Message> message = iter->next()->getMessage();
                message->setRedeliveryCounter(message->getRedeliveryCounter() + 1);
                // ensure we don't filter this as a duplicate
                session->getConnection()->rollbackDuplicate(this, message);
            }

            // lastMsg's counter was incremented by the loop above, so this compares the
            // post-rollback count against the policy limit.
            if (this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
                lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries()) {

                // We need to NACK the messages so that they get sent to the DLQ.
                // Acknowledge the last message.
                Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_POISON,
                                        this->internal->deliveredMessages.size()));
                ack->setFirstMessageId(firstMsgId);

                // Build the poison cause text, appending the recorded rollback cause if any.
                std::string message = "Exceeded RedeliveryPolicy max redelivery limit: " +
                                       Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries());
                if (!lastMsg->getRollbackCause().getMessage().empty()) {
                    message.append(" cause: Exception -> ");
                    message.append(lastMsg->getRollbackCause().getMessage());
                }

                ack->setPoisonCause(internal->createBrokerError(message));
                session->sendAck(ack, true);
                // Adjust the window size.
                this->internal->additionalWindowSize = Math::max(0,
                    this->internal->additionalWindowSize - (int) this->internal->deliveredMessages.size());
                this->internal->redeliveryDelay = 0;

                // The poisoned messages are no longer tracked by this consumer.
                this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
                this->internal->deliveredMessages.clear();

            } else {

                // only redelivery_ack after first delivery
                if (currentRedeliveryCount > 0) {
                    Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_REDELIVERED,
                                            this->internal->deliveredMessages.size()));
                    ack->setFirstMessageId(firstMsgId);
                    session->sendAck(ack);
                }

                if (this->internal->nonBlockingRedelivery) {

                    // NOTE(review): when the unconsumed channel is already closed this branch
                    // does nothing, leaving deliveredMessages and deliveredCounter untouched.
                    if (!this->internal->unconsumedMessages->isClosed()) {
                        Pointer<ActiveMQConsumerKernel> self =
                            this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());

                        // The task is constructed before the delivered list is cleared below.
                        // Presumably the scheduler takes ownership of the raw task pointer - confirm.
                        NonBlockingRedeliveryTask* redeliveryTask =
                            new NonBlockingRedeliveryTask(session, self, this->internal);

                        this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
                        this->internal->deliveredMessages.clear();

                        this->session->getScheduler()->executeAfterDelay(
                            redeliveryTask, this->internal->redeliveryDelay);
                    }
                } else {
                    // stop the delivery of messages.
                    this->internal->unconsumedMessages->stop();

                    // Push every delivered message back onto the front of the unconsumed
                    // channel. Held in Pointer<> like the iterator above rather than the
                    // deprecated std::auto_ptr, and named distinctly so it no longer
                    // shadows the outer 'iter'.
                    Pointer<Iterator<Pointer<MessageDispatch> > > requeueIter(
                        this->internal->deliveredMessages.iterator());
                    while (requeueIter->hasNext()) {
                        this->internal->unconsumedMessages->enqueueFirst(requeueIter->next());
                    }

                    this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
                    this->internal->deliveredMessages.clear();

                    // Restart delivery: deferred by redeliveryDelay when one applies and the
                    // channel is still open, otherwise immediately.
                    if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
                        Pointer<ActiveMQConsumerKernel> self =
                            this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
                        this->internal->scheduler->executeAfterDelay(
                            new StartConsumerTask(self, session), internal->redeliveryDelay);
                    } else {
                        start();
                    }
                }
            }
        }
    }

    // Outside both monitors: let the session push the requeued messages back
    // through the registered listener, if there is one.
    if (this->internal->listener != NULL) {
        session->redispatch(*this->internal->unconsumedMessages);
    }
}