private ClientMessage receive()

in artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java [201:371]


   private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws ActiveMQException {
      if (logger.isTraceEnabled()) {
         logger.trace("{}::receive({}, {})", this, timeout, forcingDelivery);
      }

      checkClosed();

      if (largeMessageReceived != null) {
         if (logger.isTraceEnabled()) {
            logger.trace("{}::receive({}, {}) -> discard LargeMessage body for {}", this, timeout, forcingDelivery, largeMessageReceived);
         }
         // Check if there are pending packets to be received
         largeMessageReceived.discardBody();
         largeMessageReceived = null;
      }

      if (rateLimiter != null) {
         rateLimiter.limit();
      }

      if (handler != null) {
         if (logger.isTraceEnabled()) {
            logger.trace("{}::receive({}, {}) -> throwing messageHandlerSet", this, timeout, forcingDelivery);
         }
         throw ActiveMQClientMessageBundle.BUNDLE.messageHandlerSet();
      }

      if (clientWindowSize == 0) {
         if (logger.isTraceEnabled()) {
            logger.trace("{}::receive({}, {}) -> start slowConsumer", this, timeout, forcingDelivery);
         }
         startSlowConsumer();
      }

      receiverThread = Thread.currentThread();

      // To verify if deliveryForced was already call
      boolean deliveryForced = false;
      // To control when to call deliveryForce
      boolean callForceDelivery = false;

      long start = -1;

      long toWait = timeout == 0 ? Long.MAX_VALUE : timeout;

      try {
         while (true) {
            ClientMessageInternal m = null;

            synchronized (this) {
               while ((stopped || (m = buffer.poll()) == null) && !closed && toWait > 0) {
                  if (start == -1) {
                     start = System.currentTimeMillis();
                  }

                  if (m == null && forcingDelivery) {
                     if (stopped) {
                        break;
                     }

                     // we only force delivery once per call to receive
                     if (!deliveryForced) {
                        callForceDelivery = true;
                        break;
                     }
                  }

                  try {
                     wait(toWait);
                  } catch (InterruptedException e) {
                     throw new ActiveMQInterruptedException(e);
                  }

                  if (m != null || closed) {
                     break;
                  }

                  long now = System.currentTimeMillis();

                  toWait -= now - start;

                  start = now;
               }
            }

            if (failedOver) {
               if (m == null) {
                  if (logger.isTraceEnabled()) {
                     logger.trace("{}::receive({}, {}) -> m == null and failover", this, timeout, forcingDelivery);
                  }

                  // if failed over and the buffer is null, we reset the state and try it again
                  failedOver = false;
                  deliveryForced = false;
                  toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
                  continue;
               } else {
                  if (logger.isTraceEnabled()) {
                     logger.trace("{}::receive({}, {}) -> failedOver, but m != null, being {}", this, timeout, forcingDelivery, m);
                  }
                  failedOver = false;
               }
            }

            if (callForceDelivery) {
               logger.trace("{}::Forcing delivery", this);
               // Calling forceDelivery outside of the lock to avoid distributed dead locks
               sessionContext.forceDelivery(this, forceDeliveryCount.getAndIncrement());
               callForceDelivery = false;
               deliveryForced = true;
               continue;
            }

            if (m != null) {
               session.workDone();

               if (m.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
                  long seq = m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE);

                  // Need to check if forceDelivery was called at this call
                  // As we could be receiving a message that came from a previous call
                  if (forcingDelivery && deliveryForced && seq == forceDeliveryCount.get() - 1) {
                     // forced delivery messages are discarded, nothing has been delivered by the queue
                     resetIfSlowConsumer();
                     logger.trace("{}::There was nothing on the queue, leaving it now:: returning null", this);

                     return null;
                  } else {
                     logger.trace("{}::Ignored force delivery answer as it belonged to another call", this);
                     // Ignore the message
                     continue;
                  }
               }
               // if we have already pre acked we can't expire
               boolean expired = m.isExpired();

               flowControlBeforeConsumption(m);

               if (expired) {
                  m.discardBody();

                  session.expire(this, m);

                  if (clientWindowSize == 0) {
                     startSlowConsumer();
                  }

                  if (toWait > 0) {
                     continue;
                  } else {
                     return null;
                  }
               }

               if (m.isLargeMessage()) {
                  largeMessageReceived = m;
               }

               logger.trace("{}::Returning {}", this, m);

               return m;
            } else {
               logger.trace("{}::Returning null", this);
               resetIfSlowConsumer();
               return null;
            }
         }
      } finally {
         receiverThread = null;
      }
   }