in Amazon.KinesisTap.FileSystem/RegexLogParser.cs [61:137]
/// <summary>
/// Reads lines from the file at <c>context.FilePath</c>, starting at <c>context.Position</c>,
/// and groups them into records. A line matching the parser's pattern starts a new record;
/// the record accumulated before it (if any) is added to <paramref name="output"/>.
/// Reading stops once <paramref name="recordCount"/> records have been emitted this way,
/// or at end-of-file, where the accumulated record (if any) is emitted and the
/// accumulation state in <paramref name="context"/> is reset.
/// </summary>
/// <param name="context">Per-file parsing state; its line number, position, record builder
/// and matched-line fields are updated as lines are consumed.</param>
/// <param name="output">List that receives the completed records.</param>
/// <param name="recordCount">Number of pattern-delimited records after which reading stops.</param>
/// <param name="stopToken">Token checked before every line read and passed to the reader.</param>
/// <exception cref="OperationCanceledException">Thrown when <paramref name="stopToken"/> is cancelled.</exception>
public async Task ParseRecordsAsync(RegexLogContext context, IList<IEnvelope<IDictionary<string, string>>> output,
    int recordCount, CancellationToken stopToken = default)
{
    // Builds a record from whatever the context has accumulated and, when one is
    // produced, appends it to the output. Returns true only if a record was appended.
    bool TryEmitPendingRecord()
    {
        var pending = CreateRecord(context);
        if (pending is null)
        {
            return false;
        }

        output.Add(pending);
        return true;
    }

    using var stream = new FileStream(context.FilePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
    // Resume where the previous call stopped.
    stream.Position = context.Position;
    using var reader = new LineReader(stream, _encoding, _bufferSize);

    var emitted = 0;
    while (emitted < recordCount)
    {
        stopToken.ThrowIfCancellationRequested();

        var (text, byteCount) = await reader.ReadAsync(stopToken);
        _logger.LogTrace("File: '{0}', line: '{1}', bytes: {2}", context.FilePath, text, byteCount);

        if (text is null)
        {
            // End of file: flush what has been accumulated and reset the accumulation state.
            TryEmitPendingRecord();
            context.RecordBuilder.Clear();
            context.MatchedLineNumber = -1;
            context.MatchedLineTimestamp = null;
            return;
        }

        context.LineNumber++;
        context.Position += byteCount;

        var match = _patternRegex.Match(text);
        if (!match.Success)
        {
            _logger.LogDebug("Regex NOT matched.");

            if (context.RecordBuilder.Length != 0)
            {
                // Continuation line of the record currently being accumulated.
                context.RecordBuilder.AppendLine();
                context.RecordBuilder.Append(text);
            }
            else if (_removeUnmatchedRecord)
            {
                _logger.LogWarning("Line discarded: {0}", text);
            }
            else
            {
                // No record in progress and unmatched lines are kept: start a record with this line.
                // TODO figure out what's the use case scenario for this
                context.RecordBuilder.Append(text);
                context.MatchedLineNumber = context.LineNumber;
            }

            continue;
        }

        _logger.LogDebug("Regex matched.");

        // A matching line begins a new record, so the previous one is complete.
        if (TryEmitPendingRecord())
        {
            emitted++;
        }

        // Begin accumulating the new record with its first line.
        context.RecordBuilder.Clear();
        context.RecordBuilder.Append(text);
        context.MatchedLineNumber = context.LineNumber;
        // The timestamp for the new record is whatever GetTimeStamp extracts from the match.
        context.MatchedLineTimestamp = GetTimeStamp(match);
    }
}