in Amazon.KinesisTap.FileSystem/AsyncDelimitedLogParserBase.cs [47:126]
/// <summary>
/// Reads lines from the file at <c>context.FilePath</c>, beginning at <c>context.Position</c>,
/// and appends up to <paramref name="recordCount"/> parsed records to <paramref name="output"/>.
/// </summary>
/// <param name="context">
/// Parsing state for the file. Its <c>Position</c>, <c>LineNumber</c> and <c>Fields</c> members are
/// updated as lines are consumed.
/// </param>
/// <param name="output">List that receives one envelope per successfully created record.</param>
/// <param name="recordCount">Maximum number of records added to <paramref name="output"/> by this call.</param>
/// <param name="stopToken">Checked before each line is read and passed on to the header lookup and the line reader.</param>
/// <returns>
/// A task that completes when the record limit is reached, the reader returns no line,
/// or <c>ShouldStopAndRollback</c> asks to stop.
/// </returns>
public async Task ParseRecordsAsync(DelimitedTextLogContext context, IList<IEnvelope<TData>> output,
    int recordCount, CancellationToken stopToken = default)
{
    // Try to resolve the field names up front when the context does not carry them yet.
    if (context.Fields is null)
    {
        context.Fields = await TryGetHeaderFields(context, stopToken);
    }

    // Counts down the number of records that may still be emitted by this call.
    var remaining = recordCount;

    using (var fileStream = new FileStream(context.FilePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
    {
        // Start reading at the offset recorded in the context.
        fileStream.Position = context.Position;

        using (var lineReader = new LineReader(fileStream, _encoding, _bufferSize))
        {
            while (remaining > 0)
            {
                stopToken.ThrowIfCancellationRequested();

                var (text, byteCount) = await lineReader.ReadAsync(stopToken);
                _logger.LogTrace("File: '{0}', line: '{1}', bytes: {2}", context.FilePath, text, byteCount);

                // The position is advanced before the line is inspected; the rollback branch below undoes it.
                context.Position += byteCount;

                if (text is null)
                {
                    // The reader produced no line, so there is nothing more to parse in this call.
                    break;
                }

                if (ShouldStopAndRollback(text, context))
                {
                    // Take back the advance so this line is not treated as consumed.
                    context.Position -= byteCount;
                    return;
                }

                context.LineNumber++;

                if (IsHeaders(text, context.LineNumber))
                {
                    // A header line replaces the current field mapping and yields no record.
                    context.Fields = ParseHeadersLine(text);
                    continue;
                }

                if (IsComment(text))
                {
                    continue;
                }

                try
                {
                    // Parsing the fragments or creating the record may throw; such a line is logged and skipped.
                    var values = ParseDataFragments(text);
                    if (context.Fields is null)
                    {
                        _logger.LogWarning("Unknown field mapping, skipping line {0}", context.LineNumber);
                        continue;
                    }

                    // Pair each field with the fragment at the same index, stopping at the shorter of the two.
                    var fieldMap = new Dictionary<string, string>();
                    for (var index = 0; index < context.Fields.Length && index < values.Length; index++)
                    {
                        var (name, value) = KeyValueSelector(context.Fields[index], values[index]);
                        fieldMap[name] = value;
                    }

                    var parsed = CreateRecord(context, fieldMap);
                    output.Add(new LogEnvelope<TData>(parsed, parsed.Timestamp, text, context.FilePath, context.Position, context.LineNumber));
                    remaining--;
                }
                catch (Exception error)
                {
                    _logger.LogError(error, "Error processing record '{0}'", text);
                }
            }
        }
    }
}