in src/Elastic.Channels/BufferedChannelBase.cs [376:432]
private async Task ConsumeInboundEventsAsync(int maxQueuedMessages, TimeSpan maxInterval)
{
_callbacks.InboundChannelStartedCallback?.Invoke();
WaitToReadResult result;
while ((result = await InboundBuffer.WaitToReadAsync(InChannel.Reader).ConfigureAwait(false)) != WaitToReadResult.Completed)
{
if (TokenSource.Token.IsCancellationRequested) break;
if (_signal is { IsSet: true }) break;
if (result == WaitToReadResult.Timeout)
{
var timeouts = Interlocked.Increment(ref _seenTimeouts);
if (timeouts % 10 == 0)
{
// this should free up tasks blocking on WaitToReadAsync() within InBoundBuffer.WaitToReadAsync()
// read the comments in InBoundBuffer.WaitToReadAsync() to understand more.
// https://github.com/dotnet/runtime/issues/761
InChannel.Writer.TryWrite(null);
Interlocked.Exchange(ref _inflightEvents, 0);
continue;
}
}
while (InboundBuffer.Count < maxQueuedMessages && InChannel.Reader.TryRead(out var item))
{
if (item is null)
continue;
InboundBuffer.Add(item);
Interlocked.Decrement(ref _inflightEvents);
if (InboundBuffer.DurationSinceFirstWaitToRead >= maxInterval)
break;
}
if (InboundBuffer.ThresholdsHit)
await FlushBufferAsync().ConfigureAwait(false);
}
// It's possible to break out of the above while loop before a threshold was met to flush the buffer.
// This ensures we flush if there are any items left in the inbound buffer.
if (InboundBuffer.Count > 0)
await FlushBufferAsync().ConfigureAwait(false);
OutChannel.Writer.TryComplete();
async Task FlushBufferAsync()
{
var outboundBuffer = new OutboundBuffer<TEvent>(InboundBuffer);
if (await PublishAsync(outboundBuffer).ConfigureAwait(false))
_callbacks.PublishToOutboundChannelCallback?.Invoke();
else
_callbacks.PublishToOutboundChannelFailureCallback?.Invoke();
}
}