in cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp [225:323]
bool SplittableBzip2ReadBuffer::nextImpl()
{
const Position dest = internal_buffer.begin();
const size_t dest_size = internal_buffer.size();
size_t offset = 0;
if (last_block_need_special_process && !last_incomplete_line.empty())
{
/// If we have last incomplete line, append it to the beginning of internal buffer
memcpy(dest, last_incomplete_line.data(), last_incomplete_line.size());
offset += last_incomplete_line.size();
last_incomplete_line.clear();
}
Int32 result;
do
{
result = read(dest, dest_size, offset, dest_size - offset);
if (result > 0)
offset += result;
else if (first_block_need_special_process && result == BZip2Constants::END_OF_BLOCK && is_first_block)
{
/// Special processing for the first block
/// Notice that row delim could be \n (Unix) or \r\n (DOS/Windows) or \n\r (Mac OS Classic)
is_first_block = false;
Position end = dest + offset;
auto * pos = find_last_symbols_or_null<'\n'>(dest, end);
if (pos)
{
if (pos == end - 1 || (pos == end - 2 && *(pos + 1) == '\r'))
{
/// The last row ends with \n or \r\n or \n\r, discard all lines in internal buffer
offset = 0;
}
else
{
/// The last row does not end with \n or \r\n or \n\r, rewrite the last row to internal buffer
Position last_line = pos + 1;
size_t last_line_size = end - pos - 1;
if (*(pos + 1) == '\r')
last_line_size--;
memmove(dest, last_line, last_line_size);
offset = last_line_size;
}
}
LOG_DEBUG(
getLogger("SplittableBzip2ReadBuffer"),
"Header of first block after special processed:{}",
std::string(dest, std::min(offset, 100UL)));
}
} while (result != BZip2Constants::END_OF_STREAM && offset < dest_size);
if (last_block_need_special_process && offset)
{
/// Trim the last incomplete line from [dest, dest+offset), and record it in last_incomplete_line
bool reach_eof = (result == BZip2Constants::END_OF_STREAM);
if (reach_eof)
{
LOG_DEBUG(
getLogger("SplittableBzip2ReadBuffer"),
"Header of last block before special processed:{}",
std::string(dest, std::min(offset, 100UL)));
}
/// Trim the last incomplete line from [dest, dest+offset), and record it in last_incomplete_line
Position end = dest + offset;
auto * pos = find_last_symbols_or_null<'\n'>(dest, end);
if (!pos)
{
if (reach_eof)
offset = 0;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find row delimiter in working buffer with size:{}", offset);
}
else
{
/// Discard the last incomplete row(if has), and record it in last_incomplete_line
size_t old_offset = offset;
offset = pos - dest + 1;
if (pos + 1 < end && *(pos + 1) == '\r')
offset++;
if (!reach_eof)
{
/// Only record last incomplete line when eof not reached
last_incomplete_line.assign(&dest[offset], old_offset - offset);
}
}
}
if (offset)
{
working_buffer.resize(offset);
return true;
}
else
return false;
}