public final void acknowledge()

in activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java [193:396]


    public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
        // Handle the standard acknowledgment case.
        boolean callDispatchMatched = false;
        Destination destination = null;

        if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
            // suppress unexpected ack exception in this expected case
            LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack);
            return;
        }

        LOG.trace("ack: {}", ack);

        synchronized(dispatchLock) {
            if (ack.isStandardAck()) {
                // First check if the ack matches the dispatched. When using failover this might
                // not be the case. We don't ever want to ack the wrong messages.
                assertAckMatchesDispatched(ack);

                // Acknowledge all dispatched messages up till the message id of
                // the acknowledgment.
                boolean inAckRange = false;
                List<MessageReference> removeList = new ArrayList<MessageReference>();
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        // Don't remove the nodes until we are committed.
                        if (!context.isInTransaction()) {
                            getSubscriptionStatistics().getDequeues().increment();
                            removeList.add(node);
                            contractPrefetchExtension(1);
                        } else {
                            registerRemoveSync(context, node);
                        }
                        acknowledge(context, ack, node);
                        if (ack.getLastMessageId().equals(messageId)) {
                            destination = (Destination) node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                for (final MessageReference node : removeList) {
                    dispatched.remove(node);
                    decrementPrefetchCounter(node);
                }
                // this only happens after a reconnect - get an ack which is not
                // valid
                if (!callDispatchMatched) {
                    LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack);
                }
            } else if (ack.isIndividualAck()) {
                // Message was delivered and acknowledge - but only delete the
                // individual message
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getLastMessageId().equals(messageId)) {
                        // Don't remove the nodes until we are committed - immediateAck option
                        if (!context.isInTransaction()) {
                            getSubscriptionStatistics().getDequeues().increment();
                            dispatched.remove(node);
                            decrementPrefetchCounter(node);
                            contractPrefetchExtension(1);
                        } else {
                            registerRemoveSync(context, node);
                            expandPrefetchExtension(1);
                        }
                        acknowledge(context, ack, node);
                        destination = (Destination) node.getRegionDestination();
                        callDispatchMatched = true;
                        break;
                    }
                }
            } else if (ack.isDeliveredAck()) {
                // Message was delivered but not acknowledged: update pre-fetch
                // counters.
                int index = 0;
                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                    final MessageReference node = iter.next();
                    Destination nodeDest = (Destination) node.getRegionDestination();
                    if (ack.getLastMessageId().equals(node.getMessageId())) {
                        expandPrefetchExtension(ack.getMessageCount());
                        destination = nodeDest;
                        callDispatchMatched = true;
                        break;
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isExpiredAck()) {
                // Message was expired
                int index = 0;
                boolean inAckRange = false;
                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                    final MessageReference node = iter.next();
                    Destination nodeDest = (Destination) node.getRegionDestination();
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        if (broker.isExpired(node)) {
                            processExpiredAck(context, nodeDest, node);
                        }
                        iter.remove();
                        decrementPrefetchCounter(node);

                        if (ack.getLastMessageId().equals(messageId)) {
                            contractPrefetchExtension(1);
                            destination = (Destination) node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate expiration acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isRedeliveredAck()) {
                // Message was re-delivered but it was not yet considered to be
                // a DLQ message.
                boolean inAckRange = false;
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        if (ack.getLastMessageId().equals(messageId)) {
                            destination = (Destination) node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isPoisonAck()) {
                // TODO: what if the message is already in a DLQ???
                // Handle the poison ACK case: we need to send the message to a
                // DLQ
                if (ack.isInTransaction()) {
                    throw new JMSException("Poison ack cannot be transacted: "
                            + ack);
                }
                int index = 0;
                boolean inAckRange = false;
                List<MessageReference> removeList = new ArrayList<MessageReference>();
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        sendToDLQ(context, node, ack.getPoisonCause());
                        Destination nodeDest = (Destination) node.getRegionDestination();
                        removeList.add(node);
                        getSubscriptionStatistics().getDequeues().increment();
                        index++;
                        acknowledge(context, ack, node);
                        if (ack.getLastMessageId().equals(messageId)) {
                            contractPrefetchExtension(1);
                            destination = nodeDest;
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                for (final MessageReference node : removeList) {
                    dispatched.remove(node);
                    decrementPrefetchCounter(node);
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            }
        }
        if (callDispatchMatched && destination != null) {
            destination.wakeup();
            dispatchPending();

            if (pending.isEmpty()) {
                wakeupDestinationsForDispatch();
            }
        } else {
            LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
        }
    }