in src/MessageConsumer.cs [1442:1592]
/// <summary>
/// Rolls back the dispatches currently tracked in <c>deliveredMessages</c>.
/// </summary>
/// <remarks>
/// The work is done while holding the unconsumed-message channel's SyncRoot lock and
/// the deliveredMessagesLock. After notifying every tracked message of the rollback,
/// one of three things happens:
/// <list type="bullet">
/// <item>If the redelivery counter of the dispatch at the head of the list exceeds
/// <c>MaximumRedeliveries</c> (and that limit is non-negative), a PoisonAck covering
/// all tracked dispatches is sent and the list is cleared.</item>
/// <item>If non-blocking redelivery is enabled, the dispatches are handed to the
/// session scheduler to be processed by NonBlockingRedeliveryCallback after the
/// redelivery delay.</item>
/// <item>Otherwise the unconsumed channel is stopped, the dispatches are pushed back
/// onto its front, and the consumer is restarted either immediately or via
/// RollbackHelper once the redelivery delay has elapsed.</item>
/// </list>
/// When nothing is tracked after the initial cleanup the method returns early and no
/// redispatch is requested.
/// </remarks>
/// <returns>A task that completes once the rollback processing has finished.</returns>
internal async Task RollbackAsync()
{
    ClearDeliveredList();

    using(await this.unconsumedMessages.SyncRoot.LockAsync().Await())
    {
        if (this.optimizeAcknowledge)
        {
            // remove messages read but not acked at the broker yet through optimizeAcknowledge
            if (!this.info.Browser)
            {
                using(await this.deliveredMessagesLock.LockAsync().Await())
                {
                    // Remove up to ackCounter entries from the tail of the list.
                    // The removals are counted explicitly: comparing a growing index
                    // against the shrinking Count would stop after only about half
                    // of the intended entries had been removed.
                    int removed = 0;
                    while (removed < ackCounter && this.deliveredMessages.Count > 0)
                    {
                        // ensure we don't filter this as a duplicate
                        MessageDispatch dispatch = this.deliveredMessages.Last.Value;
                        this.deliveredMessages.RemoveLast();
                        session.Connection.RollbackDuplicate(this, dispatch.Message);
                        removed++;
                    }
                }
            }
        }

        using(await this.deliveredMessagesLock.LockAsync().Await())
        {
            RollbackPreviouslyDeliveredAndNotRedelivered();

            if(this.deliveredMessages.Count == 0)
            {
                Tracer.DebugFormat("Consumer[{0}] Rolled Back with no dispatched Messages", ConsumerId);
                // Nothing to redeliver; note that this also skips the redispatch below.
                return;
            }

            // Only increase the redelivery delay after the first redelivery..
            MessageDispatch lastMd = this.deliveredMessages.First.Value;
            int currentRedeliveryCount = lastMd.Message.RedeliveryCounter;

            redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);

            // The acks built below span from the tail entry (FirstMessageId) to the
            // head entry (lastMd) of the delivered list.
            MessageId firstMsgId = this.deliveredMessages.Last.Value.Message.MessageId;

            foreach(MessageDispatch dispatch in this.deliveredMessages)
            {
                // Allow the message to update its internal to reflect a Rollback.
                dispatch.Message.OnMessageRollback();
                // ensure we don't filter this as a duplicate
                session.Connection.RollbackDuplicate(this, dispatch.Message);
            }

            if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
               lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
            {
                // We need to NACK the messages so that they get sent to the DLQ.
                MessageAck ack = new MessageAck(lastMd, (byte) AckType.PoisonAck, deliveredMessages.Count);

                Tracer.DebugFormat("Consumer[{0}] Poison Ack of {1} messages aft max redeliveries: {2}",
                                   ConsumerId, this.deliveredMessages.Count, this.redeliveryPolicy.MaximumRedeliveries);

                BrokerError poisonCause = new BrokerError();
                poisonCause.ExceptionClass = "javax.jms.JMSException";
                poisonCause.Message = "Exceeded RedeliveryPolicy limit: " + RedeliveryPolicy.MaximumRedeliveries;

                if (lastMd.RollbackCause != null)
                {
                    // Carry the recorded rollback cause along, both as a nested error
                    // and appended to the top-level message.
                    BrokerError cause = new BrokerError();
                    cause.ExceptionClass = "javax.jms.JMSException";
                    cause.Message = lastMd.RollbackCause.Message;
                    poisonCause.Cause = cause;
                    poisonCause.Message = poisonCause.Message + " cause: " + lastMd.RollbackCause.Message;
                }

                ack.FirstMessageId = firstMsgId;
                ack.PoisonCause = poisonCause;
                await this.session.SendAckAsync(ack).Await();

                // Adjust the window size.
                additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.deliveredMessages.Count);
                this.redeliveryDelay = 0;

                this.deliveredCounter -= this.deliveredMessages.Count;
                this.deliveredMessages.Clear();
            }
            else
            {
                // We only send a RedeliveryAck after the first redelivery
                if(currentRedeliveryCount > 0)
                {
                    MessageAck ack = new MessageAck(lastMd, (byte) AckType.RedeliveredAck, deliveredMessages.Count);
                    ack.FirstMessageId = firstMsgId;
                    await this.session.SendAckAsync(ack, true).Await();
                }

                if (this.nonBlockingRedelivery)
                {
                    if(redeliveryDelay == 0)
                    {
                        redeliveryDelay = RedeliveryPolicy.InitialRedeliveryDelay;
                    }

                    Tracer.DebugFormat("Consumer[{0}] Rolled Back, Re-enque {1} messages in Non-Blocking mode, delay: {2}",
                                       ConsumerId, this.deliveredMessages.Count, redeliveryDelay);

                    // Snapshot the list in reverse order before clearing it; the
                    // snapshot is what the scheduled callback receives.
                    List<MessageDispatch> pendingRedeliveries =
                        new List<MessageDispatch>(this.deliveredMessages);
                    pendingRedeliveries.Reverse();

                    this.deliveredCounter -= this.deliveredMessages.Count;
                    this.deliveredMessages.Clear();

                    this.session.Scheduler.ExecuteAfterDelay(
                        NonBlockingRedeliveryCallback,
                        pendingRedeliveries,
                        TimeSpan.FromMilliseconds(redeliveryDelay));
                }
                else
                {
                    // stop the delivery of messages.
                    this.unconsumedMessages.Stop();

                    Tracer.DebugFormat("Consumer {0} Rolled Back, Re-enque {1} messages",
                                       ConsumerId, this.deliveredMessages.Count);

                    foreach(MessageDispatch dispatch in this.deliveredMessages)
                    {
                        this.unconsumedMessages.EnqueueFirst(dispatch);
                    }

                    this.deliveredCounter -= this.deliveredMessages.Count;
                    this.deliveredMessages.Clear();

                    if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
                    {
                        // Defer the restart: RollbackHelper is queued on the thread
                        // pool with the computed deadline as its state argument.
                        DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
                        ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
                    }
                    else
                    {
                        Start();
                    }
                }
            }
        }
    }

    // Only redispatch if there's an async listener otherwise a synchronous
    // consumer will pull them from the local queue.
    if(this.listener != null)
    {
        this.session.Redispatch(this, this.unconsumedMessages);
    }
}