bool SplittableBzip2ReadBuffer::nextImpl()

in cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp [225:323]


bool SplittableBzip2ReadBuffer::nextImpl()
{
    const Position dest = internal_buffer.begin();
    const size_t dest_size = internal_buffer.size();
    size_t offset = 0;

    if (last_block_need_special_process && !last_incomplete_line.empty())
    {
        /// If we have last incomplete line, append it to the beginning of internal buffer
        memcpy(dest, last_incomplete_line.data(), last_incomplete_line.size());
        offset += last_incomplete_line.size();
        last_incomplete_line.clear();
    }

    Int32 result;
    do
    {
        result = read(dest, dest_size, offset, dest_size - offset);
        if (result > 0)
            offset += result;
        else if (first_block_need_special_process && result == BZip2Constants::END_OF_BLOCK && is_first_block)
        {
            /// Special processing for the first block
            /// Notice that row delim could be \n (Unix) or \r\n (DOS/Windows) or \n\r (Mac OS Classic)
            is_first_block = false;
            Position end = dest + offset;
            auto * pos = find_last_symbols_or_null<'\n'>(dest, end);
            if (pos)
            {
                if (pos == end - 1 || (pos == end - 2 && *(pos + 1) == '\r'))
                {
                    /// The last row ends with \n or \r\n or \n\r, discard all lines in internal buffer
                    offset = 0;
                }
                else
                {
                    /// The last row does not end with \n or \r\n or \n\r, rewrite the last row to internal buffer
                    Position last_line = pos + 1;
                    size_t last_line_size = end - pos - 1;
                    if (*(pos + 1) == '\r')
                        last_line_size--;

                    memmove(dest, last_line, last_line_size);
                    offset = last_line_size;
                }
            }
            LOG_DEBUG(
                getLogger("SplittableBzip2ReadBuffer"),
                "Header of first block after special processed:{}",
                std::string(dest, std::min(offset, 100UL)));
        }
    } while (result != BZip2Constants::END_OF_STREAM && offset < dest_size);

    if (last_block_need_special_process && offset)
    {
        /// Trim the last incomplete line from [dest, dest+offset), and record it in last_incomplete_line
        bool reach_eof = (result == BZip2Constants::END_OF_STREAM);
        if (reach_eof)
        {
            LOG_DEBUG(
                getLogger("SplittableBzip2ReadBuffer"),
                "Header of last block before special processed:{}",
                std::string(dest, std::min(offset, 100UL)));
        }

        /// Trim the last incomplete line from [dest, dest+offset), and record it in last_incomplete_line
        Position end = dest + offset;
        auto * pos = find_last_symbols_or_null<'\n'>(dest, end);
        if (!pos)
        {
            if (reach_eof)
                offset = 0;
            else
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find row delimiter in working buffer with size:{}", offset);
        }
        else
        {
            /// Discard the last incomplete row(if has), and record it in last_incomplete_line
            size_t old_offset = offset;
            offset = pos - dest + 1;
            if (pos + 1 < end && *(pos + 1) == '\r')
                offset++;

            if (!reach_eof)
            {
                /// Only record last incomplete line when eof not reached
                last_incomplete_line.assign(&dest[offset], old_offset - offset);
            }
        }
    }

    if (offset)
    {
        working_buffer.resize(offset);
        return true;
    }
    else
        return false;
}