in activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java [2049:2163]
/**
 * Moves up to {@code maxPageSize} message references from the {@code messages} cursor into
 * {@code pagedInMessages} and returns the ones that were newly added.
 *
 * <p>Paging only happens when there is something to page in and either {@code force} is set, or
 * there is a real consumer and the pending dispatch list is still below {@code maxPageSize}.
 * References already present in {@code pagedInMessages} are treated as duplicates: they are
 * dropped, removed from the store when they reached it, and sent to the dead letter queue.
 *
 * @param force          page in regardless of consumers / pending dispatch size, and ignore the
 *                       lazy-dispatch limit
 * @param processExpired when {@code true}, expired references are handled here and not returned
 * @param maxPageSize    upper bound on the number of references paged in by this call
 * @return the newly paged-in references; never {@code null} (an empty list when nothing was paged in)
 * @throws Exception if a cursor, store or broker call fails
 */
private PendingList doPageInForDispatch(boolean force, boolean processExpired, int maxPageSize) throws Exception {
    // Never try to page in more than the cursor currently holds.
    int pageInLimit;
    messagesLock.readLock().lock();
    try {
        pageInLimit = Math.min(maxPageSize, messages.size());
    } finally {
        messagesLock.readLock().unlock();
    }

    // Snapshot of how many references are already waiting for dispatch.
    int pendingDispatchCount;
    pagedInPendingDispatchLock.readLock().lock();
    try {
        pendingDispatchCount = dispatchPendingList.size();
    } finally {
        pagedInPendingDispatchLock.readLock().unlock();
    }

    if (isLazyDispatch() && !force) {
        // Lazy dispatch: limit paging to what consumers can take right now.
        pageInLimit = Math.min(pageInLimit, getConsumerMessageCountBeforeFull());
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("{} toPageIn: {}, force:{}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}, maxPageSize:{}",
                this,
                pageInLimit,
                force,
                destinationStatistics.getInflight().getCount(),
                pagedInMessages.size(),
                pendingDispatchCount,
                destinationStatistics.getEnqueues().getCount(),
                destinationStatistics.getDequeues().getCount(),
                getMemoryUsage().getUsage(),
                maxPageSize);
    }

    final boolean shouldPageIn = pageInLimit > 0
            && (force || (haveRealConsumer() && pendingDispatchCount < maxPageSize));
    if (!shouldPageIn) {
        // Never hand back null; callers get an empty list instead.
        return new OrderedPendingList();
    }

    // Pull candidate references off the cursor while holding the cursor write lock.
    final List<QueueMessageReference> candidates = new ArrayList<QueueMessageReference>(pageInLimit);
    messagesLock.writeLock().lock();
    try {
        try {
            messages.setMaxBatchSize(pageInLimit);
            messages.reset();
            while (candidates.size() < pageInLimit && messages.hasNext()) {
                MessageReference cursorEntry = messages.next();
                messages.remove();
                QueueMessageReference candidate = createMessageReference(cursorEntry.getMessage());
                if (!processExpired || !candidate.isExpired()) {
                    candidates.add(candidate);
                    continue;
                }
                // Expired reference: it does not count towards the page-in limit.
                if (broker.isExpired(candidate)) {
                    messageExpired(createConnectionContext(), candidate);
                } else {
                    candidate.decrementReferenceCount();
                }
            }
        } finally {
            messages.release();
        }
    } finally {
        messagesLock.writeLock().unlock();
    }

    if (candidates.isEmpty()) {
        if (!messages.hasSpace()) {
            if (isFlowControlLogRequired()) {
                LOG.warn("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage());
            } else {
                LOG.debug("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage());
            }
        }
        return new OrderedPendingList();
    }

    // Register only references that are not already paged in, so nothing is dispatched twice.
    PendingList newlyPagedIn;
    pagedInMessagesLock.writeLock().lock();
    try {
        newlyPagedIn = isPrioritizedMessages() ? new PrioritizedPendingList() : new OrderedPendingList();
        for (QueueMessageReference candidate : candidates) {
            if (!pagedInMessages.contains(candidate)) {
                pagedInMessages.addMessageLast(candidate);
                newlyPagedIn.addMessageLast(candidate);
                continue;
            }

            candidate.decrementReferenceCount();
            // The store should have trapped the duplicate in its index, the cursor audit should
            // have trapped the insert, or the producerBrokerExchange should have suppressed the send.
            // Note: the jdbc store will not trap unacked messages as duplicates because it gives
            // each message a unique sequence id.
            LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, candidate.getMessageId(), candidate.getMessage().getMessageId().getFutureOrSequenceLong());
            if (store != null) {
                ConnectionContext dlqContext = createConnectionContext();
                dropMessage(dlqContext, candidate);
                if (gotToTheStore(candidate.getMessage())) {
                    LOG.debug("Duplicate message {} from cursor, removing from store", candidate.getMessage());
                    store.removeMessage(dlqContext, new MessageAck(candidate.getMessage(), MessageAck.POISON_ACK_TYPE, 1));
                }
                broker.getRoot().sendToDeadLetterQueue(dlqContext, candidate.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination));
            }
        }
    } finally {
        pagedInMessagesLock.writeLock().unlock();
    }
    return newlyPagedIn;
}