protected async ValueTask ParseAsyncInternal()

in Amazon.KinesisTap.FileSystem/AbstractLineProcessor.cs [115:186]


        /// <summary>
        /// Attempts to produce the next line from the internal buffer, reading further blocks
        /// from the underlying stream whenever the buffered bytes do not yet yield one.
        /// </summary>
        /// <remarks>
        /// Encoding detection is attempted only when the stream was at position 0 on entry.
        /// A detected preamble is counted towards the consumed byte total.
        /// </remarks>
        /// <param name="stopToken">Cancellation token forwarded to the stream read.</param>
        /// <returns>
        /// The line start index and line size reported by <c>ReadNextLine</c>, together with the
        /// total number of bytes consumed during this call. A non-negative line size is treated
        /// as a usable result. If the stream read returns zero bytes, or an encoding was already
        /// set and detection returns <c>null</c>, the result is <c>(_startPos, -1, consumed)</c>.
        /// </returns>
        protected async ValueTask<(int lineStartIdx, int lineSize, int consumed)> ParseAsyncInternal(CancellationToken stopToken = default)
        {
            // Slides the buffer window forward past bytes that have been accounted for.
            void AdvanceWindow(int byteCount)
            {
                _startPos += byteCount;
                _len -= byteCount;
            }

            // Captured up front: the reads below move the stream position.
            var streamStartedAtOrigin = _stream.Position == 0;

            // First try to satisfy the request from what is already buffered.
            var line = ReadNextLine(out var consumed);
            AdvanceWindow(consumed);
            if (line.lineSize >= 0)
            {
                return (line.lineStartIdx, line.lineSize, consumed);
            }

            int bytesRead;
            do
            {
                AdjustBuffer();

                // Append the next block right after the currently buffered bytes.
                var destination = _buffer.AsMemory(_startPos + _len, _blockSize);
                bytesRead = await _stream.ReadAsync(destination, stopToken);
                _len += bytesRead;
                if (bytesRead == 0)
                {
                    break;
                }

                if (streamStartedAtOrigin)
                {
                    // Snapshot before detection so the branch choice reflects the state on entry
                    // to this iteration.
                    var encodingWasUnset = CurrentEncoding is null;
                    var detected = DetectEncoding(out var preambleSize);

                    if (encodingWasUnset)
                    {
                        // Continue even if no encoding was detected; in that case UTF-8 is assumed.
                        CurrentEncoding = detected;
                        consumed += preambleSize;
                        AdvanceWindow(preambleSize);
                        DetermineUnitSizeAndEndianess();
                    }
                    else if (detected == null)
                    {
                        // Encoding is dictated by the user and nothing was detected yet:
                        // stop here and try reading later.
                        break;
                    }
                    else if (preambleSize > 0)
                    {
                        // A preamble was found, so it overrides the user-specified encoding.
                        CurrentEncoding = detected;
                        consumed += preambleSize;
                        AdvanceWindow(preambleSize);
                        DetermineUnitSizeAndEndianess();
                    }
                }

                line = ReadNextLine(out var lineBytes);
                AdvanceWindow(lineBytes);
                consumed += lineBytes;
                if (line.lineSize >= 0)
                {
                    return (line.lineStartIdx, line.lineSize, consumed);
                }
            } while (bytesRead > 0);

            // No line available: report the current window start with a negative size.
            return (_startPos, -1, consumed);
        }