in Amazon.KinesisTap.Core/Components/AsyncBatchQueue.cs [186:243]
private async ValueTask GetNextBatchFromBuffer(List<T> output, int timeoutMs, CancellationToken cancellationToken)
{
var counts = new long[_limits.Length];
while (_outstandingQ.TryDequeue(out var item))
{
for (var i = 0; i < counts.Length; i++)
{
counts[i] += _counters[i].Invoke(item);
if (counts[i] > _limits[i])
{
_outstandingQ.Enqueue(item);
return;
}
}
output.Add(item);
}
while (_channel.Reader.TryRead(out var item))
{
for (var i = 0; i < counts.Length; i++)
{
counts[i] += _counters[i].Invoke(item);
if (counts[i] > _limits[i])
{
_outstandingQ.Enqueue(item);
return;
}
}
output.Add(item);
}
if (timeoutMs == 0)
{
return;
}
// if we've reached this point, it means the batch is not full but the channel is empty
// instead of using ReadAsync(), we simply wait for the timeout period
await Task.Delay(timeoutMs, cancellationToken);
// attempt to fill the batch again one more time
while (_channel.Reader.TryRead(out var item))
{
for (var i = 0; i < counts.Length; i++)
{
counts[i] += _counters[i].Invoke(item);
if (counts[i] > _limits[i])
{
_outstandingQ.Enqueue(item);
return;
}
}
output.Add(item);
}
}