in src/main/csharp/MessageConsumer.cs [639:698]
public void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
{
if(this.unconsumedMessages.Closed)
{
return;
}
if(expired == true)
{
lock(this.dispatchedMessages)
{
this.dispatchedMessages.Remove(dispatch);
}
// TODO - Not sure if we need to ack this in stomp.
// AckLater(dispatch, AckType.ConsumedAck);
}
else
{
if(this.session.IsTransacted)
{
// Do nothing.
}
else if(this.session.IsAutoAcknowledge)
{
if(this.deliveringAcks.CompareAndSet(false, true))
{
lock(this.dispatchedMessages)
{
// If a Recover was called in the async handler then
// we don't want to send an ack otherwise the broker will
// think we consumed the message.
if (this.dispatchedMessages.Count > 0)
{
MessageAck ack = new MessageAck();
ack.AckType = (byte) AckType.ConsumedAck;
ack.ConsumerId = this.info.ConsumerId;
ack.Destination = dispatch.Destination;
ack.LastMessageId = dispatch.Message.MessageId;
ack.MessageCount = 1;
this.session.SendAck(ack);
}
}
this.deliveringAcks.Value = false;
this.dispatchedMessages.Clear();
}
}
else if(this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge)
{
// Do nothing.
}
else
{
throw new NMSException("Invalid session state.");
}
}
}