in src/Ingestor.cs [302:372]
/// <summary>
/// Runs a queued ingestion of the objects found under <c>m_args.SourcePath</c> using a TPL Dataflow pipeline.
/// For local files the pipeline is: list-and-filter (serial) -> ingest.
/// For blobs it is: list (serial) -> regex filter -> ingest, with bounded buffers between stages.
/// The calling thread blocks until the ingest stage completes, logging progress statistics roughly every
/// 10 seconds. If <c>m_bWaitForIngestCompletion</c> is set and results were collected, it then also waits
/// for the ingestion results.
/// </summary>
/// <param name="kcsb">Connection string used to create the queued ingest client.</param>
/// <exception cref="AggregateException">
/// Thrown by the pipeline wait when any stage faults. Faults in the listing/filtering stages are forwarded
/// downstream to the ingest stage, so they surface here instead of being reported as a successful run.
/// </exception>
internal void RunQueuedIngest(KustoConnectionStringBuilder kcsb)
{
    Reset();
    var stopwatch = ExtendedStopwatch.StartNew();
    using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(kcsb))
    {
        ActionBlock<string> listObjectsBlock = null;
        ActionBlock<IPersistentStorageFile> filterObjectsBlock = null;
        ActionBlock<DataSource> ingestBlock = null;
        if (m_bFileSystem) // Data is in local files
        {
            ingestBlock = new ActionBlock<DataSource>(
                record => IngestSingle(record, m_objectsCountQuota, ingestClient, m_bFileSystem, false, m_ingestionProperties, m_ingestWithManagedIdentity),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount });
            // Listing and pattern filtering happen together in one serial stage that feeds the ingest stage.
            listObjectsBlock = new ActionBlock<string>(
                sourcePath => ListAndFilterFiles(sourcePath, m_args.Pattern, m_objectsCountQuota, ingestBlock),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
            PropagateBlockCompletion(listObjectsBlock, ingestBlock);
        }
        else // Input is in blobs
        {
            ingestBlock = new ActionBlock<DataSource>(
                record => IngestSingle(record, m_objectsCountQuota, ingestClient, m_bFileSystem, false, m_ingestionProperties, m_ingestWithManagedIdentity),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount, BoundedCapacity = c_maxBlocksCapacity, EnsureOrdered = false });
            // ListFiles calls PSL EnumerateFiles which accepts a pattern but BlobPersistentStorageFactory2 doesn't use the full pattern
            // but only its prefix, therefore we still have to filter ourselves.
            filterObjectsBlock = new ActionBlock<IPersistentStorageFile>(
                file => FilterFilesAsync(file, m_patternRegex, m_objectsCountQuota, ingestBlock),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount, BoundedCapacity = c_maxBlocksCapacity, EnsureOrdered = false });
            PropagateBlockCompletion(filterObjectsBlock, ingestBlock);
            listObjectsBlock = new ActionBlock<string>(
                sourcePath => ListFiles(sourcePath, m_args.Prefix, m_objectsCountQuota, filterObjectsBlock),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
            PropagateBlockCompletion(listObjectsBlock, filterObjectsBlock);
        }

        // Debugging: Allow the debugger to retrieve the DataFlow blocks:
        Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(ingestBlock), ingestBlock);
        Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(filterObjectsBlock), filterObjectsBlock);
        Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(listObjectsBlock), listObjectsBlock);

        m_logger.LogVerbose("==> Starting...");
        // The whole pipeline is driven by a single input: the root source path.
        listObjectsBlock.Post(m_args.SourcePath);
        listObjectsBlock.Complete();

        // Poll the final stage so progress statistics are logged periodically (and once more at the end).
        // Wait throws AggregateException if the ingest stage (or, via propagation, any upstream stage) faulted.
        bool bPipelineCompleted;
        do
        {
            bPipelineCompleted = ingestBlock.Completion.Wait(TimeSpan.FromSeconds(10));
            m_logger.LogInfo($"==> {QueuedIngestStats()}");
        } while (!bPipelineCompleted);
        stopwatch.Stop();
    }

    m_logger.LogSuccess($" Done. Time elapsed: {stopwatch.Elapsed:c}");
    m_logger.LogSuccess($" {QueuedIngestStats()}");

    // Wait for ingestion completion, if required
    if (m_bWaitForIngestCompletion && m_ingestionResults.SafeFastAny())
    {
        m_logger.LogInfo("==> Waiting for ingestion completion...");
        WaitForIngestionResult(m_ingestionResults, m_ingestCompletionTimeout);
        m_logger.LogInfo("==> Done.");
    }
}

/// <summary>
/// Completes <paramref name="target"/> once <paramref name="source"/> completes.
/// If <paramref name="source"/> faulted, <paramref name="target"/> is faulted with the same exception
/// instead of being completed normally, mirroring <c>DataflowLinkOptions.PropagateCompletion</c>, so an
/// upstream failure is not silently turned into a successful (but truncated) run.
/// </summary>
/// <param name="source">Upstream block whose completion is observed.</param>
/// <param name="target">Downstream block to complete or fault.</param>
private static void PropagateBlockCompletion(IDataflowBlock source, IDataflowBlock target)
{
    // Explicit TaskScheduler.Default so the continuation never inherits the caller's TaskScheduler.Current (CA2008).
    source.Completion.ContinueWith(
        antecedent =>
        {
            if (antecedent.IsFaulted)
            {
                target.Fault(antecedent.Exception);
            }
            else
            {
                target.Complete();
            }
        },
        System.Threading.Tasks.TaskScheduler.Default);
}