in src/MessageConsumer.cs [1182:1271]
/// <summary>
/// Runs the acknowledgement step that follows delivery of a dispatch to the
/// application. What is sent depends on whether the message expired and on
/// the acknowledgement mode reported by this consumer and its session.
/// </summary>
/// <param name="dispatch">The dispatch that has just been handed to the application.</param>
/// <param name="expired">
/// <c>true</c> to acknowledge the dispatch with <c>AckType.ExpiredAck</c>
/// instead of applying the mode-specific handling.
/// </param>
/// <returns>A task that completes once any acknowledgement work has finished.</returns>
/// <exception cref="NMSException">
/// Thrown when the session is not transacted and none of the auto-each,
/// auto-batch, client or individual acknowledgement modes applies.
/// </exception>
public virtual async Task AfterMessageIsConsumedAsync(MessageDispatch dispatch, bool expired)
{
    // Nothing to acknowledge once the unconsumed-message channel is closed.
    if(this.unconsumedMessages.Closed)
    {
        return;
    }

    if(expired)
    {
        await AcknowledgeAsync(dispatch, AckType.ExpiredAck).Await();
        return;
    }

    if(this.session.IsTransacted)
    {
        // Do nothing.
        return;
    }

    if(this.IsAutoAcknowledgeEach)
    {
        // Only the caller that flips the flag from false to true proceeds;
        // any other caller skips acknowledgement for this invocation.
        if(!this.deliveringAcks.CompareAndSet(false, true))
        {
            return;
        }

        try
        {
            using(await this.deliveredMessagesLock.LockAsync().Await())
            {
                if(this.deliveredMessages.Count != 0)
                {
                    if(this.optimizeAcknowledge)
                    {
                        this.ackCounter++;
                        if(IsOptimizedAckTime())
                        {
                            MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
                            if(ack != null)
                            {
                                this.deliveredMessages.Clear();
                                this.ackCounter = 0;
                                await this.session.SendAckAsync(ack).Await();
                                this.optimizeAckTimestamp = DateTime.Now;
                            }

                            // As a further optimization, send the ack for expired
                            // messages when there are any. This resets the
                            // deliveredCounter to 0 so that we won't send standard
                            // acks with every message just because the
                            // deliveredCounter sits just below 0.5 * prefetch,
                            // as used in ackLater().
                            if(this.pendingAck != null && this.deliveredCounter > 0)
                            {
                                await this.session.SendAckAsync(pendingAck).Await();
                                this.pendingAck = null;
                                this.deliveredCounter = 0;
                            }
                        }
                    }
                    else
                    {
                        MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
                        if(ack != null)
                        {
                            this.deliveredMessages.Clear();
                            await this.session.SendAckAsync(ack).Await();
                        }
                    }
                }
            }
        }
        finally
        {
            // Always release the guard, even when taking the lock or sending
            // an ack throws; otherwise every later call would fail the
            // CompareAndSet above and never acknowledge again.
            this.deliveringAcks.Value = false;
        }

        return;
    }

    if(this.IsAutoAcknowledgeBatch)
    {
        await AckLaterAsync(dispatch, AckType.ConsumedAck).Await();
        return;
    }

    if(IsClientAcknowledge || IsIndividualAcknowledge)
    {
        // Take the membership snapshot under the lock, but release the lock
        // before calling AckLaterAsync.
        bool presentInDeliveredMessages;
        using(await this.deliveredMessagesLock.LockAsync().Await())
        {
            presentInDeliveredMessages = this.deliveredMessages.Contains(dispatch);
        }

        // NOTE(review): presumably a dispatch that is no longer in
        // deliveredMessages was already acknowledged by the application -
        // confirm against the client/individual acknowledge path.
        if(presentInDeliveredMessages)
        {
            await AckLaterAsync(dispatch, AckType.DeliveredAck).Await();
        }

        return;
    }

    throw new NMSException("Invalid session state.");
}