in Amazon.KinesisTap.FileSystem/AsyncDirectorySource.cs [454:524]
private async Task ProcessFile(TContext context, List<IEnvelope<TData>> envelopes, SourceMetrics sourceMetrics)
{
sourceMetrics.FilesToProcess += 1;
try
{
var fileLength = GetFileSize(context.FilePath);
if (context.Position == fileLength)
{
return;
}
else if (context.Position > fileLength)
{
//Other than malicious attack, the most likely scenario is file truncate so we will read from the beginning
context.Position = 0;
context.LineNumber = 0;
context.ConsecutiveIOExceptionCount = 0;
}
sourceMetrics.BytesToRead += fileLength - context.Position;
var previousPosition = context.Position;
_logger.LogDebug("Parsing file {0} from position {1}", context.FilePath, context.Position);
await _recordParser.ParseRecordsAsync(context, envelopes, _readBatchSize);
_logger.LogDebug("Read {0} records", envelopes.Count);
sourceMetrics.RecordsRead += envelopes.Count;
sourceMetrics.BytesRead += context.Position - previousPosition;
foreach (var envelope in envelopes)
{
if (InitialPosition == InitialPositionEnum.Timestamp
&& envelope.Timestamp.ToUniversalTime() < InitialPositionTimestamp)
{
continue;
}
if (_bookmarkOnSinkFlush && InitialPosition != InitialPositionEnum.EOS)
{
envelope.BookmarkData = new IntegerPositionRecordBookmark(BookmarkKey, context.FilePath, envelope.Position);
}
_recordSubject.OnNext(envelope);
}
if (!_bookmarkOnSinkFlush)
{
_bookmarkMap[context.FilePath] = context.Position;
}
context.ConsecutiveIOExceptionCount = 0;
}
catch (FileNotFoundException fnfe)
{
_logger.LogWarning(fnfe, "Could not find file '{0}'", context.FilePath);
// remove the non-existent file from the context map
_fileContextMap.Remove(context.FilePath, out _);
}
catch (IOException ioex)
{
context.ConsecutiveIOExceptionCount++;
if (context.ConsecutiveIOExceptionCount > _numberOfConsecutiveIOExceptionsToLogError)
{
_logger.LogError(ioex, "I/O error while processing file '{0}'", context.FilePath);
context.ConsecutiveIOExceptionCount = 0;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error while processing file '{0}'", context.FilePath);
}
}