in src/Elastic.Channels/Buffers/InboundBuffer.cs [82:123]
/// <summary>
/// Waits until <paramref name="reader"/> reports that data is available, the channel completes,
/// or the timeout taken from <c>Wait</c> elapses - whichever happens first.
/// </summary>
/// <param name="reader">The channel reader to wait on.</param>
/// <returns>
/// <see cref="WaitToReadResult.Read"/> when the reader signals that data can be read,
/// <see cref="WaitToReadResult.Completed"/> when the reader signals that no more data will arrive,
/// <see cref="WaitToReadResult.Timeout"/> when the breaker fired before the reader signalled.
/// Any other failure is swallowed and reported as <see cref="WaitToReadResult.Read"/>.
/// </returns>
public async Task<WaitToReadResult> WaitToReadAsync(ChannelReader<TEvent?> reader)
{
    // Remember when the very first wait started; later calls leave the value untouched.
    TimeOfFirstWaitToRead ??= DateTimeOffset.UtcNow;

    // A breaker that already fired cannot be re-armed, so swap in a fresh one.
    if (_breaker.IsCancellationRequested)
    {
        _breaker.Dispose();
        _breaker = new CancellationTokenSource();
    }

    try
    {
        // https://github.com/dotnet/runtime/issues/761
        // Cancellation tokens may not be unrooted properly (by design) if cancellation occurs,
        // so the token is deliberately NOT handed to reader.WaitToReadAsync(). Instead the read
        // is raced against a cancellable delay and we check explicitly which task completed.
        // We accept the possibility of several pending tasks blocking on WaitToReadAsync();
        // these will all unblock and free up when a new message gets pushed.
        // To aid with cleaning these tasks up we write `default` to the channel when this task returns TimeOut
        _breaker.CancelAfter(Wait);
        var readTask = reader.WaitToReadAsync().AsTask();

        bool timedOut;
        // The delay observes a per-call token linked to the breaker. This lets us release the
        // delay task as soon as the race is decided; bound directly to the breaker token it
        // would stay registered on the breaker until the next time the breaker actually fires.
        using (var delayCancellation = CancellationTokenSource.CreateLinkedTokenSource(_breaker.Token))
        {
            var delayTask = Task.Delay(Timeout.Infinite, delayCancellation.Token);
            var completedTask = await Task.WhenAny(readTask, delayTask).ConfigureAwait(false);

            // The delay is infinite, so it can only complete through cancellation by the breaker.
            timedOut = completedTask == delayTask;
            if (!timedOut)
            {
                // Disarm the breaker timer, then complete the now useless delay task.
                _breaker.CancelAfter(-1);
                delayCancellation.Cancel();
            }
        }

        if (timedOut)
        {
            // The breaker fired: replace it so the next call starts with a usable one.
            _breaker.Dispose();
            _breaker = new CancellationTokenSource();
            return WaitToReadResult.Timeout;
        }

        return await readTask.ConfigureAwait(false) ? WaitToReadResult.Read : WaitToReadResult.Completed;
    }
    catch (Exception) when (_breaker.IsCancellationRequested)
    {
        // Something failed while the breaker was (or became) cancelled: treat it as a timeout.
        _breaker.Dispose();
        _breaker = new CancellationTokenSource();
        return WaitToReadResult.Timeout;
    }
    catch (Exception)
    {
        // Best effort: any other failure re-arms the breaker and is reported as Read.
        // NOTE(review): this also hides a faulted reader from the caller - presumably the caller's
        // subsequent read attempt is expected to cope with that; confirm this is intentional.
        _breaker.CancelAfter(Wait);
        return WaitToReadResult.Read;
    }
}