public async ValueTask<T> NextItem(CancellationToken cancellationToken)

in src/DotPulsar/Internal/AsyncQueueWithCursor.cs [145:200]


    public async ValueTask<T> NextItem(CancellationToken cancellationToken)
    {
        await _cursorSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

        CancellationTokenRegistration? registration = null;
        try
        {
            lock (_queue)
            {
                ThrowIfDisposed();

                _currentNode = _currentNode is null || _currentNode.List is null ? _queue.First : _currentNode.Next;

                if (_currentNode is not null)
                    return _currentNode.Value;

                var tcs = new TaskCompletionSource<LinkedListNode<T>>(TaskCreationOptions.RunContinuationsAsynchronously);
                _cursorNextItemTcs = tcs;
                registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
            }

            var result = await _cursorNextItemTcs.Task.ConfigureAwait(false);

            lock (_queue)
            {
                ThrowIfDisposed();
                _currentNode = result;
            }

            return _currentNode.Value;
        }
        finally
        {
            var shouldThrow = _cursorNextItemTcs is not null && _cursorNextItemTcs.Task.IsCanceled;

            lock (_queue)
            {
                registration?.Dispose();
                _cursorNextItemTcs = null;
            }

            try
            {
                _cursorSemaphore.Release();
            }
            catch
            {
                // Ignore
            }

            if (shouldThrow)
            {
                throw new TaskCanceledException("The task was cancelled");
            }
        }
    }