in src/Ingestor.cs [735:769]
/// <summary>
/// Enumerates the files of the storage container referenced by <paramref name="sourcePath"/>
/// using the pattern <paramref name="sourceVirtualDirectory"/> followed by "*", and sends every
/// enumerated file to <paramref name="targetBlock"/>.
/// The whole body is compiled out when OPEN_SOURCE_COMPILATION is defined, in which case the
/// method does nothing.
/// </summary>
/// <param name="sourcePath">
/// Container path handed to the storage factory. It is passed by reference to
/// EnableStorageUserAuthIfNeeded first, so it may be rewritten before use (presumably to
/// strip or replace credentials; confirm against that helper).
/// </param>
/// <param name="sourceVirtualDirectory">Prefix that is combined with "*" to form the enumeration pattern.</param>
/// <param name="filesToTake">
/// When non-negative, listing is skipped entirely if the accepted-objects counter has already
/// reached this value. A negative value disables the check. The check is made once, before
/// enumeration starts, and is not repeated per file.
/// </param>
/// <param name="targetBlock">Dataflow block that receives each enumerated file.</param>
/// <remarks>
/// Any exception raised while listing is logged as an error and is not rethrown.
/// </remarks>
private void ListFiles(string sourcePath, string sourceVirtualDirectory, int filesToTake, ITargetBlock<IPersistentStorageFile> targetBlock)
{
#if !OPEN_SOURCE_COMPILATION
    try
    {
        EnableStorageUserAuthIfNeeded(ref sourcePath, out var authProvider);
        IPersistentStorageContainer container = m_persistentStorageFactory.CreateContainerRef(
            sourcePath,
            credentialsProvider: authProvider);

        // Only the part of the path before the first ';' and the first '?' is written to the log.
        var containerNameForLog = sourcePath.SplitFirst(";").SplitFirst("?");
        m_logger.LogVerbose($"ListFiles: enumerating files under container '{containerNameForLog}' with prefix '{sourceVirtualDirectory}'");

        // A negative filesToTake means "no limit"; the counter is read only when a limit is set.
        bool limitAlreadyReached = filesToTake >= 0
            && Interlocked.Read(ref m_objectsAccepted) >= filesToTake;
        if (limitAlreadyReached)
        {
            return;
        }

        string enumerationPattern = sourceVirtualDirectory + "*";
        var sourceFiles = container.EnumerateFiles(pattern: enumerationPattern, withMetadata: true);

        // Pause between checks while the listing throttler refuses an invocation.
        var throttleBackoff = TimeSpan.FromMilliseconds(c_delayOnThrottlingMs);

        ExtendedParallel.ForEach(sourceFiles, m_directIngestParallelRequests, file =>
        {
            // Block this worker until the throttling policy allows the next invocation.
            while (!m_listingFixedWindowThrottlerPolicy.ShouldInvoke())
            {
                Task.Delay(throttleBackoff).ConfigureAwait(false).ResultEx();
            }

            // Wait synchronously for the send to finish before counting the file as listed.
            targetBlock.SendAsync(file).ConfigureAwait(false).ResultEx();
            Interlocked.Increment(ref m_objectsListed);
        });
    }
    catch (Exception failure)
    {
        m_logger.LogError($"Error: ListFiles failed: {failure.MessageEx(true)}");
    }
#endif
}