private boolean deliver()

in artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java [2881:3072]


   /**
    * Runs one delivery pass over this queue.
    * <p>
    * The loop walks the consumers; for each consumer it takes the next reference from that consumer's own
    * iterator over {@code messageReferences} and offers it through {@code handle(ref, consumer)}. Depending on
    * the returned {@code HandleStatus} the reference is handed over ({@code HANDLED}), retried later
    * ({@code BUSY}) or skipped for this consumer ({@code NO_MATCH}).
    * <p>
    * All queue state is inspected and mutated while holding the monitor of this {@code QueueImpl};
    * {@code proceedDeliver} is invoked after that monitor has been released.
    * <p>
    * The pass is bounded: once {@code MAX_DELIVERIES_IN_LOOP} references were handled, or the
    * {@code DELIVERY_TIMEOUT} deadline has passed, another run is requested through {@code deliverAsync(true)}
    * and this one returns.
    *
    * @return {@code true} when the loop ended through a {@code break} (no more message references, no more
    *         consumers to try, a grouped delivery that could not proceed, or every consumer produced no
    *         delivery); {@code false} when the method returned early (re-scheduled, queue destroyed, queue
    *         paused or not able to dispatch, {@code null} holder, or closed consumer)
    */
   private boolean deliver() {
      if (logger.isTraceEnabled()) {
         logger.trace("Queue {} doing deliver. messageReferences={} with consumers={}", queueConfiguration.getName(), messageReferences.size(), getConsumerCount());
      }

      // NOTE(review): presumably balances an increment done when this run was scheduled - confirm against deliverAsync
      scheduledRunners.decrementAndGet();

      doInternalPoll();

      // Either the iterator is empty or the consumer is busy
      int noDelivery = 0;

      // track filters not matching, used to track when all consumers can't match, redistribution is then an option
      int numNoMatch = 0;
      int numAttempts = 0;

      // number of references processed in this pass (delivered or expired); compared against MAX_DELIVERIES_IN_LOOP
      int handled = 0;

      // absolute deadline on the System.nanoTime() clock; it is compared by subtraction below so that
      // the check stays correct even if the nanoTime value wraps around
      long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
      consumers.reset();
      while (true) {
         if (handled == MAX_DELIVERIES_IN_LOOP || System.nanoTime() - timeout > 0) {
            // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
            deliverAsync(true);
            return false;
         }

         MessageReference ref;
         // set only when handle() returned HANDLED in this iteration; drives the proceedDeliver call after the lock
         Consumer handledconsumer = null;

         synchronized (QueueImpl.this) {

            if (queueDestroyed) {
               if (messageReferences.isEmpty()) {
                  return false;
               }
               try {
                  removeMessagesWhileDelivering();
               } catch (Exception e) {
                  // best effort: the failure is only logged and the pass still ends
                  logger.warn(e.getMessage(), e);
               }
               return false;
            }

            // Need to do these checks inside the synchronized
            if (isPaused() || !canDispatch()) {
               return false;
            }

            if (messageReferences.isEmpty()) {
               break;
            }

            final ConsumerHolder<? extends Consumer> holder;
            final LinkedListIterator<MessageReference> holderIterator;
            if (consumers.hasNext()) {
               holder = consumers.next();
               if (holder == null) {
                  // this shouldn't happen, however I'm adding this check just in case
                  logger.debug("consumers.next() returned null.");
                  deliverAsync(true);
                  return false;
               }
               // each holder lazily gets its own iterator over messageReferences
               if (holder.iter == null) {
                  holder.iter = messageReferences.iterator();
               }
               holderIterator = holder.iter;
            } else {
               // no consumer left to try in this round
               pruneLastValues();
               break;
            }

            Consumer consumer = holder.consumer;
            // stays null unless getGroupConsumer() returns a consumer for the reference's group id
            Consumer groupConsumer = null;

            // we remove the consumerHolder when the Consumer is closed
            // however the QueueConsumerIterator may hold a reference until the reset is called, which
            // could happen a little later.
            if (consumer.isClosed()) {
               deliverAsync(true);
               return false;
            }

            if (holderIterator.hasNext()) {
               ref = holderIterator.next();
            } else {
               ref = null;
            }

            if (ref == null) {
               // this consumer's iterator has nothing to offer right now
               noDelivery++;
            } else {
               if (checkExpired(ref)) {
                  logger.trace("Reference {} being expired", ref);

                  // an expired reference is removed and counted as handled, then the loop starts over
                  removeMessageReference(holder, ref);
                  handled++;
                  consumers.reset();
                  continue;
               }

               logger.trace("Queue {} is delivering reference {}", queueConfiguration.getName(), ref);

               final SimpleString groupID = extractGroupID(ref);
               groupConsumer = getGroupConsumer(groupID);

               // a consumer returned for the group takes precedence over the consumer picked by the iteration
               if (groupConsumer != null) {
                  consumer = groupConsumer;
               }

               numAttempts++;
               HandleStatus status = handle(ref, consumer);

               if (status == HandleStatus.HANDLED) {

                  // if a message was delivered, any previous negative attempts need to be cleared
                  // this is to avoid breaks on the loop when checking for any other factors.
                  noDelivery = 0;
                  numNoMatch = 0;
                  numAttempts = 0;

                  // handleMessageGroup may hand back a different reference; the returned one is used from here on
                  ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);

                  // NOTE(review): presumably counted down again once the delivery completes - confirm in proceedDeliver
                  deliveriesInTransit.countUp();

                  // for a non-destructive queue the reference is left in messageReferences
                  if (!queueConfiguration.isNonDestructive()) {
                     removeMessageReference(holder, ref);
                  }
                  ref.setInDelivery(true);
                  handledconsumer = consumer;
                  handled++;
                  consumers.reset();
               } else if (status == HandleStatus.BUSY) {
                  try {
                     // NOTE(review): repeat() presumably makes the iterator return this same reference again on
                     // the next call to next() - confirm against LinkedListIterator
                     holderIterator.repeat();
                  } catch (NoSuchElementException e) {
                     // this could happen if there was an exception on the queue handling
                     // and it returned BUSY because of that exception
                     //
                     // We will just log it as there's nothing else we can do now.
                     logger.warn(e.getMessage(), e);
                  }

                  noDelivery++;
                  numNoMatch = 0;
                  numAttempts = 0;
                  // no consumers.reset() b/c we skip this consumer
               } else if (status == HandleStatus.NO_MATCH) {
                  consumers.reset();
                  numNoMatch++;
                  // every attempt resulted in noMatch for number of consumers means we tried all consumers for a single message
                  if (numNoMatch == numAttempts && numAttempts == consumers.size() && redistributor == null) {
                     hasUnMatchedPending = true;
                     // one hit of unmatched message is enough, no need to reset counters
                  }
               }
            }

            if (groupConsumer != null) {
               // the reference was offered to a group consumer: stop the pass if nothing has been delivered,
               // otherwise clear the counter and carry on
               if (noDelivery > 0) {
                  pruneLastValues();
                  break;
               }
               noDelivery = 0;
            } else if (!consumers.hasNext()) {
               // Round robin'd all

               // every consumer of this round ended without a delivery
               if (noDelivery == this.consumers.size()) {
                  pruneLastValues();

                  if (handledconsumer != null) {
                     // this shouldn't really happen,
                     // however I'm keeping this as an assertion in case future developers ever change the logic here on this class
                     ActiveMQServerLogger.LOGGER.nonDeliveryHandled();
                  } else {
                     logger.debug("{}::All the consumers were busy, giving up now", this);
                     break;
                  }
               }

               noDelivery = 0;
            }

         }

         // done after leaving the synchronized block, so the queue monitor is not held during proceedDeliver
         if (handledconsumer != null) {
            proceedDeliver(handledconsumer, ref);
         }
      }

      return true;
   }