in activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java [884:1105]
/**
 * Drains the dispatches currently queued on this session's executor and delivers each
 * one to the registered {@code messageListener}.
 *
 * <p>For every dispatch returned by {@code executor.dequeueNoWait()} the loop:
 * <ol>
 *   <li>copies the message and applies blob / object message settings to the copy;</li>
 *   <li>if the copy is expired, or the connection reports it as a duplicate, sends an
 *       early ack (expired or poison) and moves on to the next dispatch;</li>
 *   <li>installs an empty acknowledge callback in client / individual acknowledge mode;</li>
 *   <li>calls {@code deliveryListener.beforeDelivery}, then, while holding
 *       {@code redeliveryGuard}: calls {@code doStartTransaction()}, registers a
 *       {@code Synchronization} when the ack carries a transaction id, invokes
 *       {@code messageListener.onMessage} and finally
 *       {@code deliveryListener.afterDelivery};</li>
 *   <li>sends the standard ack directly when the ack carries no transaction id;</li>
 *   <li>calls {@code executor.waitForQueueRestart()} before polling again.</li>
 * </ol>
 *
 * <p>A {@code Throwable} raised while starting the transaction or invoking the listener is
 * logged and reported through {@code connection.onClientInternalException} instead of being
 * propagated. A {@code Throwable} raised by {@code afterDelivery} is rethrown wrapped in a
 * {@code RuntimeException}.
 */
