in src/MessageConsumer.cs [821:947]
public virtual async Task Dispatch_Async(MessageDispatch dispatch)
{
MessageListener listener = this.listener;
bool dispatchMessage = false;
try
{
ClearMessagesInProgress();
ClearDeliveredList();
using(await this.unconsumedMessages.SyncRoot.LockAsync().Await())
{
if(!this.unconsumedMessages.Closed)
{
if(this.info.Browser || !session.Connection.IsDuplicate(this, dispatch.Message))
{
if(listener != null && this.unconsumedMessages.Running)
{
if (RedeliveryExceeded(dispatch))
{
await PosionAckAsync(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries).Await();
return;
}
else
{
dispatchMessage = true;
}
}
else
{
if (!this.unconsumedMessages.Running)
{
// delayed redelivery, ensure it can be re delivered
session.Connection.RollbackDuplicate(this, dispatch.Message);
}
this.unconsumedMessages.Enqueue(dispatch);
// TODO - Signal message available when we have that event hook.
}
}
else
{
// deal with duplicate delivery
ConsumerId consumerWithPendingTransaction;
if (RedeliveryExpectedInCurrentTransaction(dispatch, true))
{
Tracer.DebugFormat("Consumer[{0}] tracking transacted({1}) redelivery [{2}]",
ConsumerId, previouslyDeliveredMessages.TransactionId, dispatch.Message);
if (TransactedIndividualAck)
{
await ImmediateIndividualTransactedAckAsync(dispatch).Await();
}
else
{
await this.session.SendAckAsync(new MessageAck(dispatch, (byte) AckType.DeliveredAck, 1)).Await();
}
}
else if ((consumerWithPendingTransaction = RedeliveryPendingInCompetingTransaction(dispatch)) != null)
{
Tracer.WarnFormat("Consumer[{0}] delivering duplicate [{1}], pending transaction completion on ({1}) will rollback",
ConsumerId, dispatch.Message, consumerWithPendingTransaction);
this.session.Connection.RollbackDuplicate(this, dispatch.Message);
await Dispatch_Async(dispatch).Await();
}
else
{
Tracer.WarnFormat("Consumer[{0}] suppressing duplicate delivery on connection, poison acking: ({1})",
ConsumerId, dispatch);
await PosionAckAsync(dispatch, "Suppressing duplicate delivery on connection, consumer " + ConsumerId).Await();
}
}
}
}
if(dispatchMessage)
{
ActiveMQMessage message = CreateActiveMQMessage(dispatch);
await BeforeMessageIsConsumedAsync(dispatch).Await();
try
{
bool expired = (!IgnoreExpiration && message.IsExpired());
if(!expired)
{
listener(message);
}
await this.AfterMessageIsConsumedAsync(dispatch, expired).Await();
}
catch(Exception e)
{
dispatch.RollbackCause = e;
if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
{
// Schedule redelivery and possible dlq processing
await RollbackAsync().Await();
}
else
{
// Transacted or Client ack: Deliver the next message.
await this.AfterMessageIsConsumedAsync(dispatch, false).Await();
}
Tracer.ErrorFormat("Consumer[{0}] Exception while processing message: {1}", this.info.ConsumerId, e);
// If aborted we stop the abort here and let normal processing resume.
// This allows the session to shutdown normally and ack all messages
// that have outstanding acks in this consumer.
if((Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == ThreadState.AbortRequested)
{
Thread.ResetAbort();
}
}
}
if(++dispatchedCount % 1000 == 0)
{
dispatchedCount = 0;
Thread.Sleep(1);
}
}
catch(Exception e)
{
this.session.Connection.OnSessionException(this.session, e);
}
}