in src/ReceivingAmqpLink.cs [694:737]
/// <summary>
/// Empties the buffered message queue, removes every outstanding receive waiter and
/// schedules their completion, then aborts the pending dispositions if there are any.
/// </summary>
/// <param name="aborted">
/// When <c>true</c>, each removed waiter is signaled with an
/// <see cref="OperationCanceledException"/> ("Link aborted") that wraps the link's
/// <c>TerminalException</c>; when <c>false</c>, each waiter is signaled with a null exception.
/// </param>
/// <param name="messagesToRelease">
/// Receives the messages that were dequeued from the message queue, or <c>null</c> when the
/// queue reported itself empty. This method only collects them; it does not release them.
/// </param>
void CancelPendingOperations(bool aborted, out Queue<AmqpMessage> messagesToRelease)
{
    // Step 1: hand any buffered messages back to the caller.
    if (this.messageQueue.IsEmpty)
    {
        messagesToRelease = null;
    }
    else
    {
        messagesToRelease = new Queue<AmqpMessage>();
        while (this.messageQueue.TryDequeue(out AmqpMessage buffered))
        {
            messagesToRelease.Enqueue(buffered);
        }
    }

    // Step 2: detach the outstanding waiters (only when the manager reports some).
    List<ReceiveAsyncResult> detachedWaiters = this.waiterManager.Count > 0
        ? this.waiterManager.RemoveAll()
        : null;

    if (detachedWaiters != null)
    {
        // Everything the callback needs travels in the state object so the lambda can stay
        // static (non-capturing).
        var callbackState = new Tuple<List<ReceiveAsyncResult>, bool, Exception>(
            detachedWaiters,
            aborted,
            this.TerminalException);

        // The waiters are signaled from the scheduled callback rather than inline here.
        ActionItem.Schedule(
            static o =>
            {
                var args = (Tuple<List<ReceiveAsyncResult>, bool, Exception>)o;
                bool linkAborted = args.Item2;
                Exception terminalError = args.Item3;

                foreach (ReceiveAsyncResult pending in args.Item1)
                {
                    if (!linkAborted)
                    {
                        pending.Signal(false, null);
                        continue;
                    }

                    // A fresh exception instance is created for every waiter.
                    pending.Signal(false, new OperationCanceledException("Link aborted", terminalError));
                }
            },
            callbackState);
    }

    // Step 3: abort the pending dispositions, if that collection exists.
    this.pendingDispositions?.Abort();
}