in src/Ingestor.cs [506:564]
/// <summary>
/// Builds and runs the dataflow pipeline that prepares batches for direct ingestion,
/// then blocks until the last stage of the pipeline has completed.
/// </summary>
/// <remarks>
/// Two pipeline shapes are built, depending on <c>m_bFileSystem</c>:
/// <list type="bullet">
/// <item>Local files: list-and-filter (<c>ListAndFilterFiles</c>) -> upload (<c>UploadFiles</c>).</item>
/// <item>Otherwise: list (<c>ListFiles</c>) -> filter (<c>FilterFilesAsync</c>) -> accumulate (<c>AccumulateObjects</c>).</item>
/// </list>
/// The blocks are not linked with <c>LinkTo</c>; each stage is handed its downstream block
/// explicitly, so completion (and faults) are forwarded by hand via <see cref="PropagateCompletion"/>.
/// While waiting, the current <c>DirectIngestStats()</c> are logged every 10 seconds and once
/// more when the pipeline finishes.
/// </remarks>
/// <param name="kustoClient">Passed to <c>AcquireTempBlobContainer</c> when a temporary blob container is required.</param>
/// <param name="kustoRunningLocally">
/// When <c>true</c> and the input is local files, no temporary blob container is acquired and
/// <c>UploadFiles</c> receives a <c>null</c> container.
/// </param>
/// <exception cref="AggregateException">
/// Thrown by the wait on the final block's completion if any stage of the pipeline faulted.
/// </exception>
private void RunPrepareBatchesForDirectIngest(ICslAdminProvider kustoClient, bool kustoRunningLocally)
{
    ActionBlock<string> listObjectsBlock = null;
    ActionBlock<IPersistentStorageFile> filterObjectsBlock = null;
    ActionBlock<DataSource> uploadOrAccumulateBlock = null;
    IPersistentStorageContainer tempContainer = null;

    if (m_bFileSystem) // Data is in local files
    {
        // We only need to upload to a real blob container if we are *not* working with a local Kusto service
        if (!kustoRunningLocally)
        {
            tempContainer = AcquireTempBlobContainer(kustoClient);
        }

        uploadOrAccumulateBlock = new ActionBlock<DataSource>(
            record => UploadFiles(record, tempContainer),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount });

        listObjectsBlock = new ActionBlock<string>(
            sourcePath => ListAndFilterFiles(sourcePath, m_args.Pattern, m_objectsCountQuota, uploadOrAccumulateBlock),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
        PropagateCompletion(listObjectsBlock, uploadOrAccumulateBlock);
    }
    else // Input is not local files
    {
        uploadOrAccumulateBlock = new ActionBlock<DataSource>(
            record => AccumulateObjects(record),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount, BoundedCapacity = c_maxBlocksCapacity, EnsureOrdered = false });

        filterObjectsBlock = new ActionBlock<IPersistentStorageFile>(
            file => FilterFilesAsync(file, m_patternRegex, m_objectsCountQuota, uploadOrAccumulateBlock),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount, BoundedCapacity = c_maxBlocksCapacity, EnsureOrdered = false });
        PropagateCompletion(filterObjectsBlock, uploadOrAccumulateBlock);

        listObjectsBlock = new ActionBlock<string>(
            sourcePath => ListFiles(sourcePath, m_args.Prefix, m_objectsCountQuota, filterObjectsBlock),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
        PropagateCompletion(listObjectsBlock, filterObjectsBlock);
    }

    // Debugging: Allow the debugger to retrieve the DataFlow blocks
    // (filterObjectsBlock is null when the input is local files):
    Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(listObjectsBlock), listObjectsBlock);
    Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(filterObjectsBlock), filterObjectsBlock);
    Kusto.Cloud.Platform.Debugging.RegisterWeakReference(nameof(uploadOrAccumulateBlock), uploadOrAccumulateBlock);

    m_logger.LogVerbose("==> Flow RunPrepareBatchesForDirectIngest starting...");

    // Seed the pipeline with the single source path; no further input will follow.
    listObjectsBlock.Post(m_args.SourcePath);
    listObjectsBlock.Complete();

    // Wait for the last stage, reporting progress every 10 seconds.
    // The stats are logged one final time after the pipeline completes.
    bool bPipelineCompleted = false;
    do
    {
        bPipelineCompleted = uploadOrAccumulateBlock.Completion.Wait(TimeSpan.FromSeconds(10));
        m_logger.LogInfo($"==> {DirectIngestStats()}");
    } while (!bPipelineCompleted);

    m_logger.LogVerbose("==> Flow RunPrepareBatchesForDirectIngest done.");
}

/// <summary>
/// Forwards the terminal state of <paramref name="source"/> to <paramref name="target"/>:
/// a fault in the source faults the target (so the failure is observed by whoever waits on the
/// end of the pipeline); any other outcome completes the target normally.
/// </summary>
/// <param name="source">The upstream block whose completion is observed.</param>
/// <param name="target">The downstream block to complete or fault.</param>
private static void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
    source.Completion.ContinueWith(completed =>
    {
        if (completed.IsFaulted)
        {
            // Reading Exception also marks the upstream failure as observed.
            target.Fault(completed.Exception);
        }
        else
        {
            target.Complete();
        }
    });
}