in Amazon.KinesisTap.Core/Components/AsyncBatchQueue.cs [134:184]
/// <summary>
/// Fills <paramref name="output"/> with the next batch of items, taking items first from
/// <c>_outstandingQ</c> and then from the lists stored in <c>_secondaryQueue</c>.
/// </summary>
/// <remarks>
/// For every candidate item each counter in <c>_counters</c> is invoked and its result is added
/// to a running total; the batch is closed as soon as any total exceeds the matching entry in
/// <c>_limits</c>. The item that crossed the limit is not added to the batch: it is put back on
/// <c>_outstandingQ</c> (first phase), or the list still containing it is handed back to
/// <c>_secondaryQueue</c> (second phase). The running totals carry over from the first phase
/// into the second.
/// </remarks>
/// <param name="output">List that receives the items accepted into the batch.</param>
private void GetNextBatchFromSecondaryQueue(List<T> output)
{
    var counts = new long[_limits.Length];

    // Phase 1: drain items that are waiting in the outstanding queue.
    while (_outstandingQ.TryDequeue(out var item))
    {
        for (var i = 0; i < counts.Length; i++)
        {
            counts[i] += _counters[i].Invoke(item);
            if (counts[i] > _limits[i])
            {
                // The item does not fit in this batch; put it back and finish.
                // NOTE(review): Enqueue presumably appends at the tail, so this item may end up
                // behind items that were queued after it - confirm whether ordering matters.
                _outstandingQ.Enqueue(item);
                return;
            }
        }
        output.Add(item);
    }

    // Phase 2: pull whole lists from the secondary queue and consume them front to back.
    var stop = false;
    while (!stop)
    {
        if (!_secondaryQueue.TryDequeue(out var listItem))
        {
            // Nothing more to read.
            return;
        }

        // Number of leading items of listItem accepted into the batch. They are removed with a
        // single RemoveRange call below instead of one RemoveAt(0) per item, which would shift
        // the remaining elements on every iteration.
        var taken = 0;
        while (taken < listItem.Count && !stop)
        {
            var item = listItem[taken];
            for (var i = 0; i < counts.Length; i++)
            {
                counts[i] += _counters[i].Invoke(item);
                if (counts[i] > _limits[i])
                {
                    stop = true;
                    break;
                }
            }
            if (!stop)
            {
                output.Add(item);
                taken++;
            }
        }

        // Drop the consumed prefix so only the unprocessed items remain in the list.
        listItem.RemoveRange(0, taken);

        if (stop && listItem.Count > 0)
        {
            // Hand the unprocessed remainder back to the secondary queue.
            // NOTE(review): the result of TryEnqueue is ignored; if it can fail, the remaining
            // items would be dropped here - confirm against the queue implementation.
            _secondaryQueue.TryEnqueue(listItem);
        }
    }
}