in cpp/src/reader/chunk_reader.cc [265:363]
int ChunkReader::decode_cur_page_data(TsBlock *&ret_tsblock, Filter *filter,
PageArena &pa) {
int ret = E_OK;
// Step 1: make sure we load the whole page data in @in_stream_
if (in_stream_.remaining_size() < cur_page_header_.compressed_size_) {
// std::cout << "decode_cur_page_data. in_stream_.remaining_size="<<
// in_stream_.remaining_size() << ", cur_page_header_.compressed_size_="
// << cur_page_header_.compressed_size_ << std::endl;
if (RET_FAIL(
read_from_file_and_rewrap(cur_page_header_.compressed_size_))) {
}
}
char *compressed_buf = nullptr;
char *uncompressed_buf = nullptr;
uint32_t compressed_buf_size = 0; // cppcheck-suppress unreadVariable
uint32_t uncompressed_buf_size = 0;
char *time_buf = nullptr;
char *value_buf = nullptr;
uint32_t time_buf_size = 0;
uint32_t value_buf_size = 0;
// Step 2: do uncompress
if (IS_SUCC(ret)) {
compressed_buf = in_stream_.get_wrapped_buf() + in_stream_.read_pos();
// std::cout << "ChunkReader::decode_cur_page_data,
// in_stream_.get_wrapped_buf="
// <<(void*)(in_stream_.get_wrapped_buf())<< ", in_stream_.read_pos=" <<
// in_stream_.read_pos() << std::endl;
compressed_buf_size = cur_page_header_.compressed_size_;
in_stream_.wrapped_buf_advance_read_pos(compressed_buf_size);
chunk_visit_offset_ += compressed_buf_size;
if (RET_FAIL(compressor_->reset(false))) {
} else if (RET_FAIL(compressor_->uncompress(
compressed_buf, compressed_buf_size, uncompressed_buf,
uncompressed_buf_size))) {
} else {
uncompressed_buf_ = uncompressed_buf;
}
// DEBUG_hex_dump_buf("ChunkReader reader, uncompressed buf = ",
// uncompressed_buf, uncompressed_buf_size);
if (ret != E_OK ||
uncompressed_buf_size != cur_page_header_.uncompressed_size_) {
ret = E_TSFILE_CORRUPTED;
ASSERT(false);
}
}
// Step 3: get time_buf & value_buf
if (IS_SUCC(ret)) {
int var_size = 0;
if (RET_FAIL(SerializationUtil::read_var_uint(
time_buf_size, uncompressed_buf, uncompressed_buf_size,
&var_size))) {
} else {
time_buf = uncompressed_buf + var_size;
value_buf = time_buf + time_buf_size;
value_buf_size = uncompressed_buf_size - var_size - time_buf_size;
#if DEBUG_SE
std::cout << "ChunkReader uncompress: compressed_buf_size="
<< compressed_buf_size
<< ", uncompressed_buf_size=" << uncompressed_buf_size
<< ", var_size=" << var_size
<< ", time_buf_size=" << time_buf_size << std::endl;
#endif
if (uncompressed_buf_size <= var_size + time_buf_size) {
ret = E_TSFILE_CORRUPTED;
ASSERT(false);
}
}
}
// Step 4: decode time-value buffer into @ret_tsblock
if (IS_SUCC(ret)) {
time_decoder_->reset();
value_decoder_->reset();
time_in_.wrap_from(time_buf, time_buf_size);
value_in_.wrap_from(value_buf, value_buf_size);
// ret = decode_tv_buf_into_tsblock(time_buf, value_buf, time_buf_size,
// value_buf_size, ret_tsblock,
// filter);
ret = decode_tv_buf_into_tsblock_by_datatype(time_in_, value_in_,
ret_tsblock, filter, &pa);
// if we return during @decode_tv_buf_into_tsblock, we should keep
// @uncompressed_buf_ valid until all TV pairs are decoded.
if (ret != E_OVERFLOW) {
if (uncompressed_buf_ != nullptr) {
compressor_->after_uncompress(uncompressed_buf_);
uncompressed_buf_ = nullptr;
}
time_in_.reset();
value_in_.reset();
} else {
ret = E_OK;
}
}
return ret;
}