private JmsInboundMessageDispatch dequeue()

in qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java [294:378]


    private JmsInboundMessageDispatch dequeue(long timeout, boolean localCheckOnly) throws JMSException {
        boolean pullConsumer = isPullConsumer();
        boolean pullForced = pullConsumer;

        try {
            long deadline = 0;
            if (timeout > 0) {
                deadline = System.currentTimeMillis() + timeout;
            }

            performPullIfRequired(timeout, false);

            while (true) {
                JmsInboundMessageDispatch envelope = null;
                if (pullForced || pullConsumer) {
                    // Any waiting was done by the pull request, try immediate retrieval from the queue.
                    envelope = messageQueue.dequeue(0);
                } else {
                    envelope = messageQueue.dequeue(timeout);
                }

                if (getFailureCause() != null) {
                    LOG.debug("{} receive failed: {}", getConsumerId(), getFailureCause().getMessage());
                    throw JmsExceptionSupport.create(getFailureCause());
                }

                if (envelope == null) {
                    if ((timeout == 0 && (pullForced || localCheckOnly)) || pullConsumer || messageQueue.isClosed()) {
                        return null;
                    } else if (timeout > 0) {
                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                    }

                    if (timeout >= 0 && !localCheckOnly) {
                        // We don't do this for receive with no timeout since it
                        // is redundant: zero-prefetch consumers already pull, and
                        // the rest block indefinitely on the local messageQueue.
                        pullForced = true;
                        if (performPullIfRequired(timeout, true)) {
                            startConsumerResource();
                            // We refresh credit if it is a prefetching consumer, since the
                            // pull drained it. Processing acks can open the credit window, but
                            // not in all cases, and if we didn't get a message it would stay
                            // closed until future pulls were performed.
                        }
                    }

                    continue;
                }

                TraceableMessage facade = envelope.getMessage().getFacade();

                if (consumeExpiredMessage(envelope)) {
                    LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope);
                    doAckExpired(envelope);
                    tracer.syncReceive(facade, address, DeliveryOutcome.EXPIRED);

                    if (timeout > 0) {
                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                    }
                    performPullIfRequired(timeout, false);
                } else if (session.redeliveryExceeded(envelope)) {
                    LOG.debug("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
                    applyRedeliveryPolicyOutcome(envelope);
                    tracer.syncReceive(facade, address, DeliveryOutcome.REDELIVERIES_EXCEEDED);

                    if (timeout > 0) {
                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                    }
                    performPullIfRequired(timeout, false);
                } else {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace(getConsumerId() + " received message: " + envelope);
                    }

                    tracer.syncReceive(facade, address, DeliveryOutcome.DELIVERED);

                    return envelope;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw JmsExceptionSupport.create(e);
        }
    }