internal void RunIngestSimulation()

in src/Ingestor.cs [450:503]


        /// <summary>
        /// Runs the listing/filtering pipeline over <c>m_args.SourcePath</c> and hands every
        /// resulting <see cref="DataSource"/> to <c>LogSingleObject</c>. As the log banners
        /// emitted by this method state, no data is ingested in this mode.
        /// </summary>
        /// <remarks>
        /// The pipeline is built from stand-alone <see cref="ActionBlock{TInput}"/> stages:
        /// each stage's delegate receives the next stage as an argument, and completion is
        /// chained by hand through <c>Completion.ContinueWith</c>.
        /// The call blocks the current thread until the last stage has completed.
        /// </remarks>
        internal void RunIngestSimulation()
        {
            Reset();

            var stopwatch = ExtendedStopwatch.StartNew();

            // Terminal stage: identical for both source kinds, so it is built once up front.
            var ingestOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount
            };
            var simulatedIngestBlock = new ActionBlock<DataSource>(
                dataSource => LogSingleObject(dataSource),
                ingestOptions);

            // Listing always runs with a degree of parallelism of one.
            var listingOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

            // Only the blob flow has a separate filtering stage; it stays null for local files.
            ActionBlock<IPersistentStorageFile> filterObjectsBlock = null;
            ActionBlock<string> listObjectsBlock;

            // NOTE(review): each continuation below calls Complete() on the next stage no matter
            // how the previous stage finished, so a fault in an earlier stage is not forwarded
            // downstream -- confirm this is intended.
            if (m_bFileSystem)
            {
                // Local files: listing and pattern filtering happen in one stage that feeds
                // the terminal stage directly.
                listObjectsBlock = new ActionBlock<string>(
                    path => ListAndFilterFiles(path, m_args.Pattern, m_objectsCountQuota, simulatedIngestBlock),
                    listingOptions);
                listObjectsBlock.Completion.ContinueWith(_ => simulatedIngestBlock.Complete());
            }
            else
            {
                // Blobs: list -> filter -> terminal stage.
                var filterOptions = new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount
                };
                filterObjectsBlock = new ActionBlock<IPersistentStorageFile>(
                    storageFile => FilterFilesAsync(storageFile, m_patternRegex, m_objectsCountQuota, simulatedIngestBlock),
                    filterOptions);
                filterObjectsBlock.Completion.ContinueWith(_ => simulatedIngestBlock.Complete());

                listObjectsBlock = new ActionBlock<string>(
                    path => ListFiles(path, m_args.Prefix, m_objectsCountQuota, filterObjectsBlock),
                    listingOptions);
                listObjectsBlock.Completion.ContinueWith(_ => filterObjectsBlock.Complete());
            }

            // Debugging aid: register the blocks so that a debugger can retrieve them.
            // The local names double as the registration keys, hence nameof().
            Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(simulatedIngestBlock), simulatedIngestBlock);
            Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(filterObjectsBlock), filterObjectsBlock);
            Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(listObjectsBlock), listObjectsBlock);

            m_logger.LogWarning("*** NO DATA WILL BE INGESTED IN THIS RUN ***" + Environment.NewLine);

            // Seed the pipeline with the single source path, then close the head stage so that
            // completion can ripple through the continuations wired above.
            listObjectsBlock.Post(m_args.SourcePath);
            listObjectsBlock.Complete();

            // Block until the terminal stage has completed.
            simulatedIngestBlock.Completion.Wait();
            stopwatch.Stop();

            m_logger.LogWarning(Environment.NewLine + "*** NO DATA WAS INGESTED IN THIS RUN ***" + Environment.NewLine);
            m_logger.LogSuccess($"    Done. Time elapsed: {stopwatch.Elapsed:c}");

            var countersSnapshot = BasicCountersSnapshot();
            var acceptedCount = Interlocked.Read(ref m_objectsAccepted);
            m_logger.LogSuccess($"    {countersSnapshot}, accepted: [{acceptedCount,7}]");
        }