in cpp/src/reader/aligned_chunk_reader.cc [105:182]
int AlignedChunkReader::load_by_aligned_meta(ChunkMeta *time_chunk_meta,
ChunkMeta *value_chunk_meta) {
int ret = E_OK;
time_chunk_meta_ = time_chunk_meta;
value_chunk_meta_ = value_chunk_meta;
#if DEBUG_SE
std::cout << "AlignedChunkReader::load_by_meta, meta=" << *time_chunk_meta
<< ", " << *value_chunk_meta << std::endl;
#endif
/* ================ deserialize time_chunk_header ================*/
// TODO configurable
file_data_time_buf_size_ = 1024;
file_data_value_buf_size_ = 1024;
int32_t ret_read_len = 0;
char *time_file_data_buf =
(char *)mem_alloc(file_data_time_buf_size_, MOD_CHUNK_READER);
if (IS_NULL(time_file_data_buf)) {
return E_OOM;
}
ret = read_file_->read(time_chunk_meta_->offset_of_chunk_header_,
time_file_data_buf, file_data_time_buf_size_,
ret_read_len);
if (IS_SUCC(ret) && ret_read_len < ChunkHeader::MIN_SERIALIZED_SIZE) {
ret = E_TSFILE_CORRUPTED;
LOGE("file corrupted, ret=" << ret << ", offset="
<< time_chunk_meta_->offset_of_chunk_header_
<< "read_len=" << ret_read_len);
mem_free(time_file_data_buf);
}
if (IS_SUCC(ret)) {
time_in_stream_.wrap_from(time_file_data_buf, ret_read_len);
if (RET_FAIL(time_chunk_header_.deserialize_from(time_in_stream_))) {
} else {
time_chunk_visit_offset_ = time_in_stream_.read_pos();
}
}
/* ================ deserialize value_chunk_header ================*/
ret_read_len = 0;
char *value_file_data_buf =
(char *)mem_alloc(file_data_value_buf_size_, MOD_CHUNK_READER);
if (IS_NULL(value_file_data_buf)) {
return E_OOM;
}
ret = read_file_->read(value_chunk_meta_->offset_of_chunk_header_,
value_file_data_buf, file_data_value_buf_size_,
ret_read_len);
if (IS_SUCC(ret) && ret_read_len < ChunkHeader::MIN_SERIALIZED_SIZE) {
ret = E_TSFILE_CORRUPTED;
LOGE("file corrupted, ret="
<< ret << ", offset=" << value_chunk_meta_->offset_of_chunk_header_
<< "read_len=" << ret_read_len);
mem_free(value_file_data_buf);
}
if (IS_SUCC(ret)) {
value_in_stream_.wrap_from(value_file_data_buf, ret_read_len);
if (RET_FAIL(value_chunk_header_.deserialize_from(value_in_stream_))) {
} else if (RET_FAIL(alloc_compressor_and_decoder(
time_decoder_, time_compressor_,
time_chunk_header_.encoding_type_,
time_chunk_header_.data_type_,
time_chunk_header_.compression_type_))) {
} else if (RET_FAIL(alloc_compressor_and_decoder(
value_decoder_, value_compressor_,
value_chunk_header_.encoding_type_,
value_chunk_header_.data_type_,
value_chunk_header_.compression_type_))) {
} else {
value_chunk_visit_offset_ = value_in_stream_.read_pos();
#if DEBUG_SE
std::cout << "AlignedChunkReader::load_by_meta, time_chunk_header="
<< time_chunk_header_
<< ", value_chunk_header=" << value_chunk_header_
<< std::endl;
#endif
}
}
return ret;
}