in activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java [1672:1794]
/**
 * Runs one dispatch pass for this queue while holding {@code iteratingMutex}.
 *
 * <p>In order, a pass:
 * <ol>
 *   <li>runs the operations parked in {@code messagesWaitingForSpace} for as long as
 *       {@code memoryUsage} is not full;</li>
 *   <li>on the first pass only ({@code firstConsumer}), optionally delays dispatch according to
 *       {@code consumersBeforeDispatchStarts} / {@code timeBeforeDispatchStarts};</li>
 *   <li>pages in messages when there is stored or pending work, a browser is attached, or the
 *       pending list reports no redeliveries;</li>
 *   <li>feeds the currently paged-in messages to every registered queue browser;</li>
 *   <li>consumes one pending wakeup.</li>
 * </ol>
 *
 * <p>The logging MDC entry and the {@code iterationRunning} flag are always reset before the
 * method exits, including when an unchecked exception escapes the pass.
 *
 * @return {@code true} if {@code pendingWakeups} is still positive after this pass,
 *         {@code false} otherwise
 */
public boolean iterate() {
    MDC.put("activemq.destination", getName());
    boolean pageInMoreMessages = false;
    synchronized (iteratingMutex) {
        // If optimize dispatch is on or this is a slave this method could be called recursively
        // we set this state value to short-circuit wakeup in those cases to avoid that as it
        // could lead to errors.
        iterationRunning = true;
        // Remembers an interrupt received during the start-up delay. The status is restored on
        // exit rather than immediately so the remainder of this pass is not disturbed by it.
        boolean interrupted = false;
        try {
            // do early to allow dispatch of these waiting messages
            synchronized (messagesWaitingForSpace) {
                Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
                while (it.hasNext()) {
                    if (!memoryUsage.isFull()) {
                        Runnable op = it.next();
                        it.remove();
                        op.run();
                    } else {
                        // still no room: ask to be notified once space frees up and stop draining
                        registerCallbackForNotFullNotification();
                        break;
                    }
                }
            }

            if (firstConsumer) {
                firstConsumer = false;
                try {
                    if (consumersBeforeDispatchStarts > 0) {
                        int timeout = 1000; // wait one second by default if
                                            // consumer count isn't reached
                        if (timeBeforeDispatchStarts > 0) {
                            timeout = timeBeforeDispatchStarts;
                        }
                        if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
                            LOG.debug("{} consumers subscribed. Starting dispatch.", consumers.size());
                        } else {
                            LOG.debug("{} ms elapsed and {} consumers subscribed. Starting dispatch.", timeout, consumers.size());
                        }
                    }
                    if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
                        // pure time-based delay; waiting on the mutex releases it for the duration
                        iteratingMutex.wait(timeBeforeDispatchStarts);
                        LOG.debug("{} ms elapsed. Starting dispatch.", timeBeforeDispatchStarts);
                    }
                } catch (InterruptedException e) {
                    // dispatch still starts, but the interrupt must not be lost
                    interrupted = true;
                    LOG.error(e.toString(), e);
                } catch (Exception e) {
                    // best effort: a failed start-up delay must not prevent dispatch
                    LOG.error(e.toString(), e);
                }
            }

            messagesLock.readLock().lock();
            try {
                pageInMoreMessages |= !messages.isEmpty();
            } finally {
                messagesLock.readLock().unlock();
            }

            // Read both facts about the pending list under the same read lock.
            boolean hasRedeliveries;
            pagedInPendingDispatchLock.readLock().lock();
            try {
                pageInMoreMessages |= !dispatchPendingList.isEmpty();
                hasRedeliveries = dispatchPendingList.hasRedeliveries();
            } finally {
                pagedInPendingDispatchLock.readLock().unlock();
            }

            boolean hasBrowsers = !browserSubscriptions.isEmpty();
            if (pageInMoreMessages || hasBrowsers || !hasRedeliveries) {
                try {
                    pageInMessages(hasBrowsers && getMaxBrowsePageSize() > 0, getMaxPageSize());
                } catch (Throwable e) {
                    LOG.error("Failed to page in more queue messages ", e);
                }
            }

            if (hasBrowsers) {
                // snapshot the paged-in messages so browsers are served without holding the lock
                PendingList messagesInMemory = isPrioritizedMessages() ?
                        new PrioritizedPendingList() : new OrderedPendingList();
                pagedInMessagesLock.readLock().lock();
                try {
                    messagesInMemory.addAll(pagedInMessages);
                } finally {
                    pagedInMessagesLock.readLock().unlock();
                }

                Iterator<QueueBrowserSubscription> browsers = browserSubscriptions.iterator();
                while (browsers.hasNext()) {
                    QueueBrowserSubscription browser = browsers.next();
                    try {
                        MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
                        msgContext.setDestination(destination);
                        LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size());
                        boolean added = false;
                        for (MessageReference node : messagesInMemory) {
                            if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
                                msgContext.setMessageReference(node);
                                if (browser.matches(node, msgContext)) {
                                    browser.add(node);
                                    added = true;
                                }
                            }
                        }
                        // are we done browsing? no new messages paged
                        if (!added || browser.atMax()) {
                            browser.decrementQueueRef();
                            browsers.remove();
                        } else {
                            wakeup();
                        }
                    } catch (Exception e) {
                        LOG.warn("exception on dispatch to browser: {}", browser, e);
                    }
                }
            }

            // this pass accounts for one of the requested wakeups
            if (pendingWakeups.get() > 0) {
                pendingWakeups.decrementAndGet();
            }
        } finally {
            // Always undo the per-pass state, even if the pass failed part-way, so the MDC entry
            // does not leak onto the thread and iterationRunning does not stay set.
            MDC.remove("activemq.destination");
            iterationRunning = false;
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return pendingWakeups.get() > 0;
    }
}