private async ValueTask GetNextBatchFromBuffer()

in Amazon.KinesisTap.Core/Components/AsyncBatchQueue.cs [186:243]


        /// <summary>
        /// Fills <paramref name="output"/> with items that are already buffered, stopping as soon as
        /// any of the configured limits would be exceeded.
        /// </summary>
        /// <remarks>
        /// Items are taken first from <c>_outstandingQ</c> (items deferred by an earlier call), then from
        /// the channel. If the batch is still not full and <paramref name="timeoutMs"/> is non-zero, the
        /// method waits for the whole timeout period and then drains the channel one more time.
        /// </remarks>
        /// <param name="output">List that receives the items of the batch; items are appended to it.</param>
        /// <param name="timeoutMs">
        /// Time in milliseconds to wait before the second drain attempt; <c>0</c> skips the wait and
        /// the second attempt entirely. The value is passed unchanged to <see cref="Task.Delay(int, CancellationToken)"/>.
        /// </param>
        /// <param name="cancellationToken">Token used to cancel the wait.</param>
        /// <exception cref="OperationCanceledException">
        /// Thrown by the delay if <paramref name="cancellationToken"/> is cancelled while waiting.
        /// </exception>
        private async ValueTask GetNextBatchFromBuffer(List<T> output, int timeoutMs, CancellationToken cancellationToken)
        {
            // running totals for this batch, one slot per configured limit
            var counts = new long[_limits.Length];

            // items deferred by a previous call are considered before anything new from the channel
            while (_outstandingQ.TryDequeue(out var deferred))
            {
                if (!TryAddToBatch(deferred, counts, output))
                {
                    return;
                }
            }

            if (!DrainChannelIntoBatch(counts, output))
            {
                return;
            }

            if (timeoutMs == 0)
            {
                return;
            }

            // if we've reached this point, it means the batch is not full but the channel is empty
            // instead of using ReadAsync(), we simply wait for the timeout period
            await Task.Delay(timeoutMs, cancellationToken);

            // attempt to fill the batch again one more time
            DrainChannelIntoBatch(counts, output);
        }

        /// <summary>
        /// Reads items from the channel into <paramref name="output"/> until the channel has nothing
        /// immediately available or an item does not fit within the limits.
        /// </summary>
        /// <param name="counts">Running per-limit totals for the current batch; updated in place.</param>
        /// <param name="output">List that receives the accepted items.</param>
        /// <returns>
        /// <c>true</c> if the channel was emptied without hitting a limit; <c>false</c> if an item was
        /// deferred because it would exceed a limit (the batch is full).
        /// </returns>
        private bool DrainChannelIntoBatch(long[] counts, List<T> output)
        {
            while (_channel.Reader.TryRead(out var item))
            {
                if (!TryAddToBatch(item, counts, output))
                {
                    return false;
                }
            }

            return true;
        }

        /// <summary>
        /// Adds <paramref name="item"/> to the batch if doing so keeps every counter within its limit;
        /// otherwise places the item on <c>_outstandingQ</c>.
        /// </summary>
        /// <remarks>
        /// NOTE(review): an item whose own count exceeds a limit is deferred even when the batch is
        /// empty, so it would be deferred again on every call. Confirm such items are rejected before
        /// they reach this queue.
        /// </remarks>
        /// <param name="item">The candidate item.</param>
        /// <param name="counts">Running per-limit totals for the current batch; updated in place.</param>
        /// <param name="output">List that receives the item when it fits.</param>
        /// <returns><c>true</c> if the item was appended; <c>false</c> if it was deferred.</returns>
        private bool TryAddToBatch(T item, long[] counts, List<T> output)
        {
            for (var i = 0; i < counts.Length; i++)
            {
                counts[i] += _counters[i].Invoke(item);
                if (counts[i] > _limits[i])
                {
                    // the caller stops filling the batch after a deferral, so the partially
                    // updated totals are never consulted again
                    _outstandingQ.Enqueue(item);
                    return false;
                }
            }

            output.Add(item);
            return true;
        }