in activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java [497:613]
/**
 * Removes a consumer subscription from this queue and hands the messages it
 * had not acknowledged back to the pending dispatch list.
 *
 * <p>Order of operations:
 * <ol>
 *   <li>delegate to {@code super.removeSubscription};</li>
 *   <li>take the {@code pagedInPendingDispatchLock} write lock, then the
 *       {@code consumersLock} write lock;</li>
 *   <li>remove {@code sub} from the consumer list and, where applicable, choose a
 *       new exclusive consumer for the dispatch selector;</li>
 *   <li>remove the consumer from the message group owners;</li>
 *   <li>collect the un-acked messages via {@code sub.remove(context, this)}, unlock
 *       those whose lock owner is {@code sub}, bump redelivery counters according to
 *       {@code lastDeliveredSequenceId}, and discard dropped references;</li>
 *   <li>queue the remainder for redelivery and dispatch, unless the broker is
 *       stopping;</li>
 *   <li>call {@code wakeup()} - inside the dispatch lock when
 *       {@code optimizedDispatch} is off, after releasing it otherwise.</li>
 * </ol>
 *
 * @param context connection context, passed through to the superclass and to
 *        {@code sub.remove}
 * @param sub the subscription being removed
 * @param lastDeliveredSequenceId broker sequence id of the last message delivered to
 *        the consumer. {@code RemoveInfo.LAST_DELIVERED_UNKNOWN} means there is no
 *        delivery information, so every message locked by {@code sub} has its
 *        redelivery counter incremented. A value greater than
 *        {@code RemoveInfo.LAST_DELIVERED_UNSET} is searched for in the un-acked list;
 *        if found, messages up to and including it are counted as redelivered.
 * @throws Exception propagated from the superclass, from {@code sub.remove} or from
 *         dispatching
 */
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
        throws Exception {
    super.removeSubscription(context, sub, lastDeliveredSequenceId);
    // synchronize with dispatch method so that no new messages are sent
    // while removing a subscription.
    pagedInPendingDispatchLock.writeLock().lock();
    try {
        // Guarded so the counter reads, group-count lookup, boxing and array
        // allocation are skipped while holding the dispatch write lock when
        // debug logging is disabled.
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}, groups: {}", new Object[]{
                    getActiveMQDestination().getQualifiedName(),
                    sub,
                    lastDeliveredSequenceId,
                    getDestinationStatistics().getDequeues().getCount(),
                    getDestinationStatistics().getDispatched().getCount(),
                    getDestinationStatistics().getInflight().getCount(),
                    sub.getConsumerInfo().getAssignedGroupCount(destination)
            });
        }
        consumersLock.writeLock().lock();
        try {
            removeFromConsumerList(sub);
            if (sub.getConsumerInfo().isExclusive()) {
                // Only re-elect when the departing subscription is the one currently
                // holding the exclusive slot; the successor must itself be exclusive.
                if (dispatchSelector.getExclusiveConsumer() == sub) {
                    dispatchSelector.setExclusiveConsumer(selectHighestPriorityConsumer(true));
                }
            } else if (isAllConsumersExclusiveByDefault()) {
                // Every consumer is a candidate; always re-elect from those remaining.
                dispatchSelector.setExclusiveConsumer(selectHighestPriorityConsumer(false));
            }
            ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
            getMessageGroupOwners().removeConsumer(consumerId);

            // redeliver inflight messages
            boolean markAsRedelivered = false;
            MessageReference lastDeliveredRef = null;
            List<MessageReference> unAckedMessages = sub.remove(context, this);

            // locate last redelivered in unconsumed list (list in delivery rather than seq order)
            if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
                for (MessageReference ref : unAckedMessages) {
                    if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
                        lastDeliveredRef = ref;
                        markAsRedelivered = true;
                        LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId());
                        break;
                    }
                }
            }

            for (Iterator<MessageReference> unackedListIterator = unAckedMessages.iterator(); unackedListIterator.hasNext(); ) {
                MessageReference ref = unackedListIterator.next();
                // AMQ-5107: don't resend if the broker is shutting down.
                // References not yet visited stay in the list untouched.
                if (this.brokerService.isStopping()) {
                    break;
                }
                QueueMessageReference qmr = (QueueMessageReference) ref;
                if (qmr.getLockOwner() == sub) {
                    qmr.unlock();

                    if (lastDeliveredSequenceId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
                        // have no delivery information
                        qmr.incrementRedeliveryCounter();
                    } else {
                        if (markAsRedelivered) {
                            qmr.incrementRedeliveryCounter();
                        }
                        if (ref == lastDeliveredRef) {
                            // all that follow were not redelivered
                            markAsRedelivered = false;
                        }
                    }
                }
                if (qmr.isDropped()) {
                    unackedListIterator.remove();
                }
            }
            dispatchPendingList.addForRedelivery(unAckedMessages, strictOrderDispatch && consumers.isEmpty());
            if (sub instanceof QueueBrowserSubscription) {
                ((QueueBrowserSubscription) sub).decrementQueueRef();
                browserSubscriptions.remove(sub);
            }
            // AMQ-5107: don't resend if the broker is shutting down
            if (dispatchPendingList.hasRedeliveries() && (!this.brokerService.isStopping())) {
                doDispatch(new OrderedPendingList());
            }
        } finally {
            consumersLock.writeLock().unlock();
        }
        if (!this.optimizedDispatch) {
            wakeup();
        }
    } finally {
        pagedInPendingDispatchLock.writeLock().unlock();
    }
    if (this.optimizedDispatch) {
        // Outside of dispatchLock() to maintain the lock hierarchy of
        // iteratingMutex -> dispatchLock. - see
        // https://issues.apache.org/activemq/browse/AMQ-1878
        wakeup();
    }
}

/**
 * Returns the remaining consumer with the highest priority, or {@code null} when no
 * candidate exists. On equal priority the consumer encountered first in
 * {@code consumers} wins, because only a strictly greater priority replaces the
 * current choice.
 *
 * <p>Called from {@code removeSubscription} while the {@code consumersLock} write
 * lock is held.
 *
 * @param exclusiveOnly when {@code true}, only consumers whose
 *        {@code ConsumerInfo} is marked exclusive are considered
 * @return the selected subscription, or {@code null} if none qualifies
 */
private Subscription selectHighestPriorityConsumer(boolean exclusiveOnly) {
    Subscription selected = null;
    for (Subscription candidate : consumers) {
        if (exclusiveOnly && !candidate.getConsumerInfo().isExclusive()) {
            continue;
        }
        if (selected == null
                || candidate.getConsumerInfo().getPriority() > selected.getConsumerInfo().getPriority()) {
            selected = candidate;
        }
    }
    return selected;
}