in src/main/csharp/MessageConsumer.cs [723:766]
/// <summary>
/// Records the dispatched message in a pending acknowledgement rather than
/// acknowledging it immediately. The pending ack is sent through the session
/// once the number of delivered messages (less any additional window size)
/// reaches half of the consumer's prefetch size.
/// </summary>
/// <param name="dispatch">
/// The dispatch whose destination and message id are recorded in the pending ack.
/// </param>
private void AckLater(MessageDispatch dispatch)
{
    // Don't acknowledge now, but we may need to let the broker know the
    // consumer got the message to expand the pre-fetch window
    if(this.session.IsTransacted)
    {
        this.session.DoStartTransaction();

        // The flag ensures the synchronization is added to the transaction
        // context only once until something else resets it.
        if(!this.synchronizationRegistered)
        {
            this.synchronizationRegistered = true;
            this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
        }
    }

    this.deliveredCounter++;

    // Keep a handle on the ack being superseded so the start of its range
    // can be carried over into the replacement.
    MessageAck oldPendingAck = this.pendingAck;

    this.pendingAck = new MessageAck();
    this.pendingAck.AckType = (byte) AckType.ConsumedAck;
    this.pendingAck.ConsumerId = this.info.ConsumerId;
    this.pendingAck.Destination = dispatch.Destination;
    this.pendingAck.LastMessageId = dispatch.Message.MessageId;
    this.pendingAck.MessageCount = this.deliveredCounter;

    if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
    {
        this.pendingAck.TransactionId = this.session.TransactionContext.TransactionId;
    }

    if(oldPendingAck == null)
    {
        // First message of a new batch: the range starts and ends here.
        this.pendingAck.FirstMessageId = this.pendingAck.LastMessageId;
    }
    else if(oldPendingAck.AckType == this.pendingAck.AckType)
    {
        // Extending an existing batch: preserve the start of the range so the
        // ack still describes all MessageCount messages, not just the last one.
        this.pendingAck.FirstMessageId = oldPendingAck.FirstMessageId;
    }
    // NOTE(review): when the superseded ack has a different ack type the first
    // message id is left unset, as before; confirm whether that ack should be
    // flushed instead of dropped.

    // Flush once half of the prefetch window has been consumed, then reset
    // the batching state so the next message starts a fresh pending ack.
    if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - this.additionalWindowSize))
    {
        this.session.SendAck(this.pendingAck);
        this.pendingAck = null;
        this.deliveredCounter = 0;
        this.additionalWindowSize = 0;
    }
}