in activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java [103:208]
/**
 * Offers a message to this subscription, either dispatching it straight away
 * or buffering it in the {@code matched} pending cursor.
 * <p>
 * Flow, as implemented below:
 * <ol>
 * <li>Messages reported by {@code isDuplicate(node)} are ignored.</li>
 * <li>The message is re-wrapped in a fresh {@link IndirectMessageReference} and the
 * enqueue counter is incremented.</li>
 * <li>Under {@code matchedListMutex}: if the subscription is not full and nothing is
 * pending, the message is dispatched directly. Otherwise slow-consumer detection runs
 * and, when {@code maximumPendingMessages != 0}, the message is appended to
 * {@code matched} (waiting for space if the cursor is full), surplus messages are
 * evicted when {@code maximumPendingMessages > 0}, and {@code dispatchMatched()} is
 * called.</li>
 * </ol>
 * When {@code maximumPendingMessages == 0} and the direct-dispatch condition does not
 * hold, this method neither dispatches nor buffers the message.
 *
 * @param node reference to the message being offered; only {@code node.getMessage()}
 *             is retained, via a new indirect reference
 * @throws Exception propagated from the dispatch, cursor, eviction and discard calls
 *                   made here, including {@link InterruptedException} from the timed
 *                   wait on {@code matchedListMutex}
 */
public void add(MessageReference node) throws Exception {
if (isDuplicate(node)) {
return;
}
// Let's use an indirect reference so that we can associate a unique
// locator with the message.
node = new IndirectMessageReference(node.getMessage());
getSubscriptionStatistics().getEnqueues().increment();
synchronized (matchedListMutex) {
// if this subscriber is already discarding a message, we don't want to add
// any more messages to it as those messages can only be advisories generated in the process,
// which can trigger the recursive call loop
// NOTE(review): the enqueue counter incremented above is not decremented on this
// early return, unlike the stopping path further down - confirm this is intended.
if (discarding) return;
if (!isFull() && matched.isEmpty()) {
// if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
dispatch(node);
setSlowConsumer(false);
} else {
// Slow-consumer detection: only considered when the prefetch size is above 1 and
// the number of pending (matched) messages exceeds the prefetch size.
if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
// Slow consumers should log and set their state as such.
// The isSlowConsumer() guard means the warning and the destination callbacks
// run only on the transition into the slow state.
if (!isSlowConsumer()) {
// The remote address is optional; it is appended to the log line only when a
// connection is available on the context.
String remoteAddr = null;
if (context != null && context.getConnection() != null) {
remoteAddr = context.getConnection().getRemoteAddress();
}
LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow{}", toString(), (remoteAddr != null) ? ": " + remoteAddr : "");
setSlowConsumer(true);
// Tell every destination in 'destinations' about the slow consumer.
for (Destination dest: destinations) {
dest.slowConsumer(getContext(), this);
}
}
}
// Buffering happens only for a non-zero maximumPendingMessages; with 0 the message
// is not stored by this method.
if (maximumPendingMessages != 0) {
// Ensures the "cursor is full" info message is logged at most once per add() call.
boolean warnedAboutWait = false;
// Retry until the cursor accepts the message or 'active' becomes false.
while (active) {
// Block while the cursor reports it is full. Note that this inner loop does not
// re-check 'active'; it exits when space frees up or the context is stopping.
while (matched.isFull()) {
if (getContext().getStopping().get()) {
// Stopping: give up on this message and undo the enqueue count taken above.
LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId());
getSubscriptionStatistics().getEnqueues().decrement();
return;
}
if (!warnedAboutWait) {
LOG.info("{}: Pending message cursor [{}] is full, temp usage ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
toString(),
matched,
matched.getSystemUsage().getTempUsage().getPercentUsage(),
matched.getSystemUsage().getMemoryUsage().getPercentUsage());
warnedAboutWait = true;
}
// Timed wait (20 ms) releases matchedListMutex so other threads holding work for
// this monitor can proceed; the full-check is then re-evaluated.
matchedListMutex.wait(20);
}
// Temporary storage could be full - so just try to add the message
// see https://issues.apache.org/activemq/browse/AMQ-2475
// The second argument is presumably a wait limit in milliseconds - TODO confirm
// against the cursor implementation. A false result sends us round the loop again.
if (matched.tryAddMessageLast(node, 10)) {
break;
}
}
// Eviction applies only to a positive limit; a negative maximumPendingMessages
// buffers without evicting here.
if (maximumPendingMessages > 0) {
// calculate the high water mark from which point we
// will eagerly evict expired messages
int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
// Use the lower of the strategy's watermark and maximumPendingMessages.
// (The '> 0' test is already guaranteed by the enclosing if.)
if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
max = maximumPendingMessages;
}
if (!matched.isEmpty() && matched.size() > max) {
removeExpiredMessages();
}
// let's discard old messages as we are a slow consumer
while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
// Start from the number of messages over the limit...
int pageInSize = matched.size() - maximumPendingMessages;
// ...then raise the request to at least 1000 candidates.
// NOTE(review): the original comment read "only page in a 1000 at a time - else
// we could blow the memory", but Math.max makes 1000 a lower bound, not a cap.
// Changing it to Math.min would shrink the candidate list handed to the eviction
// strategy, so confirm the intended behaviour before touching this.
pageInSize = Math.max(1000, pageInSize);
LinkedList<MessageReference> list = null;
MessageReference[] oldMessages=null;
// Page in candidates and let the strategy choose victims while holding the
// cursor's own monitor (in addition to matchedListMutex).
synchronized(matched){
list = matched.pageInList(pageInSize);
oldMessages = messageEvictionStrategy.evictMessages(list);
// Presumably balances a reference count taken by pageInList for each entry
// still in the list - TODO confirm against the cursor implementation.
for (MessageReference ref : list) {
ref.decrementReferenceCount();
}
}
int messagesToEvict = 0;
if (oldMessages != null){
messagesToEvict = oldMessages.length;
for (int i = 0; i < messagesToEvict; i++) {
MessageReference oldMessage = oldMessages[i];
//Expired here is false as we are discarding due to the messageEvictingStrategy
discard(oldMessage, false);
}
}
// let's avoid an infinite loop if we are given a bad eviction strategy
// for a bad strategy let's just not evict
if (messagesToEvict == 0) {
LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates",
destination, messageEvictionStrategy, list.size());
break;
}
}
}
// Attempt delivery of whatever is now pending in the cursor.
dispatchMatched();
}
}
}
}