internal void RunQueuedIngest()

in src/Ingestor.cs [302:372]


        /// <summary>
        /// Runs a queued ingestion of the objects found under <c>m_args.SourcePath</c>.
        /// Builds a chain of dataflow blocks (list -> [filter, blob input only] -> ingest), posts the source
        /// path into the head of the chain, and logs <c>QueuedIngestStats()</c> every 10 seconds until the
        /// ingest block completes. Optionally waits for the ingestion results afterwards.
        /// </summary>
        /// <param name="kcsb">Connection string builder used to create the queued ingest client.</param>
        /// <exception cref="AggregateException">
        /// Thrown by the wait on the ingest block's completion when any stage of the pipeline faults.
        /// </exception>
        internal void RunQueuedIngest(KustoConnectionStringBuilder kcsb)
        {
            Reset();

            // Chains two pipeline stages: once 'source' finishes, 'target' is told to finish as well.
            // A fault in 'source' is forwarded to 'target' rather than being turned into a normal completion;
            // otherwise the upstream exception is never observed and the run is reported as successful.
            Action<IDataflowBlock, IDataflowBlock> propagateCompletion = (source, target) =>
            {
                source.Completion.ContinueWith(antecedent =>
                {
                    if (antecedent.IsFaulted)
                    {
                        target.Fault(antecedent.Exception);
                    }
                    else
                    {
                        target.Complete();
                    }
                });
            };

            var stopwatch = ExtendedStopwatch.StartNew();
            using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(kcsb))
            {
                ActionBlock<string> listObjectsBlock = null;
                ActionBlock<IPersistentStorageFile> filterObjectsBlock = null; // Stays null for file system input
                ActionBlock<DataSource> ingestBlock = null;

                if (m_bFileSystem) // Data is in local files
                {
                    ingestBlock = new ActionBlock<DataSource>(
                        record => IngestSingle(record, m_objectsCountQuota, ingestClient, m_bFileSystem, false, m_ingestionProperties, m_ingestWithManagedIdentity),
                        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount });

                    // Listing and filtering happen in a single step here and feed the ingest block directly.
                    listObjectsBlock = new ActionBlock<string>(
                        sourcePath => ListAndFilterFiles(sourcePath, m_args.Pattern, m_objectsCountQuota, ingestBlock),
                        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
                    propagateCompletion(listObjectsBlock, ingestBlock);
                }
                else // Input is in blobs
                {
                    ingestBlock = new ActionBlock<DataSource>(
                        record => IngestSingle(record, m_objectsCountQuota, ingestClient, m_bFileSystem, false, m_ingestionProperties, m_ingestWithManagedIdentity),
                        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount, BoundedCapacity = c_maxBlocksCapacity, EnsureOrdered = false });

                    // ListFiles calls PSL EnumerateFiles which accepts a pattern but BlobPersistentStorageFactory2 doesn't use the full pattern
                    // but only its prefix, therefore we still have to filter ourselves.
                    filterObjectsBlock = new ActionBlock<IPersistentStorageFile>(
                        file => FilterFilesAsync(file, m_patternRegex, m_objectsCountQuota, ingestBlock),
                        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount, BoundedCapacity = c_maxBlocksCapacity, EnsureOrdered = false });
                    propagateCompletion(filterObjectsBlock, ingestBlock);

                    listObjectsBlock = new ActionBlock<string>(
                        sourcePath => ListFiles(sourcePath, m_args.Prefix, m_objectsCountQuota, filterObjectsBlock),
                        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
                    propagateCompletion(listObjectsBlock, filterObjectsBlock);
                }
                // Debugging: Allow the debugger to retrieve the DataFlow blocks:
                Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(ingestBlock), ingestBlock);
                Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(filterObjectsBlock), filterObjectsBlock);
                Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(listObjectsBlock), listObjectsBlock);

                m_logger.LogVerbose("==> Starting...");
                // The source path is the only input; completing the head block right away lets completion
                // flow down the chain as soon as the listing is done.
                listObjectsBlock.Post(m_args.SourcePath);
                listObjectsBlock.Complete();

                // Poll the tail of the pipeline, reporting progress after every 10 second wait.
                // Wait() rethrows (as AggregateException) if the ingest block ended up faulted.
                bool bPipelineCompleted = false;
                do
                {
                    bPipelineCompleted = ingestBlock.Completion.Wait(TimeSpan.FromSeconds(10));

                    m_logger.LogInfo($"==> {QueuedIngestStats()}");
                } while (!bPipelineCompleted);

                stopwatch.Stop();
            }

            m_logger.LogSuccess($"    Done. Time elapsed: {stopwatch.Elapsed:c}");
            m_logger.LogSuccess($"    {QueuedIngestStats()}");

            // Wait for ingestion completion, if required
            if (m_bWaitForIngestCompletion && m_ingestionResults.SafeFastAny())
            {
                m_logger.LogInfo("==> Waiting for ingestion completion...");
                WaitForIngestionResult(m_ingestionResults, m_ingestCompletionTimeout);
                m_logger.LogInfo("==> Done.");
            }
        }