in Amazon.KinesisTap.FileSystem/AbstractLineProcessor.cs [115:186]
/// <summary>
/// Attempts to produce the next line: first from the data currently described by
/// <c>_startPos</c>/<c>_len</c>, and if that yields nothing, by repeatedly reading up to
/// <c>_blockSize</c> more bytes from <c>_stream</c> and retrying.
/// </summary>
/// <param name="stopToken">Token forwarded to the stream read so the read can be cancelled.</param>
/// <returns>
/// The line start index and line size reported by <c>ReadNextLine</c>, together with the total
/// number of bytes by which <c>_startPos</c> was advanced during this call. If the stream returns
/// no more bytes, or encoding detection cannot decide yet while an encoding is already set,
/// the result is <c>(_startPos, -1, consumed)</c>.
/// </returns>
protected async ValueTask<(int lineStartIdx, int lineSize, int consumed)> ParseAsyncInternal(CancellationToken stopToken = default)
{
    // Encoding detection only runs when this call started at the very beginning of the stream.
    var startedAtStreamOrigin = _stream.Position == 0;

    // Fast path: a line may already be available without touching the stream.
    var line = ReadNextLine(out var totalConsumed);
    _startPos += totalConsumed;
    _len -= totalConsumed;
    if (line.lineSize >= 0)
    {
        return (line.lineStartIdx, line.lineSize, totalConsumed);
    }

    int bytesRead;
    do
    {
        // NOTE(review): AdjustBuffer presumably guarantees room for another block at
        // _startPos + _len - confirm against its implementation.
        AdjustBuffer();
        bytesRead = await _stream.ReadAsync(_buffer.AsMemory(_startPos + _len, _blockSize), stopToken);
        _len += bytesRead;
        if (bytesRead == 0)
        {
            // Nothing more to read right now; report "no line".
            break;
        }

        if (startedAtStreamOrigin && !TryApplyPreamble())
        {
            // no encoding found yet, try reading later
            break;
        }

        line = ReadNextLine(out int stepConsumed);
        _startPos += stepConsumed;
        _len -= stepConsumed;
        totalConsumed += stepConsumed;
        if (line.lineSize >= 0)
        {
            return (line.lineStartIdx, line.lineSize, totalConsumed);
        }
    } while (bytesRead > 0);

    return (_startPos, -1, totalConsumed);

    // Runs encoding detection and, where applicable, skips the detected preamble bytes.
    // Returns false only when an encoding was already set and detection returned null.
    bool TryApplyPreamble()
    {
        // A non-null encoding at this point means the encoding is dictated by the user.
        var encodingDictatedByUser = !(CurrentEncoding is null);
        var detected = DetectEncoding(out int preambleBytes);

        if (encodingDictatedByUser)
        {
            if (detected == null)
            {
                return false;
            }

            if (preambleBytes <= 0)
            {
                // No preamble found: keep the user-specified encoding untouched.
                return true;
            }
        }

        // Either nothing was specified (continue even if encoding is not detected, in this
        // case we assume UTF-8), or a preamble was found and overrides the user-specified encoding.
        CurrentEncoding = detected;
        totalConsumed += preambleBytes;
        _startPos += preambleBytes;
        _len -= preambleBytes;
        DetermineUnitSizeAndEndianess();
        return true;
    }
}