in src/Ingestor.cs [450:503]
/// <summary>
/// Runs the listing/filtering pipeline in simulation mode: every object that reaches
/// the final stage is handed to <c>LogSingleObject</c> and nothing is ingested.
/// Blocks the calling thread until the final stage has completed, then logs the
/// elapsed time and the counters.
/// </summary>
/// <remarks>
/// Two pipeline shapes are built, selected by <c>m_bFileSystem</c>:
/// <list type="bullet">
/// <item>local files: list+filter stage -> simulated-ingest stage;</item>
/// <item>otherwise (blobs): list stage -> filter stage -> simulated-ingest stage.</item>
/// </list>
/// A fault in any stage is forwarded to the stages after it, so it surfaces from the
/// final wait as an <see cref="AggregateException"/> instead of being silently dropped.
/// </remarks>
/// <exception cref="AggregateException">Any stage of the pipeline faulted.</exception>
internal void RunIngestSimulation()
{
    Reset();
    var stopwatch = ExtendedStopwatch.StartNew();

    // Signals 'downstream' once 'upstream' has finished. A faulted upstream faults the
    // downstream as well (dropping whatever is still buffered there); merely completing
    // it would hide the failure and let the run report success on partial results.
    Action<IDataflowBlock, IDataflowBlock> chainCompletion = (upstream, downstream) =>
        upstream.Completion.ContinueWith(antecedent =>
        {
            if (antecedent.IsFaulted)
            {
                downstream.Fault(antecedent.Exception);
            }
            else
            {
                downstream.Complete();
            }
        });

    // The final stage is identical for both input kinds.
    var simulatedIngestBlock = new ActionBlock<DataSource>(
        record => LogSingleObject(record),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount });

    // Only created for blob input; stays null for local files.
    ActionBlock<IPersistentStorageFile> filterObjectsBlock = null;
    ActionBlock<string> listObjectsBlock;

    if (m_bFileSystem) // Data is in local files
    {
        // Listing and filtering happen in a single stage that feeds the final stage directly.
        listObjectsBlock = new ActionBlock<string>(
            sourcePath => ListAndFilterFiles(sourcePath, m_args.Pattern, m_objectsCountQuota, simulatedIngestBlock),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
        chainCompletion(listObjectsBlock, simulatedIngestBlock);
    }
    else // Input is in blobs
    {
        // Listing feeds a separate, parallel filter stage, which feeds the final stage.
        filterObjectsBlock = new ActionBlock<IPersistentStorageFile>(
            file => FilterFilesAsync(file, m_patternRegex, m_objectsCountQuota, simulatedIngestBlock),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount });
        chainCompletion(filterObjectsBlock, simulatedIngestBlock);

        listObjectsBlock = new ActionBlock<string>(
            sourcePath => ListFiles(sourcePath, m_args.Prefix, m_objectsCountQuota, filterObjectsBlock),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
        chainCompletion(listObjectsBlock, filterObjectsBlock);
    }

    // Debugging: Allow the debugger to retrieve the DataFlow blocks:
    Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(simulatedIngestBlock), simulatedIngestBlock);
    Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(filterObjectsBlock), filterObjectsBlock);
    Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(listObjectsBlock), listObjectsBlock);

    m_logger.LogWarning("*** NO DATA WILL BE INGESTED IN THIS RUN ***" + Environment.NewLine);

    // Feed the single source path and close the head of the pipeline; completion
    // (or a fault) then travels stage by stage down to the final block.
    listObjectsBlock.Post(m_args.SourcePath);
    listObjectsBlock.Complete();

    // Throws AggregateException if any stage faulted.
    simulatedIngestBlock.Completion.Wait();
    stopwatch.Stop();

    m_logger.LogWarning(Environment.NewLine + "*** NO DATA WAS INGESTED IN THIS RUN ***" + Environment.NewLine);
    m_logger.LogSuccess($" Done. Time elapsed: {stopwatch.Elapsed:c}");
    m_logger.LogSuccess($" {BasicCountersSnapshot()}, accepted: [{Interlocked.Read(ref m_objectsAccepted),7}]");
}