in code/KustoCopyConsole/Storage/RowItemGateway.cs [185:213]
private async Task BackgroundPersistanceAsync(CancellationToken ct)
{
while (!_backgroundCompletedSource.Task.IsCompleted)
{
if (_rowItemQueue.TryPeek(out var queueItem))
{
var delta = DateTime.Now - queueItem.EnqueueTime;
var waitTime = FLUSH_PERIOD - delta;
if (waitTime < MIN_WAIT_PERIOD)
{
await PersistBatchAsync(ct);
}
else
{ // Wait for first item to age to about FLUSH_PERIOD
await Task.WhenAny(
Task.Delay(waitTime, ct),
_backgroundCompletedSource.Task);
}
}
else
{ // Wait for an element to pop in
await Task.WhenAny(
Task.Delay(FLUSH_PERIOD, ct),
_backgroundCompletedSource.Task);
}
await CleanReleaseSourceTaskQueueAsync();
}
}