private async Task ProcessFile()

in Amazon.KinesisTap.FileSystem/AsyncDirectorySource.cs [454:524]


        /// <summary>
        /// Parses any unread content of the file described by <paramref name="context"/>,
        /// publishes the resulting records to the record subject and updates
        /// <paramref name="sourceMetrics"/>.
        /// </summary>
        /// <param name="context">
        /// Per-file state. Its <c>FilePath</c> and <c>Position</c> are read; <c>Position</c>,
        /// <c>LineNumber</c> and <c>ConsecutiveIOExceptionCount</c> may be modified here.
        /// </param>
        /// <param name="envelopes">
        /// List handed to the record parser to receive the parsed records.
        /// NOTE(review): its <c>Count</c> after parsing is used as the number of records read,
        /// so it is presumably empty on entry - confirm with the caller.
        /// </param>
        /// <param name="sourceMetrics">Counters that are incremented as the file is processed.</param>
        /// <returns>A task that completes when the file has been processed.</returns>
        /// <remarks>
        /// Exceptions raised inside the processing section are caught and logged rather than
        /// propagated. A missing file is removed from the file context map. I/O errors are only
        /// logged at error level once more than the configured number of them occurred in a row;
        /// below that threshold they are logged at debug level.
        /// </remarks>
        private async Task ProcessFile(TContext context, List<IEnvelope<TData>> envelopes, SourceMetrics sourceMetrics)
        {
            sourceMetrics.FilesToProcess += 1;
            try
            {
                var fileLength = GetFileSize(context.FilePath);

                if (context.Position == fileLength)
                {
                    // Nothing new has been written since the last read.
                    return;
                }
                else if (context.Position > fileLength)
                {
                    //Other than malicious attack, the most likely scenario is file truncate so we will read from the beginning
                    // Leave a trace before rewinding, otherwise the re-read has no explanation in the logs.
                    _logger.LogDebug("File {0} has length {1} which is less than the current position {2}, reading from the beginning",
                        context.FilePath, fileLength, context.Position);
                    context.Position = 0;
                    context.LineNumber = 0;
                    context.ConsecutiveIOExceptionCount = 0;
                }
                sourceMetrics.BytesToRead += fileLength - context.Position;

                // Remember where we started so the number of bytes actually consumed can be reported.
                var previousPosition = context.Position;

                _logger.LogDebug("Parsing file {0} from position {1}", context.FilePath, context.Position);
                await _recordParser.ParseRecordsAsync(context, envelopes, _readBatchSize);
                _logger.LogDebug("Read {0} records", envelopes.Count);

                sourceMetrics.RecordsRead += envelopes.Count;
                sourceMetrics.BytesRead += context.Position - previousPosition;

                foreach (var envelope in envelopes)
                {
                    // With a timestamp-based initial position, records older than that timestamp are not published.
                    if (InitialPosition == InitialPositionEnum.Timestamp
                        && envelope.Timestamp.ToUniversalTime() < InitialPositionTimestamp)
                    {
                        continue;
                    }

                    // When bookmarking on sink flush, each record carries its own bookmark data.
                    if (_bookmarkOnSinkFlush && InitialPosition != InitialPositionEnum.EOS)
                    {
                        envelope.BookmarkData = new IntegerPositionRecordBookmark(BookmarkKey, context.FilePath, envelope.Position);
                    }
                    _recordSubject.OnNext(envelope);
                }

                // Otherwise the position reached after parsing is recorded for the file right away.
                if (!_bookmarkOnSinkFlush)
                {
                    _bookmarkMap[context.FilePath] = context.Position;
                }

                // A successful pass ends any run of consecutive I/O failures.
                context.ConsecutiveIOExceptionCount = 0;
            }
            catch (FileNotFoundException fnfe)
            {
                _logger.LogWarning(fnfe, "Could not find file '{0}'", context.FilePath);

                // remove the non-existent file from the context map
                _fileContextMap.Remove(context.FilePath, out _);
            }
            catch (IOException ioex)
            {
                context.ConsecutiveIOExceptionCount++;
                if (context.ConsecutiveIOExceptionCount > _numberOfConsecutiveIOExceptionsToLogError)
                {
                    _logger.LogError(ioex, "I/O error while processing file '{0}'", context.FilePath);
                    context.ConsecutiveIOExceptionCount = 0;
                }
                else
                {
                    // Below the threshold the failure is tolerated, but keep a debug trace
                    // instead of swallowing it silently.
                    _logger.LogDebug(ioex, "I/O error while processing file '{0}' (consecutive failure {1})",
                        context.FilePath, context.ConsecutiveIOExceptionCount);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error while processing file '{0}'", context.FilePath);
            }
        }