public async Task ParseRecordsAsync()

in Amazon.KinesisTap.FileSystem/RegexLogParser.cs [61:137]


        public async Task ParseRecordsAsync(RegexLogContext context, IList<IEnvelope<IDictionary<string, string>>> output,
            int recordCount, CancellationToken stopToken = default)
        {
            // Reads lines from context.FilePath, starting at context.Position, and groups them
            // into records: a line matching _patternRegex begins a new record, any other line is
            // treated as a continuation of the record currently held in context.RecordBuilder.
            // Finished records are added to 'output'. Reading stops once 'recordCount' records
            // have been emitted from pattern matches, or when the reader returns a null line.
            // Parsing state (LineNumber, Position, RecordBuilder, MatchedLineNumber,
            // MatchedLineTimestamp) is kept on 'context', so it outlives this call.
            // Cancellation is checked before every read and 'stopToken' is passed to the reader.

            // FileShare.ReadWrite so that a file still open for writing elsewhere can be read.
            using var fileStream = new FileStream(context.FilePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
            fileStream.Position = context.Position;
            using var lineReader = new LineReader(fileStream, _encoding, _bufferSize);

            // Builds a record from whatever is pending in the context and adds it to the output.
            // Returns true only when CreateRecord produced a record.
            bool TryEmitPendingRecord()
            {
                var pending = CreateRecord(context);
                if (pending is null)
                {
                    return false;
                }

                output.Add(pending);
                return true;
            }

            var emitted = 0;
            while (emitted < recordCount)
            {
                stopToken.ThrowIfCancellationRequested();
                var (text, bytesRead) = await lineReader.ReadAsync(stopToken);
                _logger.LogTrace("File: '{0}', line: '{1}', bytes: {2}", context.FilePath, text, bytesRead);

                if (text is null)
                {
                    // A null line is treated as end-of-file: emit whatever is pending, then
                    // reset the per-record state. This record is not counted towards
                    // 'recordCount' because the loop ends here anyway.
                    TryEmitPendingRecord();

                    context.RecordBuilder.Clear();
                    context.MatchedLineNumber = -1;
                    context.MatchedLineTimestamp = null;
                    break;
                }

                // The line has been consumed: advance the bookkeeping kept on the context.
                context.LineNumber++;
                context.Position += bytesRead;

                var match = _patternRegex.Match(text);
                if (!match.Success)
                {
                    _logger.LogDebug("Regex NOT matched.");

                    // This line does not begin a new record.
                    if (context.RecordBuilder.Length != 0)
                    {
                        // A record is already in progress, so this is a continuation line.
                        context.RecordBuilder.AppendLine();
                        context.RecordBuilder.Append(text);
                    }
                    else if (_removeUnmatchedRecord)
                    {
                        // Nothing in progress and unmatched lines are configured to be dropped.
                        _logger.LogWarning("Line discarded: {0}", text);
                    }
                    else
                    {
                        // Nothing in progress: begin a record from this unmatched line.
                        // TODO figure out what's the use case scenario for this
                        context.RecordBuilder.Append(text);
                        context.MatchedLineNumber = context.LineNumber;
                    }

                    continue;
                }

                _logger.LogDebug("Regex matched.");

                // A match begins a new record, so the previously accumulated one is complete.
                if (TryEmitPendingRecord())
                {
                    emitted++;
                }

                // Start accumulating the new record from its first line.
                context.RecordBuilder.Clear();
                context.RecordBuilder.Append(text);
                context.MatchedLineNumber = context.LineNumber;
                // Timestamp derived from the match by GetTimeStamp (per the original note, taken
                // from a 'Timestamp' match group when the pattern defines one).
                context.MatchedLineTimestamp = GetTimeStamp(match);
            }
        }