in activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java [1415:1499]
public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get();
try {
clearMessagesInProgress();
clearDeliveredList();
synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) {
// deliverySequenceId non zero means previously queued dispatch
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
if (redeliveryExceeded(md)) {
poisonAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
return;
}
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
try {
boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired();
if (!expired) {
listener.onMessage(message);
}
afterMessageIsConsumed(md, expired);
} catch (RuntimeException e) {
LOG.error("{} Exception while processing message: {}", getConsumerId(), md.getMessage().getMessageId(), e);
md.setRollbackCause(e);
if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
// schedual redelivery and possible dlq processing
rollback();
} else {
// Transacted or Client ack: Deliver the next message.
afterMessageIsConsumed(md, false);
}
}
} else {
md.setDeliverySequenceId(-1); // skip duplicate check on subsequent queued delivery
if (md.getMessage() == null) {
// End of browse or pull request timeout.
unconsumedMessages.enqueue(md);
} else {
if (!consumeExpiredMessage(md)) {
unconsumedMessages.enqueue(md);
if (availableListener != null) {
availableListener.onMessageAvailable(this);
}
} else {
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, true);
// Pull consumer needs to check if pull timed out and send
// a new pull command if not.
if (info.getCurrentPrefetchSize() == 0) {
unconsumedMessages.enqueue(null);
}
}
}
}
} else {
// deal with duplicate delivery
ConsumerId consumerWithPendingTransaction;
if (redeliveryExpectedInCurrentTransaction(md, true)) {
LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
}
} else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) {
LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
session.getConnection().rollbackDuplicate(this, md.getMessage());
dispatch(md);
} else {
LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
poisonAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
}
}
}
}
if (++dispatchedCount % 1000 == 0) {
dispatchedCount = 0;
Thread.yield();
}
} catch (Exception e) {
session.connection.onClientInternalException(e);
}
}