private boolean deliverNextPending()

in qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java [764:837]


    private boolean deliverNextPending() {
        if (session.isStarted() && messageQueue.isRunning() && messageListener != null) {
            dispatchLock.lock();
            try {
                JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait();
                if (envelope == null) {
                    return false;
                }

                TraceableMessage facade = envelope.getMessage().getFacade();

                if (consumeExpiredMessage(envelope)) {
                    LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope);
                    doAckExpired(envelope);
                    tracer.asyncDeliveryInit(facade, address);
                    tracer.asyncDeliveryComplete(facade, DeliveryOutcome.EXPIRED, null);
                } else if (session.redeliveryExceeded(envelope)) {
                    LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
                    applyRedeliveryPolicyOutcome(envelope);
                    tracer.asyncDeliveryInit(facade, address);
                    tracer.asyncDeliveryComplete(facade, DeliveryOutcome.REDELIVERIES_EXCEEDED, null);
                } else {
                    final JmsMessage copy;

                    boolean deliveryFailed = false;
                    boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||
                                              acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
                    if (autoAckOrDupsOk) {
                        copy = copy(doAckDelivered(envelope));
                    } else {
                        copy = copy(ackFromReceive(envelope));
                    }
                    session.clearSessionRecovered();

                    try {
                        tracer.asyncDeliveryInit(facade, address);

                        messageListener.onMessage(copy);
                    } catch (RuntimeException rte) {
                        deliveryFailed = true;
                        tracer.asyncDeliveryComplete(facade, DeliveryOutcome.APPLICATION_ERROR, rte);
                    } finally {
                        if (!deliveryFailed) {
                            tracer.asyncDeliveryComplete(facade, DeliveryOutcome.DELIVERED, null);
                        }
                    }

                    if (autoAckOrDupsOk && !session.isSessionRecovered()) {
                        if (!deliveryFailed) {
                            doAckConsumed(envelope);
                        } else {
                            doAckReleased(envelope);
                        }
                    }
                }
            } catch (Exception e) {
                // An error while attempting to copy the message is the likely cause of this
                // exception case being hit.
                signalExceptionListener(e);
            } finally {
                dispatchLock.unlock();

                if (isPullConsumer()) {
                    try {
                        startConsumerResource();
                    } catch (JMSException e) {
                        LOG.error("Exception during credit replenishment for consumer listener {}", getConsumerId(), e);
                    }
                }
            }
        }

        return !messageQueue.isEmpty();
    }