private PendingList doPageInForDispatch()

in activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java [2049:2163]


    /**
     * Moves up to {@code maxPageSize} references out of the {@code messages} cursor,
     * registers the ones not already present in {@code pagedInMessages}, and returns
     * those newly registered references.
     *
     * <p>Locks are taken one at a time and released before the next is acquired:
     * {@code messagesLock} (read), {@code pagedInPendingDispatchLock} (read),
     * {@code messagesLock} (write) and finally {@code pagedInMessagesLock} (write).
     *
     * @param force          when {@code true}, page in even if {@code haveRealConsumer()} is
     *                       false or the pending-dispatch list already holds
     *                       {@code maxPageSize} entries; also skips the lazy-dispatch cap
     * @param processExpired when {@code true}, references reporting {@code isExpired()} are
     *                       not returned: they are passed to {@code messageExpired} if
     *                       {@code broker.isExpired(ref)} agrees, otherwise only their
     *                       reference count is decremented
     * @param maxPageSize    upper bound on the number of references paged in, and the
     *                       pending-dispatch size below which non-forced paging is allowed
     * @return the references added to {@code pagedInMessages} by this call; never
     *         {@code null} (an empty {@code OrderedPendingList} when nothing was paged in)
     * @throws Exception propagated unchanged from the cursor, store and broker calls made here
     */
    private PendingList doPageInForDispatch(boolean force, boolean processExpired, int maxPageSize) throws Exception {
        // Never request more than the cursor currently reports.
        int toPageIn;
        messagesLock.readLock().lock();
        try {
            toPageIn = Math.min(maxPageSize, messages.size());
        } finally {
            messagesLock.readLock().unlock();
        }

        // Snapshot of how much is already waiting for dispatch.
        final int pagedInPendingSize;
        pagedInPendingDispatchLock.readLock().lock();
        try {
            pagedInPendingSize = dispatchPendingList.size();
        } finally {
            pagedInPendingDispatchLock.readLock().unlock();
        }

        if (isLazyDispatch() && !force) {
            // Lazy dispatch: cap the batch at what consumers can accept right now.
            toPageIn = Math.min(toPageIn, getConsumerMessageCountBeforeFull());
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} toPageIn: {}, force:{}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}, maxPageSize:{}",
                    this,
                    toPageIn,
                    force,
                    destinationStatistics.getInflight().getCount(),
                    pagedInMessages.size(),
                    pagedInPendingSize,
                    destinationStatistics.getEnqueues().getCount(),
                    destinationStatistics.getDequeues().getCount(),
                    getMemoryUsage().getUsage(),
                    maxPageSize);
        }

        final boolean shouldPageIn =
                toPageIn > 0 && (force || (haveRealConsumer() && pagedInPendingSize < maxPageSize));
        if (!shouldPageIn) {
            // Callers always get a list back, never null.
            return new OrderedPendingList();
        }

        // Phase 1: drain the cursor under its write lock.
        final List<QueueMessageReference> pagedRefs = new ArrayList<QueueMessageReference>(toPageIn);
        messagesLock.writeLock().lock();
        try {
            messages.setMaxBatchSize(toPageIn);
            messages.reset();
            // Expired references do not count towards the batch, so the size of
            // pagedRefs is the number of usable references collected so far.
            while (pagedRefs.size() < toPageIn && messages.hasNext()) {
                final MessageReference cursorNode = messages.next();
                messages.remove();

                final QueueMessageReference candidate = createMessageReference(cursorNode.getMessage());
                final boolean expired = processExpired && candidate.isExpired();
                if (!expired) {
                    pagedRefs.add(candidate);
                    continue;
                }
                if (broker.isExpired(candidate)) {
                    messageExpired(createConnectionContext(), candidate);
                } else {
                    candidate.decrementReferenceCount();
                }
            }
        } finally {
            // Release the cursor first, then the lock; the lock is released even if release() throws.
            try {
                messages.release();
            } finally {
                messagesLock.writeLock().unlock();
            }
        }

        if (pagedRefs.isEmpty()) {
            if (!messages.hasSpace()) {
                if (isFlowControlLogRequired()) {
                    LOG.warn("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage());
                } else {
                    LOG.debug("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage());
                }
            }
            return new OrderedPendingList();
        }

        // Phase 2: register only references that are not already paged in, so that
        // nothing is handed to dispatch twice.
        final PendingList newlyPagedIn;
        pagedInMessagesLock.writeLock().lock();
        try {
            newlyPagedIn = isPrioritizedMessages() ? new PrioritizedPendingList() : new OrderedPendingList();
            for (QueueMessageReference ref : pagedRefs) {
                if (!pagedInMessages.contains(ref)) {
                    pagedInMessages.addMessageLast(ref);
                    newlyPagedIn.addMessageLast(ref);
                    continue;
                }

                // Duplicate: the store index, the cursor audit, or the
                // producerBrokerExchange should have caught this earlier.
                // Note: the jdbc store will not trap unacked messages as duplicates
                // because it gives each message a unique sequence id.
                ref.decrementReferenceCount();
                LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong());
                if (store == null) {
                    continue;
                }
                final ConnectionContext connectionContext = createConnectionContext();
                dropMessage(connectionContext, ref);
                if (gotToTheStore(ref.getMessage())) {
                    LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
                    store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1));
                }
                broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination));
            }
        } finally {
            pagedInMessagesLock.writeLock().unlock();
        }

        return newlyPagedIn;
    }