in src/MessageConsumer.cs [505:579]
internal async Task ShutdownAsync()
{
if(!this.unconsumedMessages.Closed)
{
if(Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Shutdown of Consumer[{0}] started.", ConsumerId);
}
// Do we have any acks we need to send out before closing?
// Ack any delivered messages now.
if(!this.session.IsTransacted)
{
DeliverAcks();
if(this.IsAutoAcknowledgeBatch)
{
await AcknowledgeAsync().Await();
}
}
if (this.executor != null)
{
this.executor.Shutdown();
this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
this.executor = null;
}
if (this.optimizedAckTask != null)
{
this.session.Scheduler.Cancel(this.optimizedAckTask);
}
if (this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge)
{
if (!this.info.Browser)
{
// rollback duplicates that aren't acknowledged
LinkedList<MessageDispatch> temp = null;
using(await this.deliveredMessagesLock.LockAsync().Await())
{
temp = new LinkedList<MessageDispatch>(this.deliveredMessages);
}
foreach (MessageDispatch old in temp)
{
this.session.Connection.RollbackDuplicate(this, old.Message);
}
temp.Clear();
}
}
if(!this.session.IsTransacted)
{
using(await this.deliveredMessagesLock.LockAsync().Await())
{
deliveredMessages.Clear();
}
}
this.session.RemoveConsumer(this);
this.unconsumedMessages.Close();
MessageDispatch[] unconsumed = unconsumedMessages.RemoveAll();
if (!this.info.Browser)
{
foreach (MessageDispatch old in unconsumed)
{
// ensure we don't filter this as a duplicate
session.Connection.RollbackDuplicate(this, old.Message);
}
}
if(Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Shutdown of Consumer[{0}] completed.", ConsumerId);
}
}
}