int ChunkReader::decode_cur_page_data()

in cpp/src/reader/chunk_reader.cc [265:363]


int ChunkReader::decode_cur_page_data(TsBlock *&ret_tsblock, Filter *filter,
                                      PageArena &pa) {
    int ret = E_OK;

    // Step 1: make sure we load the whole page data in @in_stream_
    if (in_stream_.remaining_size() < cur_page_header_.compressed_size_) {
        // std::cout << "decode_cur_page_data. in_stream_.remaining_size="<<
        // in_stream_.remaining_size() << ", cur_page_header_.compressed_size_="
        // << cur_page_header_.compressed_size_ << std::endl;
        if (RET_FAIL(
                read_from_file_and_rewrap(cur_page_header_.compressed_size_))) {
        }
    }

    char *compressed_buf = nullptr;
    char *uncompressed_buf = nullptr;
    uint32_t compressed_buf_size = 0;  // cppcheck-suppress unreadVariable
    uint32_t uncompressed_buf_size = 0;
    char *time_buf = nullptr;
    char *value_buf = nullptr;
    uint32_t time_buf_size = 0;
    uint32_t value_buf_size = 0;

    // Step 2: do uncompress
    if (IS_SUCC(ret)) {
        compressed_buf = in_stream_.get_wrapped_buf() + in_stream_.read_pos();
        // std::cout << "ChunkReader::decode_cur_page_data,
        // in_stream_.get_wrapped_buf="
        // <<(void*)(in_stream_.get_wrapped_buf())<< ", in_stream_.read_pos=" <<
        // in_stream_.read_pos() << std::endl;
        compressed_buf_size = cur_page_header_.compressed_size_;
        in_stream_.wrapped_buf_advance_read_pos(compressed_buf_size);
        chunk_visit_offset_ += compressed_buf_size;
        if (RET_FAIL(compressor_->reset(false))) {
        } else if (RET_FAIL(compressor_->uncompress(
                       compressed_buf, compressed_buf_size, uncompressed_buf,
                       uncompressed_buf_size))) {
        } else {
            uncompressed_buf_ = uncompressed_buf;
        }
        // DEBUG_hex_dump_buf("ChunkReader reader, uncompressed buf = ",
        // uncompressed_buf, uncompressed_buf_size);
        if (ret != E_OK ||
            uncompressed_buf_size != cur_page_header_.uncompressed_size_) {
            ret = E_TSFILE_CORRUPTED;
            ASSERT(false);
        }
    }

    // Step 3: get time_buf & value_buf
    if (IS_SUCC(ret)) {
        int var_size = 0;
        if (RET_FAIL(SerializationUtil::read_var_uint(
                time_buf_size, uncompressed_buf, uncompressed_buf_size,
                &var_size))) {
        } else {
            time_buf = uncompressed_buf + var_size;
            value_buf = time_buf + time_buf_size;
            value_buf_size = uncompressed_buf_size - var_size - time_buf_size;
#if DEBUG_SE
            std::cout << "ChunkReader uncompress: compressed_buf_size="
                      << compressed_buf_size
                      << ", uncompressed_buf_size=" << uncompressed_buf_size
                      << ", var_size=" << var_size
                      << ", time_buf_size=" << time_buf_size << std::endl;
#endif
            if (uncompressed_buf_size <= var_size + time_buf_size) {
                ret = E_TSFILE_CORRUPTED;
                ASSERT(false);
            }
        }
    }

    // Step 4: decode time-value buffer into @ret_tsblock
    if (IS_SUCC(ret)) {
        time_decoder_->reset();
        value_decoder_->reset();
        time_in_.wrap_from(time_buf, time_buf_size);
        value_in_.wrap_from(value_buf, value_buf_size);
        // ret = decode_tv_buf_into_tsblock(time_buf, value_buf, time_buf_size,
        //                                  value_buf_size, ret_tsblock,
        //                                  filter);
        ret = decode_tv_buf_into_tsblock_by_datatype(time_in_, value_in_,
                                                     ret_tsblock, filter, &pa);
        // if we return during @decode_tv_buf_into_tsblock, we should keep
        // @uncompressed_buf_ valid until all TV pairs are decoded.
        if (ret != E_OVERFLOW) {
            if (uncompressed_buf_ != nullptr) {
                compressor_->after_uncompress(uncompressed_buf_);
                uncompressed_buf_ = nullptr;
            }
            time_in_.reset();
            value_in_.reset();
        } else {
            ret = E_OK;
        }
    }
    return ret;
}