private async Task FilterFilesAsync()

in src/Ingestor.cs [771:818]


        /// <summary>
        /// Offers a single listed file to <paramref name="targetBlock"/> as a <see cref="DataSource"/>,
        /// provided its name matches <paramref name="patternRegex"/> and the take limit has not been reached.
        /// </summary>
        /// <param name="cloudFile">File to examine; a null value is ignored.</param>
        /// <param name="patternRegex">Pattern matched against the file name; null accepts every file.</param>
        /// <param name="filesToTake">Maximum number of files to accept; a negative value disables the limit.</param>
        /// <param name="targetBlock">Block that receives the resulting <see cref="DataSource"/>.</param>
        /// <returns>
        /// A task that completes once the file has been handled. Exceptions raised while handling
        /// the file are logged through <c>m_logger</c> instead of being rethrown.
        /// </returns>
        private async Task FilterFilesAsync(IPersistentStorageFile cloudFile, Regex patternRegex, int filesToTake, ITargetBlock<DataSource> targetBlock)
        {
            try
            {
                // Guard clause: nothing to do for a missing file or a name that fails the pattern.
                if (cloudFile == null || (patternRegex != null && !patternRegex.IsMatch(cloudFile.GetFileName())))
                {
                    return;
                }

                // Semaphore is used in order to not process more items than specified; it is null
                // when no limit applies. Capture it once so the instance released below is the
                // same one that was acquired.
                var listingLock = m_listingLockOrNull;
                if (listingLock != null)
                {
                    // Wait asynchronously: a blocking Wait() inside an async method would hold a
                    // thread for as long as another caller is awaiting metadata or SendAsync.
                    await listingLock.WaitAsync();
                }

                // The try/finally starts only after the semaphore has been acquired, so a failed
                // wait can never trigger a Release() that was not paired with an acquire.
                try
                {
                    if (filesToTake >= 0 && filesToTake <= Interlocked.Read(ref m_objectsAccepted))
                    {
                        // We're done, don't need new stuff
                        return;
                    }

                    var metadata = await RetryOperation.RetryAsync<Exception, (long, DateTime?)>(
                        maxNumberOfTries: 3,
                        waitInterval: TimeSpan.FromSeconds(10),
                        description: "Fetch file metadata",
                        async () =>
                        {
                            long size = await Utilities.EstimateFileSizeAsync(cloudFile, m_estimatedCompressionRatio);
                            DateTime? creationTime = await Utilities.InferFileCreationTimeUtcAsync(cloudFile, m_creationTimeInNamePattern);
                            return (size, creationTime);
                        });

                    // NOTE(review): the result of SendAsync is not inspected, so an item the block
                    // declines is still counted as accepted below - confirm this is intended.
                    await targetBlock.SendAsync(new DataSource
                    {
                        CloudFileUri = $"{cloudFile.GetUnsecureUri()}",
                        SafeCloudFileUri = cloudFile.GetFileUri(),
                        SizeInBytes = metadata.Item1,
                        CreationTimeUtc = metadata.Item2
                    });
                    Interlocked.Increment(ref m_objectsAccepted);
                }
                finally
                {
                    listingLock?.Release();
                }
            }
            catch (Exception ex)
            {
                m_logger.LogError($"FilterFilesAsync failed on blob '{cloudFile.GetFileUri()}', error: {ex.Message}");
            }
        }