in src/Ingestor.cs [566:586]
private void RunDirectIngestInBatches(ICslAdminProvider kustoClient, IEnumerable<DataSourcesBatch> batches, bool ingestLocally)
{
ActionBlock<DataSourcesBatch> ingestBatchesBlock = new ActionBlock<DataSourcesBatch>(
batch => IngestBatch(batch, kustoClient, ingestLocally, m_ingestionProperties, m_ingestCompletionTimeout, m_ingestWithManagedIdentity),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = m_directIngestParallelRequests });
// Debugging: Allow the debugger to retrieve the DataFlow blocks:
Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(ingestBatchesBlock), ingestBatchesBlock);
m_logger.LogVerbose("==> Flow RunDirectIngestInBatches starting...");
batches.ForEach(ds => ingestBatchesBlock.Post(ds));
ingestBatchesBlock.Complete();
bool bPipelineCompleted = false;
do
{
bPipelineCompleted = ingestBatchesBlock.Completion.Wait(TimeSpan.FromSeconds(10));
m_logger.LogInfo($"==> {DirectIngestStats()}");
} while (!bPipelineCompleted);
m_logger.LogVerbose("==> Flow RunDirectIngestInBatches done.");
}