in artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java [3073:3265]
private boolean deliver() {
if (logger.isTraceEnabled()) {
logger.trace("Queue {} doing deliver. messageReferences={} with consumers={}", name, messageReferences.size(), getConsumerCount());
}
scheduledRunners.decrementAndGet();
doInternalPoll();
// Either the iterator is empty or the consumer is busy
int noDelivery = 0;
// track filters not matching, used to track when all consumers can't match, redistribution is then an option
int numNoMatch = 0;
int numAttempts = 0;
int handled = 0;
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
consumers.reset();
while (true) {
if (handled == MAX_DELIVERIES_IN_LOOP || System.nanoTime() - timeout > 0) {
// Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
deliverAsync(true);
return false;
}
MessageReference ref;
Consumer handledconsumer = null;
synchronized (QueueImpl.this) {
if (queueDestroyed) {
if (messageReferences.size() == 0) {
return false;
}
try {
removeMessagesWhileDelivering();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
return false;
}
// Need to do these checks inside the synchronized
if (isPaused() || !canDispatch()) {
return false;
}
if (messageReferences.size() == 0) {
break;
}
final ConsumerHolder<? extends Consumer> holder;
final LinkedListIterator<MessageReference> holderIterator;
if (consumers.hasNext()) {
holder = consumers.next();
if (holder == null) {
// this shouldn't happen, however I'm adding this check just in case
logger.debug("consumers.next() returned null.");
consumers.remove();
deliverAsync(true);
return false;
}
if (holder.iter == null) {
holder.iter = messageReferences.iterator();
}
holderIterator = holder.iter;
} else {
pruneLastValues();
break;
}
Consumer consumer = holder.consumer;
Consumer groupConsumer = null;
// we remove the consumerHolder when the Consumer is closed
// however the QueueConsumerIterator may hold a reference until the reset is called, which
// could happen a little later.
if (consumer.isClosed()) {
deliverAsync(true);
return false;
}
if (holderIterator.hasNext()) {
ref = holderIterator.next();
} else {
ref = null;
}
if (ref == null) {
noDelivery++;
} else {
if (checkExpired(ref)) {
logger.trace("Reference {} being expired", ref);
removeMessageReference(holder, ref);
handled++;
consumers.reset();
continue;
}
logger.trace("Queue {} is delivering reference {}", name, ref);
final SimpleString groupID = extractGroupID(ref);
groupConsumer = getGroupConsumer(groupID);
if (groupConsumer != null) {
consumer = groupConsumer;
}
numAttempts++;
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
// if a message was delivered, any previous negative attempts need to be cleared
// this is to avoid breaks on the loop when checking for any other factors.
noDelivery = 0;
numNoMatch = 0;
numAttempts = 0;
ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);
deliveriesInTransit.countUp();
if (!nonDestructive) {
removeMessageReference(holder, ref);
}
ref.setInDelivery(true);
handledconsumer = consumer;
handled++;
consumers.reset();
} else if (status == HandleStatus.BUSY) {
try {
holderIterator.repeat();
} catch (NoSuchElementException e) {
// this could happen if there was an exception on the queue handling
// and it returned BUSY because of that exception
//
// We will just log it as there's nothing else we can do now.
logger.warn(e.getMessage(), e);
}
noDelivery++;
numNoMatch = 0;
numAttempts = 0;
// no consumers.reset() b/c we skip this consumer
} else if (status == HandleStatus.NO_MATCH) {
consumers.reset();
numNoMatch++;
// every attempt resulted in noMatch for number of consumers means we tried all consumers for a single message
if (numNoMatch == numAttempts && numAttempts == consumers.size() && redistributor == null) {
hasUnMatchedPending = true;
// one hit of unmatched message is enough, no need to reset counters
}
}
}
if (groupConsumer != null) {
if (noDelivery > 0) {
pruneLastValues();
break;
}
noDelivery = 0;
} else if (!consumers.hasNext()) {
// Round robin'd all
if (noDelivery == this.consumers.size()) {
pruneLastValues();
if (handledconsumer != null) {
// this shouldn't really happen,
// however I'm keeping this as an assertion case future developers ever change the logic here on this class
ActiveMQServerLogger.LOGGER.nonDeliveryHandled();
} else {
logger.debug("{}::All the consumers were busy, giving up now", this);
break;
}
}
noDelivery = 0;
}
}
if (handledconsumer != null) {
proceedDeliver(handledconsumer, ref);
}
}
return true;
}