public void rollback()

in activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java [1228:1370]


    /**
     * Rolls back every message this consumer has delivered but not yet settled.
     * <p>
     * The work is done while holding the {@code unconsumedMessages} mutex and then the
     * {@code deliveredMessages} monitor, in that order. In outline:
     * <ol>
     *   <li>When {@code optimizeAcknowledge} is on and this is not a browser, up to
     *       {@code ackCounter} entries are dropped from the tail of {@code deliveredMessages}
     *       and un-registered from duplicate tracking.</li>
     *   <li>Each remaining delivered message is notified via {@code onMessageRolledBack()} and
     *       un-registered from duplicate tracking.</li>
     *   <li>If the redelivery policy has a maximum and the head message's redelivery counter
     *       exceeds it, a single {@code POISON_ACK_TYPE} ack covering the whole batch is sent
     *       and the batch is discarded locally.</li>
     *   <li>Otherwise a redelivery delay is derived from the policy, a
     *       {@code REDELIVERED_ACK_TYPE} ack is sent (only when the counter is above zero), and
     *       the batch is handed back for local redelivery - either through the session
     *       (non-blocking mode) or by re-queuing it at the front of {@code unconsumedMessages}.</li>
     * </ol>
     * If nothing is left to roll back after the preparatory steps the method returns early.
     *
     * @throws JMSException propagated from the session/consumer operations invoked here
     *                      (for example when sending the ack)
     */
    public void rollback() throws JMSException {
        clearDeliveredList();
        synchronized (unconsumedMessages.getMutex()) {
            if (optimizeAcknowledge) {
                // remove messages read but not acked at the broker yet through
                // optimizeAcknowledge
                if (!this.info.isBrowser()) {
                    synchronized(deliveredMessages) {
                        // Capture the size once: the loop body shrinks the list, so re-reading
                        // size() in the condition would end the loop after only about half of
                        // the min(size, ackCounter) entries had been removed.
                        final int deliveredBeforePurge = deliveredMessages.size();
                        for (int i = 0; (i < deliveredBeforePurge) && (i < ackCounter); i++) {
                            // ensure we don't filter this as a duplicate
                            MessageDispatch md = deliveredMessages.removeLast();
                            session.connection.rollbackDuplicate(this, md.getMessage());
                        }
                    }
                }
            }
            synchronized(deliveredMessages) {
                rollbackPreviouslyDeliveredAndNotRedelivered();
                if (deliveredMessages.isEmpty()) {
                    // Nothing delivered, so there is nothing to ack or redeliver.
                    return;
                }

                // lastMd is the head of the list and firstMsgId comes from the tail; together
                // they bound the range covered by the ack built below.
                // NOTE(review): the naming suggests deliveredMessages is kept newest-first -
                // confirm against the code that populates it.
                MessageDispatch lastMd = deliveredMessages.getFirst();
                MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();

                for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
                    MessageDispatch md = iter.next();
                    md.getMessage().onMessageRolledBack();
                    // ensure we don't filter this as a duplicate
                    session.connection.rollbackDuplicate(this, md.getMessage());
                }

                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                    && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
                    // We need to NACK the messages so that they get sent to the
                    // DLQ.
                    // Acknowledge the last message.

                    MessageAck ack = new MessageAck(lastMd, MessageAck.POISON_ACK_TYPE, deliveredMessages.size());
                    ack.setFirstMessageId(firstMsgId);
                    ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter()  + "] exceeds redelivery policy limit:" + redeliveryPolicy
                            + ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
                    session.sendAck(ack,true);
                    // Adjust the window size.
                    additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());

                    // The poisoned batch is no longer tracked locally.
                    deliveredCounter -= deliveredMessages.size();
                    deliveredMessages.clear();

                } else {
                    // Find what redelivery delay to use, based on the redelivery count of last message.
                    // Current redelivery count is already increased at this point
                    final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
                    long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                    // Iterating based on redelivery count to find delay to use.
                    // NOTE: One less than current redelivery count, to use initial delay for first redelivery.
                    for (int i = 0; i < (currentRedeliveryCount-1); i++) {
                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                    }
                    LOG.debug("Redelivery delay calculated for redelivery count {}: {}, for messageId '{}'.", currentRedeliveryCount, redeliveryDelay, lastMd.getMessage().getMessageId());

                    // only redelivery_ack after first delivery
                    if (currentRedeliveryCount > 0) {
                        MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
                        ack.setFirstMessageId(firstMsgId);
                        session.sendAck(ack,true);
                    }

                    // Snapshot the batch before deliveredMessages is cleared; the redelivery
                    // tasks below work from this private copy.
                    final LinkedList<MessageDispatch> pendingSessionRedelivery =
                            new LinkedList<MessageDispatch>(deliveredMessages);

                    captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery(false);

                    deliveredCounter -= deliveredMessages.size();
                    deliveredMessages.clear();

                    if (!unconsumedMessages.isClosed()) {

                        if (nonBlockingRedelivery) {
                            // Flip the snapshot so it is dispatched in the opposite order to
                            // the one it was stored in.
                            Collections.reverse(pendingSessionRedelivery);

                            // Start up the delivery again a little later.
                            session.getScheduler().executeAfterDelay(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        // Re-check: the consumer may have been closed while
                                        // the task was waiting.
                                        if (!unconsumedMessages.isClosed()) {
                                            for(MessageDispatch dispatch : pendingSessionRedelivery) {
                                                session.dispatch(dispatch);
                                            }
                                        }
                                    } catch (Exception e) {
                                        session.connection.onAsyncException(e);
                                    }
                                }
                            }, redeliveryDelay);

                        } else {
                            // stop the delivery of messages.
                            unconsumedMessages.stop();

                            final ActiveMQMessageConsumer dispatcher = this;

                            // Puts the batch back at the front of the unconsumed queue and
                            // resumes delivery; run inline or after the redelivery delay.
                            Runnable redispatchWork = new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        if (!unconsumedMessages.isClosed()) {
                                            synchronized (unconsumedMessages.getMutex()) {
                                                for (MessageDispatch md : pendingSessionRedelivery) {
                                                    unconsumedMessages.enqueueFirst(md);
                                                }

                                                if (messageListener.get() != null) {
                                                    session.redispatch(dispatcher, unconsumedMessages);
                                                }
                                            }
                                            // Only resume if the consumer is still marked started.
                                            if (started.get()) {
                                                start();
                                            }
                                        }
                                    } catch (JMSException e) {
                                        session.connection.onAsyncException(e);
                                    }
                                }
                            };

                            if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
                                // Start up the delivery again a little later.
                                session.getScheduler().executeAfterDelay(redispatchWork, redeliveryDelay);
                            } else {
                                // No delay configured (or already closed): do it on this thread.
                                redispatchWork.run();
                            }
                        }
                    } else {
                        // Consumer already closed: no local redelivery, just make sure these
                        // messages are not treated as duplicates later.
                        for (MessageDispatch md : pendingSessionRedelivery) {
                            session.connection.rollbackDuplicate(this, md.getMessage());
                        }
                    }
                }
            }
        }
    }