private void RunDirectIngestInBatches()

in src/Ingestor.cs [566:586]


        /// <summary>
        /// Posts every batch to a dataflow <see cref="ActionBlock{TInput}"/> that invokes
        /// <c>IngestBatch</c> per batch, then blocks the caller until the block has completed,
        /// logging <c>DirectIngestStats()</c> after each wait of up to 10 seconds.
        /// </summary>
        /// <param name="kustoClient">Admin provider handed to <c>IngestBatch</c> for every batch.</param>
        /// <param name="batches">The batches to post to the block.</param>
        /// <param name="ingestLocally">Flag forwarded unchanged to <c>IngestBatch</c>.</param>
        private void RunDirectIngestInBatches(ICslAdminProvider kustoClient, IEnumerable<DataSourcesBatch> batches, bool ingestLocally)
        {
            // The block's degree of parallelism is taken from m_directIngestParallelRequests.
            var blockOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = m_directIngestParallelRequests
            };

            // NOTE: the local's name is passed (via nameof) to RegisterWeakReference below,
            // so renaming it changes the name the block is registered under.
            var ingestBatchesBlock = new ActionBlock<DataSourcesBatch>(
                item => IngestBatch(item, kustoClient, ingestLocally, m_ingestionProperties, m_ingestCompletionTimeout, m_ingestWithManagedIdentity),
                blockOptions);

            // Debugging aid: register the block so a debugger can locate the DataFlow block.
            Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(ingestBatchesBlock), ingestBatchesBlock);

            m_logger.LogVerbose("==> Flow RunDirectIngestInBatches starting...");

            // Hand over all batches, then signal that no further input will be posted.
            batches.ForEach(pending => ingestBatchesBlock.Post(pending));
            ingestBatchesBlock.Complete();

            // Wait in fixed slices so that statistics are logged after every wait,
            // including the final one that observes completion.
            TimeSpan pollInterval = TimeSpan.FromSeconds(10);
            while (true)
            {
                bool finished = ingestBatchesBlock.Completion.Wait(pollInterval);
                m_logger.LogInfo($"==> {DirectIngestStats()}");
                if (finished)
                {
                    break;
                }
            }

            m_logger.LogVerbose("==> Flow RunDirectIngestInBatches done.");
        }