in activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java [1228:1370]
public void rollback() throws JMSException {
clearDeliveredList();
synchronized (unconsumedMessages.getMutex()) {
if (optimizeAcknowledge) {
// remove messages read but not acked at the broker yet through
// optimizeAcknowledge
if (!this.info.isBrowser()) {
synchronized(deliveredMessages) {
for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
// ensure we don't filter this as a duplicate
MessageDispatch md = deliveredMessages.removeLast();
session.connection.rollbackDuplicate(this, md.getMessage());
}
}
}
}
synchronized(deliveredMessages) {
rollbackPreviouslyDeliveredAndNotRedelivered();
if (deliveredMessages.isEmpty()) {
return;
}
MessageDispatch lastMd = deliveredMessages.getFirst();
MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
MessageDispatch md = iter.next();
md.getMessage().onMessageRolledBack();
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, md.getMessage());
}
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
// We need to NACK the messages so that they get sent to the
// DLQ.
// Acknowledge the last message.
MessageAck ack = new MessageAck(lastMd, MessageAck.POISON_ACK_TYPE, deliveredMessages.size());
ack.setFirstMessageId(firstMsgId);
ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter() + "] exceeds redelivery policy limit:" + redeliveryPolicy
+ ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
session.sendAck(ack,true);
// Adjust the window size.
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
deliveredCounter -= deliveredMessages.size();
deliveredMessages.clear();
} else {
// Find what redelivery delay to use, based on the redelivery count of last message.
// Current redelivery count is already increased at this point
final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
// Iterating based on redelivery count to find delay to use.
// NOTE: One less than current redelivery count, to use initial delay for first redelivery.
for (int i = 0; i < (currentRedeliveryCount-1); i++) {
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
}
LOG.debug("Redelivery delay calculated for redelivery count {}: {}, for messageId '{}'.", currentRedeliveryCount, redeliveryDelay, lastMd.getMessage().getMessageId());
// only redelivery_ack after first delivery
if (currentRedeliveryCount > 0) {
MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
ack.setFirstMessageId(firstMsgId);
session.sendAck(ack,true);
}
final LinkedList<MessageDispatch> pendingSessionRedelivery =
new LinkedList<MessageDispatch>(deliveredMessages);
captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery(false);
deliveredCounter -= deliveredMessages.size();
deliveredMessages.clear();
if (!unconsumedMessages.isClosed()) {
if (nonBlockingRedelivery) {
Collections.reverse(pendingSessionRedelivery);
// Start up the delivery again a little later.
session.getScheduler().executeAfterDelay(new Runnable() {
@Override
public void run() {
try {
if (!unconsumedMessages.isClosed()) {
for(MessageDispatch dispatch : pendingSessionRedelivery) {
session.dispatch(dispatch);
}
}
} catch (Exception e) {
session.connection.onAsyncException(e);
}
}
}, redeliveryDelay);
} else {
// stop the delivery of messages.
unconsumedMessages.stop();
final ActiveMQMessageConsumer dispatcher = this;
Runnable redispatchWork = new Runnable() {
@Override
public void run() {
try {
if (!unconsumedMessages.isClosed()) {
synchronized (unconsumedMessages.getMutex()) {
for (MessageDispatch md : pendingSessionRedelivery) {
unconsumedMessages.enqueueFirst(md);
}
if (messageListener.get() != null) {
session.redispatch(dispatcher, unconsumedMessages);
}
}
if (started.get()) {
start();
}
}
} catch (JMSException e) {
session.connection.onAsyncException(e);
}
}
};
if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
// Start up the delivery again a little later.
session.getScheduler().executeAfterDelay(redispatchWork, redeliveryDelay);
} else {
redispatchWork.run();
}
}
} else {
for (MessageDispatch md : pendingSessionRedelivery) {
session.connection.rollbackDuplicate(this, md.getMessage());
}
}
}
}
}
}