in qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java [294:378]
private JmsInboundMessageDispatch dequeue(long timeout, boolean localCheckOnly) throws JMSException {
boolean pullConsumer = isPullConsumer();
boolean pullForced = pullConsumer;
try {
long deadline = 0;
if (timeout > 0) {
deadline = System.currentTimeMillis() + timeout;
}
performPullIfRequired(timeout, false);
while (true) {
JmsInboundMessageDispatch envelope = null;
if (pullForced || pullConsumer) {
// Any waiting was done by the pull request, try immediate retrieval from the queue.
envelope = messageQueue.dequeue(0);
} else {
envelope = messageQueue.dequeue(timeout);
}
if (getFailureCause() != null) {
LOG.debug("{} receive failed: {}", getConsumerId(), getFailureCause().getMessage());
throw JmsExceptionSupport.create(getFailureCause());
}
if (envelope == null) {
if ((timeout == 0 && (pullForced || localCheckOnly)) || pullConsumer || messageQueue.isClosed()) {
return null;
} else if (timeout > 0) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
if (timeout >= 0 && !localCheckOnly) {
// We don't do this for receive with no timeout since it
// is redundant: zero-prefetch consumers already pull, and
// the rest block indefinitely on the local messageQueue.
pullForced = true;
if (performPullIfRequired(timeout, true)) {
startConsumerResource();
// We refresh credit if it is a prefetching consumer, since the
// pull drained it. Processing acks can open the credit window, but
// not in all cases, and if we didn't get a message it would stay
// closed until future pulls were performed.
}
}
continue;
}
TraceableMessage facade = envelope.getMessage().getFacade();
if (consumeExpiredMessage(envelope)) {
LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope);
doAckExpired(envelope);
tracer.syncReceive(facade, address, DeliveryOutcome.EXPIRED);
if (timeout > 0) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
performPullIfRequired(timeout, false);
} else if (session.redeliveryExceeded(envelope)) {
LOG.debug("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
applyRedeliveryPolicyOutcome(envelope);
tracer.syncReceive(facade, address, DeliveryOutcome.REDELIVERIES_EXCEEDED);
if (timeout > 0) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
performPullIfRequired(timeout, false);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace(getConsumerId() + " received message: " + envelope);
}
tracer.syncReceive(facade, address, DeliveryOutcome.DELIVERED);
return envelope;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw JmsExceptionSupport.create(e);
}
}