in activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp [1490:1613]
void ActiveMQConsumerKernel::rollback() {
clearDeliveredList();
synchronized(this->internal->unconsumedMessages.get()) {
if (this->internal->optimizeAcknowledge) {
// remove messages read but not acknowledged at the broker yet through optimizeAcknowledge
if (!this->consumerInfo->isBrowser()) {
synchronized(&this->internal->deliveredMessages) {
for (int i = 0; (i < this->internal->deliveredMessages.size()) &&
(i < this->internal->ackCounter); i++) {
// ensure we don't filter this as a duplicate
Pointer<MessageDispatch> md = this->internal->deliveredMessages.removeLast();
session->getConnection()->rollbackDuplicate(this, md->getMessage());
}
}
}
}
synchronized(&this->internal->deliveredMessages) {
this->internal->rollbackPreviouslyDeliveredAndNotRedelivered();
if (this->internal->deliveredMessages.isEmpty()) {
return;
}
// Only increase the redelivery delay after the first redelivery..
Pointer<MessageDispatch> lastMsg = this->internal->deliveredMessages.getFirst();
const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
if (currentRedeliveryCount > 0) {
this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay(internal->redeliveryDelay);
} else {
this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
}
Pointer<MessageId> firstMsgId = this->internal->deliveredMessages.getLast()->getMessage()->getMessageId();
Pointer<Iterator<Pointer<MessageDispatch> > > iter(internal->deliveredMessages.iterator());
while (iter->hasNext()) {
Pointer<Message> message = iter->next()->getMessage();
message->setRedeliveryCounter(message->getRedeliveryCounter() + 1);
// ensure we don't filter this as a duplicate
session->getConnection()->rollbackDuplicate(this, message);
}
if (this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries()) {
// We need to NACK the messages so that they get sent to the DLQ.
// Acknowledge the last message.
Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_POISON,
this->internal->deliveredMessages.size()));
ack->setFirstMessageId(firstMsgId);
std::string message = "Exceeded RedeliveryPolicy max redelivery limit: " +
Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries());
if (!lastMsg->getRollbackCause().getMessage().empty()) {
message.append(" cause: Exception -> ");
message.append(lastMsg->getRollbackCause().getMessage());
}
ack->setPoisonCause(internal->createBrokerError(message));
session->sendAck(ack, true);
// Adjust the window size.
this->internal->additionalWindowSize = Math::max(0,
this->internal->additionalWindowSize - (int) this->internal->deliveredMessages.size());
this->internal->redeliveryDelay = 0;
this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
this->internal->deliveredMessages.clear();
} else {
// only redelivery_ack after first delivery
if (currentRedeliveryCount > 0) {
Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_REDELIVERED,
this->internal->deliveredMessages.size()));
ack->setFirstMessageId(firstMsgId);
session->sendAck(ack);
}
if (this->internal->nonBlockingRedelivery) {
if (!this->internal->unconsumedMessages->isClosed()) {
Pointer<ActiveMQConsumerKernel> self =
this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
NonBlockingRedeliveryTask* redeliveryTask =
new NonBlockingRedeliveryTask(session, self, this->internal);
this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
this->internal->deliveredMessages.clear();
this->session->getScheduler()->executeAfterDelay(
redeliveryTask, this->internal->redeliveryDelay);
}
} else {
// stop the delivery of messages.
this->internal->unconsumedMessages->stop();
std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(
this->internal->deliveredMessages.iterator());
while (iter->hasNext()) {
this->internal->unconsumedMessages->enqueueFirst(iter->next());
}
this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
this->internal->deliveredMessages.clear();
if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
Pointer<ActiveMQConsumerKernel> self =
this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
this->internal->scheduler->executeAfterDelay(
new StartConsumerTask(self, session), internal->redeliveryDelay);
} else {
start();
}
}
}
}
}
if (this->internal->listener != NULL) {
session->redispatch(*this->internal->unconsumedMessages);
}
}