in qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java [764:837]
private boolean deliverNextPending() {
if (session.isStarted() && messageQueue.isRunning() && messageListener != null) {
dispatchLock.lock();
try {
JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait();
if (envelope == null) {
return false;
}
TraceableMessage facade = envelope.getMessage().getFacade();
if (consumeExpiredMessage(envelope)) {
LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope);
doAckExpired(envelope);
tracer.asyncDeliveryInit(facade, address);
tracer.asyncDeliveryComplete(facade, DeliveryOutcome.EXPIRED, null);
} else if (session.redeliveryExceeded(envelope)) {
LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
applyRedeliveryPolicyOutcome(envelope);
tracer.asyncDeliveryInit(facade, address);
tracer.asyncDeliveryComplete(facade, DeliveryOutcome.REDELIVERIES_EXCEEDED, null);
} else {
final JmsMessage copy;
boolean deliveryFailed = false;
boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||
acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
if (autoAckOrDupsOk) {
copy = copy(doAckDelivered(envelope));
} else {
copy = copy(ackFromReceive(envelope));
}
session.clearSessionRecovered();
try {
tracer.asyncDeliveryInit(facade, address);
messageListener.onMessage(copy);
} catch (RuntimeException rte) {
deliveryFailed = true;
tracer.asyncDeliveryComplete(facade, DeliveryOutcome.APPLICATION_ERROR, rte);
} finally {
if (!deliveryFailed) {
tracer.asyncDeliveryComplete(facade, DeliveryOutcome.DELIVERED, null);
}
}
if (autoAckOrDupsOk && !session.isSessionRecovered()) {
if (!deliveryFailed) {
doAckConsumed(envelope);
} else {
doAckReleased(envelope);
}
}
}
} catch (Exception e) {
// An error while attempting to copy the message is the likely cause of this
// exception case being hit.
signalExceptionListener(e);
} finally {
dispatchLock.unlock();
if (isPullConsumer()) {
try {
startConsumerResource();
} catch (JMSException e) {
LOG.error("Exception during credit replenishment for consumer listener {}", getConsumerId(), e);
}
}
}
}
return !messageQueue.isEmpty();
}