private void RunPrepareBatchesForDirectIngest()

in src/Ingestor.cs [506:564]


        /// <summary>
        /// Builds and runs the dataflow pipeline that prepares batches for direct ingestion,
        /// then blocks the calling thread until the last stage of the pipeline has completed.
        /// </summary>
        /// <remarks>
        /// Two pipeline shapes are built, selected by <c>m_bFileSystem</c>:
        /// <list type="bullet">
        /// <item>Local files: <c>ListAndFilterFiles</c> -> <c>UploadFiles</c>.</item>
        /// <item>Anything else: <c>ListFiles</c> -> <c>FilterFilesAsync</c> -> <c>AccumulateObjects</c>.</item>
        /// </list>
        /// The single input posted to the pipeline is <c>m_args.SourcePath</c>. While waiting, the
        /// result of <c>DirectIngestStats()</c> is logged after every wait attempt (each attempt
        /// lasts at most 10 seconds), including once after the pipeline has completed.
        /// </remarks>
        /// <param name="kustoClient">Admin provider handed to <c>AcquireTempBlobContainer</c>; only used for local-file input when Kusto is not running locally.</param>
        /// <param name="kustoRunningLocally">When <c>true</c>, no temporary blob container is acquired and <c>UploadFiles</c> receives <c>null</c> as its container.</param>
        private void RunPrepareBatchesForDirectIngest(ICslAdminProvider kustoClient, bool kustoRunningLocally)
        {
            // NOTE: these three locals must keep their names - nameof() below turns them into
            // the keys under which the blocks are registered for the debugger.
            ActionBlock<string> listObjectsBlock = null;
            ActionBlock<IPersistentStorageFile> filterObjectsBlock = null;
            ActionBlock<DataSource> uploadOrAccumulateBlock = null;

            if (!m_bFileSystem)
            {
                // Input is not local files: list -> filter -> accumulate.
                // Stages are created last-to-first so every stage can be handed its successor.
                var accumulateOptions = new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount,
                    BoundedCapacity = c_maxBlocksCapacity,
                    EnsureOrdered = false
                };
                uploadOrAccumulateBlock = new ActionBlock<DataSource>(
                    dataSource => AccumulateObjects(dataSource),
                    accumulateOptions);

                var filterOptions = new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount,
                    BoundedCapacity = c_maxBlocksCapacity,
                    EnsureOrdered = false
                };
                filterObjectsBlock = new ActionBlock<IPersistentStorageFile>(
                    storageFile => FilterFilesAsync(storageFile, m_patternRegex, m_objectsCountQuota, uploadOrAccumulateBlock),
                    filterOptions);
                // NOTE(review): the continuation has no TaskContinuationOptions, so it runs however
                // the upstream block ended and always signals a normal Complete() downstream -
                // confirm that an upstream fault is not expected to surface here.
                filterObjectsBlock.Completion.ContinueWith(antecedent => uploadOrAccumulateBlock.Complete());

                // Listing is processed one item at a time.
                var listOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
                listObjectsBlock = new ActionBlock<string>(
                    path => ListFiles(path, m_args.Prefix, m_objectsCountQuota, filterObjectsBlock),
                    listOptions);
                listObjectsBlock.Completion.ContinueWith(antecedent => filterObjectsBlock.Complete());
            }
            else
            {
                // Data is in local files: list-and-filter -> upload.
                // A real blob container is only needed when we are *not* working with a local
                // Kusto service; otherwise the container stays null.
                IPersistentStorageContainer stagingContainer = null;
                if (!kustoRunningLocally)
                {
                    stagingContainer = AcquireTempBlobContainer(kustoClient);
                }

                var uploadOptions = new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount
                };
                uploadOrAccumulateBlock = new ActionBlock<DataSource>(
                    dataSource => UploadFiles(dataSource, stagingContainer),
                    uploadOptions);

                // Listing is processed one item at a time.
                var listOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
                listObjectsBlock = new ActionBlock<string>(
                    path => ListAndFilterFiles(path, m_args.Pattern, m_objectsCountQuota, uploadOrAccumulateBlock),
                    listOptions);
                // Same completion hand-off as in the other branch: runs on any upstream outcome.
                listObjectsBlock.Completion.ContinueWith(antecedent => uploadOrAccumulateBlock.Complete());
            }

            // Debugging aid: let the debugger get hold of the dataflow blocks.
            // filterObjectsBlock is still null here when the input is local files.
            Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(listObjectsBlock), listObjectsBlock);
            Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(filterObjectsBlock), filterObjectsBlock);
            Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(uploadOrAccumulateBlock), uploadOrAccumulateBlock);

            m_logger.LogVerbose("==> Flow RunPrepareBatchesForDirectIngest starting...");

            // Feed the one and only input, then close the head of the pipeline so completion
            // can flow stage by stage down to the last block.
            listObjectsBlock.Post(m_args.SourcePath);
            listObjectsBlock.Complete();

            // Wait for the tail of the pipeline, emitting a stats line after every wait attempt
            // (so a final line is logged once the pipeline has finished as well).
            TimeSpan statsInterval = TimeSpan.FromSeconds(10);
            while (true)
            {
                bool pipelineFinished = uploadOrAccumulateBlock.Completion.Wait(statsInterval);
                m_logger.LogInfo($"==> {DirectIngestStats()}");
                if (pipelineFinished)
                {
                    break;
                }
            }

            m_logger.LogVerbose("==> Flow RunPrepareBatchesForDirectIngest done.");
        }