private async Task ConsumeInboundEventsAsync()

in src/Elastic.Channels/BufferedChannelBase.cs [376:432]


	private async Task ConsumeInboundEventsAsync(int maxQueuedMessages, TimeSpan maxInterval)
	{
		_callbacks.InboundChannelStartedCallback?.Invoke();

		WaitToReadResult result;
		while ((result = await InboundBuffer.WaitToReadAsync(InChannel.Reader).ConfigureAwait(false)) != WaitToReadResult.Completed)
		{
			if (TokenSource.Token.IsCancellationRequested) break;
			if (_signal is { IsSet: true }) break;

			if (result == WaitToReadResult.Timeout)
			{
				var timeouts = Interlocked.Increment(ref _seenTimeouts);
				if (timeouts % 10 == 0)
				{
					// this should free up tasks blocking on WaitToReadAsync() within InBoundBuffer.WaitToReadAsync()
					// read the comments in InBoundBuffer.WaitToReadAsync() to understand more.
					// https://github.com/dotnet/runtime/issues/761
					InChannel.Writer.TryWrite(null);
					Interlocked.Exchange(ref _inflightEvents, 0);
					continue;
				}
			}

			while (InboundBuffer.Count < maxQueuedMessages && InChannel.Reader.TryRead(out var item))
			{
				if (item is null)
					continue;

				InboundBuffer.Add(item);
				Interlocked.Decrement(ref _inflightEvents);

				if (InboundBuffer.DurationSinceFirstWaitToRead >= maxInterval)
					break;
			}

			if (InboundBuffer.ThresholdsHit)
				await FlushBufferAsync().ConfigureAwait(false);
		}

		// It's possible to break out of the above while loop before a threshold was met to flush the buffer.
		// This ensures we flush if there are any items left in the inbound buffer.
		if (InboundBuffer.Count > 0)
			await FlushBufferAsync().ConfigureAwait(false);

		OutChannel.Writer.TryComplete();

		async Task FlushBufferAsync()
		{
			var outboundBuffer = new OutboundBuffer<TEvent>(InboundBuffer);

			if (await PublishAsync(outboundBuffer).ConfigureAwait(false))
				_callbacks.PublishToOutboundChannelCallback?.Invoke();
			else
				_callbacks.PublishToOutboundChannelFailureCallback?.Invoke();
		}
	}