in src/MessageConsumer.cs [1018:1122]
/// <summary>
/// Pulls dispatches from <c>unconsumedMessages</c> until one can be handed to the
/// caller, discarding expired messages and messages that exceeded the redelivery
/// policy along the way.
/// </summary>
/// <param name="timeout">
/// Value passed to <c>unconsumedMessages.Dequeue</c>. When it is greater than
/// <see cref="TimeSpan.Zero"/> it is treated as an overall budget: a deadline is
/// computed once up front and the value passed to later <c>Dequeue</c> calls (and
/// to <c>SendPullRequest</c>) shrinks to the time remaining. A value that is not
/// greater than zero is passed through unchanged and is never adjusted.
/// </param>
/// <returns>
/// The first dispatch that is neither expired nor over the redelivery limit, or
/// <c>null</c> when the dequeue came back empty and no further waiting applies
/// (timeout not positive, or the channel is closed), or when a dispatch without a
/// message was received.
/// </returns>
/// <remarks>
/// If the dequeue comes back empty, no further waiting applies, and
/// <c>failureError</c> is set, the exception produced by
/// <c>NMSExceptionSupport.Create(failureError)</c> is thrown instead of returning
/// <c>null</c>.
/// </remarks>
private async Task<MessageDispatch> DequeueAsync(TimeSpan timeout)
{
    // UTC is used for all deadline arithmetic: local time can jump (daylight
    // saving, time-zone change) and would silently stretch or cut the timeout.
    DateTime deadline = DateTime.UtcNow;
    if(timeout > TimeSpan.Zero)
    {
        deadline += timeout;
    }

    while(true)
    {
        MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);

        if(dispatch == null)
        {
            if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
            {
                // Nothing arrived yet; retry with whatever is left of the budget.
                // Once the budget is exhausted the timeout becomes zero and the
                // next empty dequeue falls through to the branch below.
                timeout = ComputeRemainingDequeueTimeout(deadline, DateTime.UtcNow);
            }
            else
            {
                // Informs the caller of an error in the event that an async exception
                // took down the parent connection.
                if(this.failureError != null)
                {
                    throw NMSExceptionSupport.Create(this.failureError);
                }

                return null;
            }
        }
        else if(dispatch.Message == null)
        {
            return null;
        }
        else if(ConsumeExpiredMessage(dispatch))
        {
            Tracer.DebugFormat("Consumer[{0}] received expired message: {1}",
                               ConsumerId, dispatch.Message.MessageId);

            await BeforeMessageIsConsumedAsync(dispatch).Await();
            await AfterMessageIsConsumedAsync(dispatch, true).Await();

            // The awaits above consumed part of the budget; re-read the clock.
            if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
            {
                timeout = ComputeRemainingDequeueTimeout(deadline, DateTime.UtcNow);
            }

            SendPullRequest((long) timeout.TotalMilliseconds);
        }
        else if(RedeliveryExceeded(dispatch))
        {
            Tracer.DebugFormat("Consumer[{0}] received with excessive redelivered: {1}",
                               ConsumerId, dispatch);

            await PosionAckAsync(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery " +
                                 "policy limit:" + redeliveryPolicy.MaximumRedeliveries).Await();

            // The await above consumed part of the budget; re-read the clock.
            if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
            {
                timeout = ComputeRemainingDequeueTimeout(deadline, DateTime.UtcNow);
            }

            SendPullRequest((long) timeout.TotalMilliseconds);
        }
        else
        {
            return dispatch;
        }
    }
}

/// <summary>
/// Returns the time left until <paramref name="deadline"/>, clamped to
/// <see cref="TimeSpan.Zero"/> once <paramref name="now"/> has passed it.
/// </summary>
/// <param name="deadline">Instant at which the dequeue budget runs out.</param>
/// <param name="now">Current instant, in the same time base as <paramref name="deadline"/>.</param>
/// <returns>A non-negative remaining duration.</returns>
private static TimeSpan ComputeRemainingDequeueTimeout(DateTime deadline, DateTime now)
{
    if(now > deadline)
    {
        // Out of time.
        return TimeSpan.Zero;
    }

    return deadline - now;
}