in src/DotPulsar/Internal/AsyncQueueWithCursor.cs [145:200]
/// <summary>
/// Advances the cursor to the next node of the queue and returns that node's value.
/// When the cursor is already past the last node, a <see cref="TaskCompletionSource{TResult}"/> is
/// published in <c>_cursorNextItemTcs</c> and the call waits asynchronously until it is completed
/// with the next node (presumably by the enqueue path - not visible from this method).
/// </summary>
/// <param name="cancellationToken">
/// Cancels the wait for the cursor semaphore as well as the wait for the next node.
/// </param>
/// <returns>The value stored in the node the cursor points to after advancing.</returns>
/// <exception cref="OperationCanceledException">
/// <paramref name="cancellationToken"/> was cancelled while waiting. Cancellation during the wait for
/// the next node surfaces as a <see cref="TaskCanceledException"/>.
/// </exception>
/// <remarks>
/// <c>ThrowIfDisposed</c> is invoked before the cursor is touched and again after the wait; it is
/// assumed to throw when the queue has been disposed (its implementation is outside this method).
/// </remarks>
public async ValueTask<T> NextItem(CancellationToken cancellationToken)
{
    // Held for the whole call and released in the finally block below.
    await _cursorSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
    CancellationTokenRegistration? registration = null;

    try
    {
        TaskCompletionSource<LinkedListNode<T>> tcs;

        lock (_queue)
        {
            ThrowIfDisposed();

            // Restart from the head when there is no cursor yet or the cursor's node is no longer
            // linked to a list (LinkedListNode.List is null for detached nodes); otherwise step forward.
            _currentNode = _currentNode is null || _currentNode.List is null ? _queue.First : _currentNode.Next;
            if (_currentNode is not null)
                return _currentNode.Value;

            // Nothing to return yet: publish a completion source so another code path can hand over
            // the next node, and let the caller's token cancel the wait.
            tcs = new TaskCompletionSource<LinkedListNode<T>>(TaskCreationOptions.RunContinuationsAsynchronously);
            _cursorNextItemTcs = tcs;
            registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
        }

        // Await the local reference rather than re-reading the shared field: the field may be cleared
        // by another thread once the lock is released, which would otherwise dereference null here.
        var result = await tcs.Task.ConfigureAwait(false);

        lock (_queue)
        {
            ThrowIfDisposed();
            _currentNode = result;
        }

        // Use the awaited node directly; _currentNode is shared state and is no longer protected here.
        return result.Value;
    }
    finally
    {
        lock (_queue)
        {
            registration?.Dispose();
            _cursorNextItemTcs = null;
        }

        try
        {
            _cursorSemaphore.Release();
        }
        catch
        {
            // Deliberately ignored (best effort): a failing Release - e.g. on an already disposed
            // semaphore - must not replace an exception that is already propagating.
        }

        // No explicit throw here: if the wait was cancelled, awaiting the task above has already
        // raised TaskCanceledException, and throwing from finally would mask that exception.
    }
}