in artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java [201:371]
/**
 * Blocking receive used when no message handler is installed on this consumer.
 * <p>
 * The calling thread polls {@code buffer} under this object's monitor and waits on it until one of
 * the following happens: a message is polled, the consumer is closed, the wait budget is used up,
 * or (only when {@code forcingDelivery} is set) a forced-delivery round trip finishes.
 * <p>
 * Before waiting, the body of the large message returned by the previous call (if any) is
 * discarded and the rate limiter (if any) is applied.
 *
 * @param timeout         wait budget, decremented using {@code System.currentTimeMillis()} deltas;
 *                        {@code 0} is mapped to {@code Long.MAX_VALUE}
 * @param forcingDelivery when {@code true} and nothing is buffered, {@code sessionContext.forceDelivery}
 *                        is invoked at most once per call (again after a fail-over reset) and the
 *                        method returns {@code null} once the matching forced-delivery marker
 *                        message is polled
 * @return the polled message, or {@code null} when nothing was obtained (budget exhausted, consumer
 *         closed or stopped with nothing buffered, matching forced-delivery marker received, or an
 *         expired message consumed the remaining budget)
 * @throws ActiveMQException            if a message handler is set ({@code handler != null}), or
 *                                      propagated from the helper calls made here
 * @throws ActiveMQInterruptedException if the thread is interrupted while waiting on the monitor
 */
private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws ActiveMQException {
if (logger.isTraceEnabled()) {
logger.trace("{}::receive({}, {})", this, timeout, forcingDelivery);
}
// presumably throws when the consumer has already been closed - confirm in checkClosed()
checkClosed();
if (largeMessageReceived != null) {
if (logger.isTraceEnabled()) {
logger.trace("{}::receive({}, {}) -> discard LargeMessage body for {}", this, timeout, forcingDelivery, largeMessageReceived);
}
// A large message handed out by a previous call is still referenced (see the assignment near
// the end of this method): discard its body and forget it before receiving the next message
largeMessageReceived.discardBody();
largeMessageReceived = null;
}
if (rateLimiter != null) {
rateLimiter.limit();
}
// Synchronous receive is rejected while a handler is set on this consumer
if (handler != null) {
if (logger.isTraceEnabled()) {
logger.trace("{}::receive({}, {}) -> throwing messageHandlerSet", this, timeout, forcingDelivery);
}
throw ActiveMQClientMessageBundle.BUNDLE.messageHandlerSet();
}
// With a client window size of 0 the slow-consumer path is started on every receive call
if (clientWindowSize == 0) {
if (logger.isTraceEnabled()) {
logger.trace("{}::receive({}, {}) -> start slowConsumer", this, timeout, forcingDelivery);
}
startSlowConsumer();
}
// Remember which thread is blocked in receive; cleared again in the finally block below
receiverThread = Thread.currentThread();
// Tracks whether forceDelivery has already been issued during this call
boolean deliveryForced = false;
// Set inside the synchronized block to request a forceDelivery call once the lock is released
boolean callForceDelivery = false;
// Timestamp of the last wait accounting point; -1 until the first time the wait loop is entered
long start = -1;
// Remaining wait budget; a timeout of 0 is treated as Long.MAX_VALUE
long toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
try {
// Each iteration either returns, or restarts via 'continue' (fail-over reset, forced delivery
// just issued, stale forced-delivery marker, or expired message with budget left)
while (true) {
ClientMessageInternal m = null;
synchronized (this) {
// Keep waiting while there is nothing to hand out (stopped, or the buffer is empty),
// the consumer is still open and there is budget left.
// Note: m is always null inside this loop body, because the loop is only entered when
// either 'stopped' short-circuits the poll or the poll returned null.
while ((stopped || (m = buffer.poll()) == null) && !closed && toWait > 0) {
if (start == -1) {
start = System.currentTimeMillis();
}
if (m == null && forcingDelivery) {
// A stopped consumer does not force delivery; leave the wait loop with m == null
if (stopped) {
break;
}
// we only force delivery once per call to receive
if (!deliveryForced) {
callForceDelivery = true;
break;
}
}
try {
wait(toWait);
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
// NOTE(review): given that m is always null here, only 'closed' can trigger this break
if (m != null || closed) {
break;
}
// Charge the time spent waiting against the remaining budget.
// NOTE(review): System.currentTimeMillis() follows wall-clock adjustments, so a clock
// change during the wait alters the effective timeout - confirm whether that matters
long now = System.currentTimeMillis();
toWait -= now - start;
start = now;
}
}
// The failedOver flag is read and cleared here, outside the synchronized block
if (failedOver) {
if (m == null) {
if (logger.isTraceEnabled()) {
logger.trace("{}::receive({}, {}) -> m == null and failover", this, timeout, forcingDelivery);
}
// Failed over with nothing polled: reset the flag, the forced-delivery state and the
// wait budget ('start' is left as is), then try again
failedOver = false;
deliveryForced = false;
toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
continue;
} else {
if (logger.isTraceEnabled()) {
logger.trace("{}::receive({}, {}) -> failedOver, but m != null, being {}", this, timeout, forcingDelivery, m);
}
// A message was polled despite the fail-over: just clear the flag and process it below
failedOver = false;
}
}
if (callForceDelivery) {
logger.trace("{}::Forcing delivery", this);
// Calling forceDelivery outside of the lock to avoid distributed dead locks
sessionContext.forceDelivery(this, forceDeliveryCount.getAndIncrement());
callForceDelivery = false;
deliveryForced = true;
// Go back to the wait loop; deliveryForced now prevents a second forceDelivery call
continue;
}
if (m != null) {
session.workDone();
// A message carrying FORCED_DELIVERY_MESSAGE is a marker, never returned to the caller
if (m.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
long seq = m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE);
// Need to check if forceDelivery was called at this call
// As we could be receiving a message that came from a previous call.
// The sequence sent above was the counter value before getAndIncrement(), hence the "- 1".
if (forcingDelivery && deliveryForced && seq == forceDeliveryCount.get() - 1) {
// forced delivery messages are discarded, nothing has been delivered by the queue
resetIfSlowConsumer();
logger.trace("{}::There was nothing on the queue, leaving it now:: returning null", this);
return null;
} else {
logger.trace("{}::Ignored force delivery answer as it belonged to another call", this);
// Ignore the message
continue;
}
}
// if we have already pre acked we can't expire
// (the expiry state is therefore captured before flowControlBeforeConsumption is called)
boolean expired = m.isExpired();
flowControlBeforeConsumption(m);
if (expired) {
// Expired messages are not returned: drop the body and report the expiry to the session
m.discardBody();
session.expire(this, m);
if (clientWindowSize == 0) {
startSlowConsumer();
}
// Look for another message only while there is wait budget left
if (toWait > 0) {
continue;
} else {
return null;
}
}
// Keep a reference to a large message so the next receive call can discard its body
if (m.isLargeMessage()) {
largeMessageReceived = m;
}
logger.trace("{}::Returning {}", this, m);
return m;
} else {
// Nothing polled and no forced delivery pending: the wait loop ended because the budget ran
// out, the consumer was closed, or it is stopped while forcingDelivery is set
logger.trace("{}::Returning null", this);
resetIfSlowConsumer();
return null;
}
}
} finally {
// Always clear the receiver thread, whether returning normally or by exception
receiverThread = null;
}
}