in src/MessageConsumer.cs [766:819]
/// <summary>
/// Handles a pending request (the <c>clearDeliveredList</c> flag) to reset the
/// delivered-message bookkeeping. Judging by the log messages, the flag is
/// raised when the transport is interrupted.
/// </summary>
/// <remarks>
/// The flag is tested once without the lock as a cheap early exit and again
/// while holding <c>deliveredMessagesLock</c>. Every path that reaches the end
/// of the locked region resets the flag to <c>false</c>.
/// <list type="bullet">
/// <item>Transacted session: each delivered message id is recorded in
/// <c>previouslyDeliveredMessages</c> with the value <c>false</c>; the delivered
/// list itself is left intact.</item>
/// <item>Otherwise: for client-acknowledge sessions on a non-browser consumer,
/// every delivered message is handed to <c>Connection.RollbackDuplicate</c>;
/// then the delivered list is emptied and <c>pendingAck</c> is dropped.</item>
/// </list>
/// </remarks>
private void ClearDeliveredList()
{
    // Unlocked fast path: no clear was requested, so there is nothing to do.
    if (!this.clearDeliveredList)
    {
        return;
    }

    using(this.deliveredMessagesLock.Lock())
    {
        // Re-test under the lock; skip the work when the request has already
        // been handled or there are no delivered messages to process.
        if (this.clearDeliveredList && this.deliveredMessages.Count != 0)
        {
            if (!this.session.IsTransacted)
            {
                if (this.session.IsClientAcknowledge)
                {
                    Tracer.DebugFormat("Consumer[{0}] rolling back delivered list ({1}) on transport interrupt",
                                       ConsumerId, this.deliveredMessages.Count);

                    // allow redelivery (browsers are excluded)
                    if (!this.info.Browser)
                    {
                        foreach (MessageDispatch pending in this.deliveredMessages)
                        {
                            this.session.Connection.RollbackDuplicate(this, pending.Message);
                        }
                    }
                }

                Tracer.DebugFormat("Consumer[{0}]: clearing delivered list {1} on transport interrupt",
                                   this.info.ConsumerId, this.deliveredMessages.Count);

                this.deliveredMessages.Clear();
                this.pendingAck = null;
            }
            else
            {
                Tracer.DebugFormat("Consumer[{0}]: tracking existing transacted delivered list {1} on transport interrupt",
                                   this.info.ConsumerId, this.deliveredMessages.Count);

                // Create the tracking map on first use, keyed to the
                // session's current transaction id.
                if (this.previouslyDeliveredMessages == null)
                {
                    this.previouslyDeliveredMessages =
                        new PreviouslyDeliveredMap(this.session.TransactionContext.TransactionId);
                }

                // Record every delivered message id with the value false.
                // NOTE(review): presumably false means "not yet redelivered" - confirm
                // against the code that reads this map.
                foreach (MessageDispatch tracked in this.deliveredMessages)
                {
                    this.previouslyDeliveredMessages[tracked.Message.MessageId] = false;
                }
            }
        }

        // The request is considered handled whichever path was taken above.
        this.clearDeliveredList = false;
    }
}