in src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs [70:131]
/// <summary>
/// Asynchronously requests the next stream delivery from this receiver, waiting up
/// to the given timeout for one to arrive if none is currently available.
/// </summary>
/// <param name="timeout">
/// How long to wait for a delivery. <see cref="TimeSpan.Zero"/> completes the returned
/// task immediately with <c>null</c> when nothing is available, and
/// <see cref="TimeSpan.MaxValue"/> waits without scheduling a timeout. Any other value
/// must not exceed <see cref="uint.MaxValue"/> milliseconds.
/// </param>
/// <returns>
/// A task that completes with a stream delivery, with <c>null</c> if the timeout
/// elapsed (or was zero) before a delivery became available, or that faults with an
/// <see cref="ArgumentOutOfRangeException"/> if the timeout is too large.
/// </returns>
public Task<IStreamDelivery> ReceiveAsync(TimeSpan timeout)
{
   // NOTE(review): presumably throws if the receiver is already closed or failed - confirm.
   CheckClosedOrFailed();
   TaskCompletionSource<IStreamDelivery> receive = new();

   session.Execute(() =>
   {
      // NOTE(review): presumably NotClosedOrFailed completes the request itself when the
      // receiver is closed or failed, so nothing more is done here - confirm.
      if (!NotClosedOrFailed(receive))
      {
         return;
      }

      IIncomingDelivery delivery = null;

      // Scan all unsettled deliveries in the link and check if any are still
      // lacking a linked stream delivery instance which indicates they have
      // not been made part of a receive yet.
      foreach (IIncomingDelivery candidate in protonLink.Unsettled)
      {
         if (candidate.LinkedResource == null)
         {
            delivery = candidate;
            break;
         }
      }

      if (delivery != null)
      {
         receive.TrySetResult(new ClientStreamDelivery(this, delivery));
         AsyncReplenishCreditIfNeeded();
         return;
      }

      // Nothing available and the caller does not want to wait.
      if (timeout == TimeSpan.Zero)
      {
         receive.TrySetResult(null);
         return;
      }

      if (timeout != TimeSpan.MaxValue)
      {
         if (timeout.TotalMilliseconds > uint.MaxValue)
         {
            // Fail the request and stop here: an already faulted request must not be
            // queued, nor should a timeout task be scheduled with an invalid delay.
            receive.TrySetException(new ArgumentOutOfRangeException(
               nameof(timeout),
               "Receive timeout must convert to a value less than UInt32.MaxValue Milliseconds"));
            return;
         }

         // On expiry, drop the still pending request from the queue and complete
         // it with null to signal that no delivery arrived in time.
         session.Schedule(() =>
         {
            if (!receive.Task.IsCompleted)
            {
               receiveRequests.Remove(receive);
               receive.TrySetResult(null);
            }
         }, timeout);
      }

      // Park the request until a delivery arrives or the timeout fires.
      receiveRequests.Enqueue(receive);
   });

   return receive.Task;
}