public void run() {
MessageDispatch messageDispatch;
// The loop ends as soon as dequeueNoWait() returns null.
while ((messageDispatch = executor.dequeueNoWait()) != null) {
final MessageDispatch md = messageDispatch;
// subset of org.apache.activemq.ActiveMQMessageConsumer.createActiveMQMessage
final ActiveMQMessage message = (ActiveMQMessage)md.getMessage().copy();
// Blob messages get a downloader built from getBlobTransferPolicy().
if (message.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
((ActiveMQBlobMessage)message).setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
}
// Object messages take over the connection's trusted-package settings.
if (message.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
((ActiveMQObjectMessage)message).setTrustAllPackages(getConnection().isTrustAllPackages());
((ActiveMQObjectMessage)message).setTrustedPackages(getConnection().getTrustedPackages());
}
// Expired and duplicate messages are acknowledged right away and never reach the listener.
MessageAck earlyAck = null;
if (message.isExpired()) {
earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
earlyAck.setFirstMessageId(message.getMessageId());
} else if (connection.isDuplicate(ActiveMQSession.this, message)) {
LOG.debug("{} got duplicate: {}", this, message.getMessageId());
earlyAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
earlyAck.setFirstMessageId(md.getMessage().getMessageId());
earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
}
if (earlyAck != null) {
try {
asyncSendPacket(earlyAck);
} catch (Throwable t) {
LOG.error("error dispatching ack: {} ", earlyAck, t);
connection.onClientInternalException(t);
} finally {
// Always skip to the next dispatch, whether or not the ack could be sent.
// NOTE(review): a `continue` inside `finally` also discards anything thrown by the
// catch block above; presumably a deliberate best effort - confirm.
continue;
}
}
// In client / individual acknowledge mode the message gets a callback whose body is empty.
// NOTE(review): presumably because this method sends the ack itself - confirm.
if (isClientAcknowledge()||isIndividualAcknowledge()) {
message.setAcknowledgeCallback(new Callback() {
@Override
public void execute() throws Exception {
}
});
}
if (deliveryListener != null) {
deliveryListener.beforeDelivery(this, message);
}
md.setDeliverySequenceId(getNextDeliveryId());
lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
// Set when afterDelivery throws; read later by the scheduled redelivery task.
final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
/*
 * The redelivery guard lets the endpoint lifecycle complete before the message is
 * dispatched again: the scheduled redelivery task synchronizes on the same guard, so
 * afterDelivery is not called after the redelivery has started.
 * */
synchronized (redeliveryGuard) {
try {
ack.setFirstMessageId(md.getMessage().getMessageId());
doStartTransaction();
ack.setTransactionId(getTransactionContext().getTransactionId());
// Transacted delivery: the ack is sent from beforeEnd() and rollback handling is
// registered on the transaction context.
if (ack.getTransactionId() != null) {
getTransactionContext().addSynchronization(new Synchronization() {
// Value of clearRequestsCounter when this synchronization is created; compared
// again in afterRollback(). If the counter is at Integer.MAX_VALUE it is
// incremented first.
// NOTE(review): presumably so the later '>' comparison keeps working across
// wrap-around - confirm.
final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
@Override
public void beforeEnd() throws Exception {
// validate our consumer so we don't push stale acks that get ignored
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
}
LOG.trace("beforeEnd ack {}", ack);
sendAck(ack);
}
@Override
public void afterRollback() throws Exception {
if (LOG.isTraceEnabled()) {
// The Throwable is only there to get a stack trace of the caller into the log.
LOG.trace("afterRollback {}", ack, new Throwable("here"));
}
// ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
// don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
if (clearRequestsCounter.get() > clearRequestCount) {
LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
// validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
// Redelivery limit reached: NACK the message with a poison ack so that it
// gets sent to the DLQ.
// Note: this local `ack` shadows the outer standard ack from here to the end
// of the branch.
MessageAck ack = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack);
asyncSendPacket(ack);
} else {
// Limit not reached: send a redelivered ack, then schedule a local redelivery.
// This local `ack` also shadows the outer standard ack.
MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
asyncSendPacket(ack);
// Figure out how long we should wait to resend this message: start from the
// initial delay and apply getNextRedeliveryDelay once per previous redelivery.
long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
for (int i = 0; i < redeliveryCounter; i++) {
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
}
/*
 * If redelivery is blocking (i.e. NOT non-blocking) we stop the executor so that no
 * more messages are delivered; the scheduled task below restarts it once the message
 * has been put back at the head of the queue.
 * */
if (!connection.isNonBlockingRedelivery()) {
LOG.debug("Blocking session until re-delivery...");
executor.stop();
}
connection.getScheduler().executeAfterDelay(new Runnable() {
@Override
public void run() {
/*
 * wait for the first delivery to be complete, i.e. after delivery has been called.
 * */
synchronized (redeliveryGuard) {
/*
 * If it's non blocking then we can just dispatch in a new session.
 * */
if (connection.isNonBlockingRedelivery()) {
((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
} else {
/*
 * If there has been an error thrown during afterDelivery then the
 * endpoint will be marked as dead so redelivery will fail (and eventually
 * the session marked as stale), in this case we can only call dispatch
 * which will create a new session with a new endpoint.
 * */
if (afterDeliveryError.get()) {
((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
} else {
// Put the dispatch back at the head of this session's queue and resume
// the executor that was stopped in afterRollback().
executor.executeFirst(md);
executor.start();
}
}
}
}
}, redeliveryDelay);
}
// Reached on both the poison and the redelivered path, but not after the early returns above.
md.getMessage().onMessageRolledBack();
}
});
}
LOG.trace("{} onMessage({})", this, message.getMessageId());
messageListener.onMessage(message);
} catch (Throwable e) {
if (!isClosed()) {
LOG.error("{} error dispatching message: {} ", this, message.getMessageId(), e);
}
if (getTransactionContext() != null && getTransactionContext().isInXATransaction()) {
LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
getTransactionContext().setRollbackOnly(true);
}
// A problem while invoking the MessageListener does not
// in general indicate a problem with the connection to the broker, i.e.
// it will usually be sufficient to let the afterDelivery() method either
// commit or roll back in order to deal with the exception.
// However, we notify any registered client internal exception listener
// of the problem.
connection.onClientInternalException(e);
} finally {
// No transaction id on the ack: send the standard ack here, whether or not the
// try block above completed normally.
if (ack.getTransactionId() == null) {
try {
asyncSendPacket(ack);
} catch (Throwable e) {
connection.onClientInternalException(e);
}
}
}
// afterDelivery runs while redeliveryGuard is still held, so a scheduled redelivery
// cannot start before it has finished.
if (deliveryListener != null) {
try {
deliveryListener.afterDelivery(this, message);
} catch (Throwable t) {
LOG.debug("Unable to call after delivery", t);
// Tells the scheduled redelivery task to use dispatch() instead of this session's executor.
afterDeliveryError.set(true);
throw new RuntimeException(t);
}
}
}
/*
 * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
 * It also needs to be outside the redelivery guard.
 * */
try {
executor.waitForQueueRestart();
} catch (InterruptedException ex) {
// NOTE(review): the thread's interrupt status is not restored here and the loop keeps
// going; confirm that this is intended.
connection.onClientInternalException(ex);
}
}
}