in code/KustoCopyConsole/Runner/QueueIngestRunner.cs [33:59]
public async Task RunAsync(CancellationToken ct)
{
// Clean half-queued URLs
CleanQueuingUrls();
while (!AllActivitiesCompleted())
{
var allBlocks = RowItemGateway.InMemoryCache.GetActivityFlatHierarchy(
a => a.RowItem.State != ActivityState.Completed,
i => i.RowItem.State != IterationState.Completed);
var exportedBlocks = allBlocks
.Where(h => h.Block.State == BlockState.Exported);
var ingestionTasks = exportedBlocks
.OrderBy(h => h.Activity.ActivityName)
.ThenBy(h => h.Block.IterationId)
.ThenBy(h => h.Block.BlockId)
.Select(h => QueueIngestBlockAsync(h, ct))
.ToImmutableArray();
await TaskHelper.WhenAllWithErrors(ingestionTasks);
if (!ingestionTasks.Any())
{
// Sleep
await SleepAsync(ct);
}
}
}