in src/DotPulsar/Internal/AsyncQueueWithCursor.cs [244:270]
public async ValueTask DisposeAsync()
{
lock (_queue)
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
}
_cursorSemaphore.Dispose();
_cursorNextItemTcs?.TrySetCanceled(CancellationToken.None);
var disposeLock = _pendingLock.DisposeAsync();
ReleasePendingLockGrant();
await disposeLock.ConfigureAwait(false);
foreach (var tcs in _queueEmptyTcs)
{
tcs.TrySetCanceled(CancellationToken.None);
}
lock (_queue)
{
foreach (var item in _queue)
{
item.Dispose();
}
}
}