public void add()

in activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java [103:208]


    public void add(MessageReference node) throws Exception {
        if (isDuplicate(node)) {
            return;
        }
        // Lets use an indirect reference so that we can associate a unique
        // locator /w the message.
        node = new IndirectMessageReference(node.getMessage());
        getSubscriptionStatistics().getEnqueues().increment();
        synchronized (matchedListMutex) {
            // if this subscriber is already discarding a message, we don't want to add
            // any more messages to it as those messages can only be advisories generated in the process,
            // which can trigger the recursive call loop
            if (discarding) return;

            if (!isFull() && matched.isEmpty()) {
                // if maximumPendingMessages is set we will only discard messages which
                // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
                dispatch(node);
                setSlowConsumer(false);
            } else {
                if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
                    // Slow consumers should log and set their state as such.
                    if (!isSlowConsumer()) {
                        String remoteAddr = null;
                        if (context != null && context.getConnection() != null) {
                            remoteAddr = context.getConnection().getRemoteAddress();
                        }
                        LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow{}", toString(), (remoteAddr != null) ? ": " + remoteAddr : "");
                        setSlowConsumer(true);
                        for (Destination dest: destinations) {
                            dest.slowConsumer(getContext(), this);
                        }
                    }
                }
                if (maximumPendingMessages != 0) {
                    boolean warnedAboutWait = false;
                    while (active) {
                        while (matched.isFull()) {
                            if (getContext().getStopping().get()) {
                                LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId());
                                getSubscriptionStatistics().getEnqueues().decrement();
                                return;
                            }
                            if (!warnedAboutWait) {
                                LOG.info("{}: Pending message cursor [{}] is full, temp usage ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
                                        toString(),
                                        matched,
                                        matched.getSystemUsage().getTempUsage().getPercentUsage(),
                                        matched.getSystemUsage().getMemoryUsage().getPercentUsage());
                                warnedAboutWait = true;
                            }
                            matchedListMutex.wait(20);
                        }
                        // Temporary storage could be full - so just try to add the message
                        // see https://issues.apache.org/activemq/browse/AMQ-2475
                        if (matched.tryAddMessageLast(node, 10)) {
                            break;
                        }
                    }
                    if (maximumPendingMessages > 0) {
                        // calculate the high water mark from which point we
                        // will eagerly evict expired messages
                        int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
                        if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
                            max = maximumPendingMessages;
                        }
                        if (!matched.isEmpty() && matched.size() > max) {
                            removeExpiredMessages();
                        }
                        // lets discard old messages as we are a slow consumer
                        while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
                            int pageInSize = matched.size() - maximumPendingMessages;
                            // only page in a 1000 at a time - else we could blow the memory
                            pageInSize = Math.max(1000, pageInSize);
                            LinkedList<MessageReference> list = null;
                            MessageReference[] oldMessages=null;
                            synchronized(matched){
                                list = matched.pageInList(pageInSize);
                                oldMessages = messageEvictionStrategy.evictMessages(list);
                                for (MessageReference ref : list) {
                                    ref.decrementReferenceCount();
                                }
                            }
                            int messagesToEvict = 0;
                            if (oldMessages != null){
                                messagesToEvict = oldMessages.length;
                                for (int i = 0; i < messagesToEvict; i++) {
                                    MessageReference oldMessage = oldMessages[i];
                                    //Expired here is false as we are discarding due to the messageEvictingStrategy
                                    discard(oldMessage, false);
                                }
                            }
                            // lets avoid an infinite loop if we are given a bad eviction strategy
                            // for a bad strategy lets just not evict
                            if (messagesToEvict == 0) {
                                LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates",
                                        destination, messageEvictionStrategy, list.size());
                                break;
                            }
                        }
                    }
                    dispatchMatched();
                }
            }
        }
    }