in src/Ingestor.cs [771:818]
/// <summary>
/// Checks a single listed cloud file against <paramref name="patternRegex"/> and, if it matches and the
/// <paramref name="filesToTake"/> limit has not been reached yet, fetches its size and creation-time
/// metadata (with retries) and posts a <see cref="DataSource"/> describing it to <paramref name="targetBlock"/>.
/// </summary>
/// <param name="cloudFile">The listed file; <c>null</c> is ignored.</param>
/// <param name="patternRegex">Optional file-name filter; <c>null</c> accepts every file.</param>
/// <param name="filesToTake">Maximum number of files to accept; a negative value disables the limit.</param>
/// <param name="targetBlock">Dataflow block that receives the accepted files.</param>
/// <remarks>
/// Exceptions raised while filtering, fetching metadata or posting are logged and not propagated.
/// </remarks>
private async Task FilterFilesAsync(IPersistentStorageFile cloudFile, Regex patternRegex, int filesToTake, ITargetBlock<DataSource> targetBlock)
{
    try
    {
        if (cloudFile == null || (patternRegex != null && !patternRegex.IsMatch(cloudFile.GetFileName())))
        {
            return;
        }

        // The semaphore keeps us from processing more items than specified; if no limit was specified it is null.
        // Acquire it asynchronously so waiting callers don't block thread-pool threads, and acquire it BEFORE the
        // try/finally so Release() only runs when the semaphore was actually taken. A local copy guarantees we
        // release the same instance we acquired.
        var listingLock = m_listingLockOrNull;
        if (listingLock != null)
        {
            await listingLock.WaitAsync();
        }

        try
        {
            if (filesToTake >= 0 && filesToTake <= Interlocked.Read(ref m_objectsAccepted))
            {
                // We're done, don't need new stuff
                return;
            }

            var (sizeInBytes, creationTimeUtc) = await RetryOperation.RetryAsync<Exception, (long, DateTime?)>(
                maxNumberOfTries: 3,
                waitInterval: TimeSpan.FromSeconds(10),
                description: "Fetch file metadata",
                async () =>
                {
                    long size = await Utilities.EstimateFileSizeAsync(cloudFile, m_estimatedCompressionRatio);
                    DateTime? creationTime = await Utilities.InferFileCreationTimeUtcAsync(cloudFile, m_creationTimeInNamePattern);
                    return (size, creationTime);
                });

            bool accepted = await targetBlock.SendAsync(new DataSource
            {
                CloudFileUri = $"{cloudFile.GetUnsecureUri()}",
                SafeCloudFileUri = cloudFile.GetFileUri(),
                SizeInBytes = sizeInBytes,
                CreationTimeUtc = creationTimeUtc
            });

            // SendAsync yields false when the target declines the item (e.g. it has already been completed).
            // Only count files that were actually handed off, so the filesToTake limit stays accurate.
            if (accepted)
            {
                Interlocked.Increment(ref m_objectsAccepted);
            }
        }
        finally
        {
            listingLock?.Release();
        }
    }
    catch (Exception ex)
    {
        m_logger.LogError($"FilterFilesAsync failed on blob '{cloudFile.GetFileUri()}', error: {ex.Message}");
    }
}