public void removeSubscription()

in activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java [497:613]


    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
            throws Exception {
        super.removeSubscription(context, sub, lastDeliveredSequenceId);
        // synchronize with dispatch method so that no new messages are sent
        // while removing up a subscription.
        pagedInPendingDispatchLock.writeLock().lock();
        try {
            LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}, groups: {}", new Object[]{
                    getActiveMQDestination().getQualifiedName(),
                    sub,
                    lastDeliveredSequenceId,
                    getDestinationStatistics().getDequeues().getCount(),
                    getDestinationStatistics().getDispatched().getCount(),
                    getDestinationStatistics().getInflight().getCount(),
                    sub.getConsumerInfo().getAssignedGroupCount(destination)
            });
            consumersLock.writeLock().lock();
            try {
                removeFromConsumerList(sub);
                if (sub.getConsumerInfo().isExclusive()) {
                    Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
                    if (exclusiveConsumer == sub) {
                        exclusiveConsumer = null;
                        for (Subscription s : consumers) {
                            if (s.getConsumerInfo().isExclusive()
                                    && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer
                                            .getConsumerInfo().getPriority())) {
                                exclusiveConsumer = s;

                            }
                        }
                        dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                    }
                } else if (isAllConsumersExclusiveByDefault()) {
                    Subscription exclusiveConsumer = null;
                    for (Subscription s : consumers) {
                        if (exclusiveConsumer == null
                                || s.getConsumerInfo().getPriority() > exclusiveConsumer
                                .getConsumerInfo().getPriority()) {
                            exclusiveConsumer = s;
                                }
                    }
                    dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                }
                ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
                getMessageGroupOwners().removeConsumer(consumerId);

                // redeliver inflight messages

                boolean markAsRedelivered = false;
                MessageReference lastDeliveredRef = null;
                List<MessageReference> unAckedMessages = sub.remove(context, this);

                // locate last redelivered in unconsumed list (list in delivery rather than seq order)
                if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
                    for (MessageReference ref : unAckedMessages) {
                        if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
                            lastDeliveredRef = ref;
                            markAsRedelivered = true;
                            LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId());
                            break;
                        }
                    }
                }

                for (Iterator<MessageReference> unackedListIterator = unAckedMessages.iterator(); unackedListIterator.hasNext(); ) {
                    MessageReference ref = unackedListIterator.next();
                    // AMQ-5107: don't resend if the broker is shutting down
                    if ( this.brokerService.isStopping() ) {
                        break;
                    }
                    QueueMessageReference qmr = (QueueMessageReference) ref;
                    if (qmr.getLockOwner() == sub) {
                        qmr.unlock();

                        // have no delivery information
                        if (lastDeliveredSequenceId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
                            qmr.incrementRedeliveryCounter();
                        } else {
                            if (markAsRedelivered) {
                                qmr.incrementRedeliveryCounter();
                            }
                            if (ref == lastDeliveredRef) {
                                // all that follow were not redelivered
                                markAsRedelivered = false;
                            }
                        }
                    }
                    if (qmr.isDropped()) {
                        unackedListIterator.remove();
                    }
                }
                dispatchPendingList.addForRedelivery(unAckedMessages, strictOrderDispatch && consumers.isEmpty());
                if (sub instanceof QueueBrowserSubscription) {
                    ((QueueBrowserSubscription)sub).decrementQueueRef();
                    browserSubscriptions.remove(sub);
                }
                // AMQ-5107: don't resend if the broker is shutting down
                if (dispatchPendingList.hasRedeliveries() && (! this.brokerService.isStopping())) {
                    doDispatch(new OrderedPendingList());
                }
            } finally {
                consumersLock.writeLock().unlock();
            }
            if (!this.optimizedDispatch) {
                wakeup();
            }
        } finally {
            pagedInPendingDispatchLock.writeLock().unlock();
        }
        if (this.optimizedDispatch) {
            // Outside of dispatchLock() to maintain the lock hierarchy of
            // iteratingMutex -> dispatchLock. - see
            // https://issues.apache.org/activemq/browse/AMQ-1878
            wakeup();
        }
    }