int AlignedChunkReader::decode_cur_value_page_data()

in cpp/src/reader/aligned_chunk_reader.cc [409:481]


int AlignedChunkReader::decode_cur_value_page_data() {
    int ret = E_OK;

    // Step 1: make sure we load the whole page data in @in_stream_
    if (value_in_stream_.remaining_size() <
        cur_value_page_header_.compressed_size_) {
        // std::cout << "decode_cur_page_data. in_stream_.remaining_size="<<
        // in_stream_.remaining_size() << ", cur_page_header_.compressed_size_="
        // << cur_page_header_.compressed_size_ << std::endl;
        if (RET_FAIL(read_from_file_and_rewrap(
                value_in_stream_, value_chunk_meta_, value_chunk_visit_offset_,
                file_data_value_buf_size_,
                cur_value_page_header_.compressed_size_))) {
        }
    }

    char *value_compressed_buf = nullptr;
    char *value_uncompressed_buf = nullptr;
    uint32_t value_compressed_buf_size = 0;
    uint32_t value_uncompressed_buf_size = 0;
    char *value_buf = nullptr;
    uint32_t value_buf_size = 0;

    // Step 2: do uncompress
    if (IS_SUCC(ret)) {
        value_compressed_buf =
            value_in_stream_.get_wrapped_buf() + value_in_stream_.read_pos();
        value_compressed_buf_size = cur_value_page_header_.compressed_size_;
        value_in_stream_.wrapped_buf_advance_read_pos(
            value_compressed_buf_size);
        value_chunk_visit_offset_ += value_compressed_buf_size;
        if (RET_FAIL(value_compressor_->reset(false))) {
        } else if (RET_FAIL(value_compressor_->uncompress(
                       value_compressed_buf, value_compressed_buf_size,
                       value_uncompressed_buf, value_uncompressed_buf_size))) {
        } else {
            value_uncompressed_buf_ = value_uncompressed_buf;
        }
#ifdef DEBUG_SE
        DEBUG_hex_dump_buf(
            "AlignedChunkReader reader, value_uncompressed buf = ",
            value_uncompressed_buf, value_uncompressed_buf_size);
#endif
        if (ret != E_OK || value_uncompressed_buf_size !=
                               cur_value_page_header_.uncompressed_size_) {
            ret = E_TSFILE_CORRUPTED;
            ASSERT(false);
        }
    }
    // Step 3: get value_buf
    if (IS_SUCC(ret)) {
        uint32_t value_uncompressed_buf_offset = 0;
        value_page_data_num_ =
            SerializationUtil::read_ui32(value_uncompressed_buf);
        value_uncompressed_buf_offset += sizeof(uint32_t);
        value_page_col_notnull_bitmap_.resize((value_page_data_num_ + 7) / 8);
        for (unsigned char &i : value_page_col_notnull_bitmap_) {
            i = *(value_uncompressed_buf + value_uncompressed_buf_offset);
            value_uncompressed_buf_offset++;
        }
        cur_value_index = -1;
        value_buf = value_uncompressed_buf + value_uncompressed_buf_offset;
        value_buf_size =
            value_uncompressed_buf_size - value_uncompressed_buf_offset;
    }
    value_decoder_->reset();
#ifdef DEBUG_SE
    DEBUG_hex_dump_buf("AlignedChunkReader reader, value_buf = ", value_buf,
                       value_buf_size);
#endif
    value_in_.wrap_from(value_buf, value_buf_size);
    return ret;
}