in src/main/csharp/MessageConsumer.cs [810:874]
/// <summary>
/// Rolls back every message currently held in <c>dispatchedMessages</c>.
/// </summary>
/// <remarks>
/// While holding the unconsumed-queue lock and then the dispatched-list lock:
/// each dispatched message is notified via <c>OnMessageRollback()</c>; unless the
/// redelivery limit has been exceeded, the messages are pushed back onto the
/// front of <c>unconsumedMessages</c> and delivery is restarted, either
/// immediately or after the delay supplied by the redelivery policy. In both
/// cases <c>deliveredCounter</c> is reduced by the number of rolled-back
/// messages and the dispatched list is cleared.
/// After the locks are released, the session is asked to redispatch the
/// unconsumed queue when an asynchronous listener is registered.
/// If nothing has been dispatched the method returns immediately and no
/// redispatch is requested.
/// </remarks>
internal void Rollback()
{
    // Lock order: unconsumed queue first, then the dispatched list.
    lock(this.unconsumedMessages.SyncRoot)
    {
        lock(this.dispatchedMessages)
        {
            Tracer.DebugFormat("Rollback started, rolling back {0} message",
                               dispatchedMessages.Count);

            // Nothing to roll back; this also skips the redispatch at the end.
            if(this.dispatchedMessages.Count == 0)
            {
                return;
            }

            // The delay is computed from the redelivery count as it was BEFORE
            // this rollback is applied to the messages, so it only increases
            // after the first redelivery.
            MessageDispatch lastMd = this.dispatchedMessages.First.Value;
            int currentRedeliveryCount = lastMd.Message.RedeliveryCounter;

            redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);

            foreach(MessageDispatch dispatch in this.dispatchedMessages)
            {
                // Allow the message to update its internal state to reflect a Rollback.
                dispatch.Message.OnMessageRollback();
            }

            // The counter is re-read here (rather than reusing
            // currentRedeliveryCount) because it is checked after the
            // OnMessageRollback() calls above.
            // NOTE(review): presumably OnMessageRollback() bumps the counter - confirm.
            // A negative MaximumRedeliveries disables the limit check.
            if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
               lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
            {
                // Limit exceeded: the messages are not re-queued; they are
                // dropped from the dispatched list below.
                // NOTE(review): nothing is sent to the broker from this branch
                // in the visible code - confirm that is handled elsewhere.
                this.redeliveryDelay = 0;
            }
            else
            {
                // stop the delivery of messages.
                this.unconsumedMessages.Stop();

                // Put the rolled-back messages at the head of the local queue.
                foreach(MessageDispatch dispatch in this.dispatchedMessages)
                {
                    this.unconsumedMessages.EnqueueFirst(dispatch);
                }

                if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
                {
                    // redeliveryDelay is a millisecond value: it is handed to
                    // AddMilliseconds below, so it is logged with that unit.
                    Tracer.DebugFormat("Rollback delayed for {0} milliseconds", redeliveryDelay);

                    // Hand the deadline to RollbackHelper on a pool thread.
                    // NOTE(review): presumably the helper waits until the
                    // deadline and then restarts delivery - confirm.
                    DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
                    ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
                }
                else
                {
                    // No delay requested (or the queue is closed): call Start() right away.
                    Start();
                }
            }

            // These messages no longer count as delivered.
            this.deliveredCounter -= this.dispatchedMessages.Count;
            this.dispatchedMessages.Clear();
        }
    }

    // Only redispatch if there's an async listener otherwise a synchronous
    // consumer will pull them from the local queue.
    // This runs after both locks have been released.
    if(this.listener != null)
    {
        this.session.Redispatch(this.unconsumedMessages);
    }
}