public boolean iterate()

in activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java [1672:1794]


    public boolean iterate() {
        MDC.put("activemq.destination", getName());
        boolean pageInMoreMessages = false;
        synchronized (iteratingMutex) {

            // If optimize dispatch is on or this is a slave this method could be called recursively
            // we set this state value to short-circuit wakeup in those cases to avoid that as it
            // could lead to errors.
            iterationRunning = true;

            // do early to allow dispatch of these waiting messages
            synchronized (messagesWaitingForSpace) {
                Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
                while (it.hasNext()) {
                    if (!memoryUsage.isFull()) {
                        Runnable op = it.next();
                        it.remove();
                        op.run();
                    } else {
                        registerCallbackForNotFullNotification();
                        break;
                    }
                }
            }

            if (firstConsumer) {
                firstConsumer = false;
                try {
                    if (consumersBeforeDispatchStarts > 0) {
                        int timeout = 1000; // wait one second by default if
                                            // consumer count isn't reached
                        if (timeBeforeDispatchStarts > 0) {
                            timeout = timeBeforeDispatchStarts;
                        }
                        if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
                            LOG.debug("{} consumers subscribed. Starting dispatch.", consumers.size());
                        } else {
                            LOG.debug("{} ms elapsed and {} consumers subscribed. Starting dispatch.", timeout, consumers.size());
                        }
                    }
                    if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
                        iteratingMutex.wait(timeBeforeDispatchStarts);
                        LOG.debug("{} ms elapsed. Starting dispatch.", timeBeforeDispatchStarts);
                    }
                } catch (Exception e) {
                    LOG.error(e.toString());
                }
            }

            messagesLock.readLock().lock();
            try{
                pageInMoreMessages |= !messages.isEmpty();
            } finally {
                messagesLock.readLock().unlock();
            }

            pagedInPendingDispatchLock.readLock().lock();
            try {
                pageInMoreMessages |= !dispatchPendingList.isEmpty();
            } finally {
                pagedInPendingDispatchLock.readLock().unlock();
            }

            boolean hasBrowsers = !browserSubscriptions.isEmpty();

            if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) {
                try {
                    pageInMessages(hasBrowsers && getMaxBrowsePageSize() > 0, getMaxPageSize());
                } catch (Throwable e) {
                    LOG.error("Failed to page in more queue messages ", e);
                }
            }

            if (hasBrowsers) {
                PendingList messagesInMemory = isPrioritizedMessages() ?
                        new PrioritizedPendingList() : new OrderedPendingList();
                pagedInMessagesLock.readLock().lock();
                try {
                    messagesInMemory.addAll(pagedInMessages);
                } finally {
                    pagedInMessagesLock.readLock().unlock();
                }

                Iterator<QueueBrowserSubscription> browsers = browserSubscriptions.iterator();
                while (browsers.hasNext()) {
                    QueueBrowserSubscription browser = browsers.next();
                    try {
                        MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
                        msgContext.setDestination(destination);

                        LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size());
                        boolean added = false;
                        for (MessageReference node : messagesInMemory) {
                            if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
                                msgContext.setMessageReference(node);
                                if (browser.matches(node, msgContext)) {
                                    browser.add(node);
                                    added = true;
                                }
                            }
                        }
                        // are we done browsing? no new messages paged
                        if (!added || browser.atMax()) {
                            browser.decrementQueueRef();
                            browsers.remove();
                        } else {
                            wakeup();
                        }
                    } catch (Exception e) {
                        LOG.warn("exception on dispatch to browser: {}", browser, e);
                    }
                }
            }

            if (pendingWakeups.get() > 0) {
                pendingWakeups.decrementAndGet();
            }
            MDC.remove("activemq.destination");
            iterationRunning = false;

            return pendingWakeups.get() > 0;
        }
    }