in contrib/pax_storage/src/cpp/storage/orc/orc_format_reader.cc [832:1135]
std::unique_ptr<PaxColumns> OrcFormatReader::ReadStripe(
size_t group_index, const std::vector<bool> &proj_cols) {
auto stripe_info = file_footer_.stripes(static_cast<int>(group_index));
auto pax_columns = std::make_unique<PaxColumns>();
std::shared_ptr<DataBuffer<char>> data_buffer;
pax::porc::proto::StripeFooter stripe_footer;
size_t streams_index = 0;
size_t streams_size = 0;
pax_columns->AddRows(stripe_info.numberofrows());
if (unlikely(stripe_info.footerlength() == 0)) {
return pax_columns;
}
if (reused_buffer_) {
Assert(reused_buffer_->Capacity() >= 4);
Assert(reused_buffer_->Used() == 0);
if (reused_buffer_->Available() < stripe_info.footerlength()) {
reused_buffer_->ReSize(
reused_buffer_->Used() + stripe_info.footerlength(), 1.5);
}
data_buffer = std::make_shared<DataBuffer<char>>(
reused_buffer_->GetBuffer(), reused_buffer_->Capacity(), false, false);
} else {
data_buffer =
std::make_shared<DataBuffer<char>>(stripe_info.footerlength());
}
pax_columns->Set(data_buffer);
pax_columns->SetStorageFormat(is_vec_
? PaxStorageFormat::kTypeStoragePorcVec
: PaxStorageFormat::kTypeStoragePorcNonVec);
/* `ReadStripeWithProjection` will read the column memory which filter by
* `proj_cols`, and initialize `stripe_footer`
*
* Notice that: should catch `kExTypeIOError` then delete pax columns
* But for now we will destroy memory context if exception happen.
* And we don't have a decision that should we use `try...catch` at yet,
* so it's ok that we just no catch here.
*/
stripe_footer = ReadStripeWithProjection(data_buffer, stripe_info, proj_cols,
group_index);
streams_size = stripe_footer.streams_size();
if (unlikely(streams_size == 0 && column_types_.empty())) {
return pax_columns;
}
data_buffer->BrushBackAll();
AssertImply(!proj_cols.empty(), column_types_.size() <= proj_cols.size());
Assert(static_cast<size_t>(stripe_footer.pax_col_encodings_size()) <=
column_types_.size());
Assert(column_types_.size() == column_attrs_.size());
for (size_t index = 0; index < column_types_.size(); index++) {
/* Skip read current column, just move `streams_index` after
* `Stream_Kind_DATA` but still need append nullptr into `PaxColumns` to
* make sure sizeof pax_columns eq with column number
*/
if (!proj_cols.empty() && !proj_cols[index]) {
const pax::porc::proto::Stream *n_stream = nullptr;
do {
n_stream = &stripe_footer.streams(streams_index++);
} while (n_stream->kind() !=
::pax::porc::proto::Stream_Kind::Stream_Kind_DATA);
pax_columns->Append(nullptr);
continue;
}
std::shared_ptr<Bitmap8> non_null_bitmap;
bool has_null = stripe_info.colstats(index).hasnull();
if (has_null) {
const pax::porc::proto::Stream &non_null_stream =
stripe_footer.streams(streams_index++);
auto bm_nbytes = static_cast<uint32>(non_null_stream.length());
auto bm_bytes =
reinterpret_cast<uint8 *>(data_buffer->GetAvailableBuffer());
Assert(non_null_stream.kind() == pax::porc::proto::Stream_Kind_PRESENT);
non_null_bitmap =
std::make_shared<Bitmap8>(BitmapRaw<uint8>(bm_bytes, bm_nbytes),
BitmapTpl<uint8>::ReadOnlyRefBitmap);
data_buffer->Brush(bm_nbytes);
}
std::shared_ptr<DataBuffer<int32>> toast_indexes;
bool has_toast = stripe_info.colstats(index).hastoast();
if (has_toast) {
const pax::porc::proto::Stream &toast_stream =
stripe_footer.streams(streams_index++);
auto toast_nbytes = static_cast<uint32>(toast_stream.length());
auto toast_n = static_cast<uint32>(toast_stream.column());
Assert(toast_nbytes >= toast_n * sizeof(int32));
Assert(toast_stream.kind() == pax::porc::proto::Stream_Kind_TOAST);
toast_indexes = std::make_shared<DataBuffer<int32>>(
reinterpret_cast<int32 *>(data_buffer->GetAvailableBuffer()),
toast_n * sizeof(int32), false, false);
toast_indexes->BrushAll();
data_buffer->Brush(toast_nbytes);
}
switch (column_types_[index]) {
case (pax::porc::proto::Type_Kind::Type_Kind_BPCHAR):
case (pax::porc::proto::Type_Kind::Type_Kind_VECBPCHAR):
case (pax::porc::proto::Type_Kind::Type_Kind_VECNOHEADER):
case (pax::porc::proto::Type_Kind::Type_Kind_STRING): {
const pax::porc::proto::Stream &len_stream =
stripe_footer.streams(streams_index++);
const pax::porc::proto::Stream &data_stream =
stripe_footer.streams(streams_index++);
const ColumnEncoding &data_encoding =
stripe_footer.pax_col_encodings(index);
Assert(len_stream.kind() == pax::porc::proto::Stream_Kind_OFFSET);
Assert(data_stream.kind() == pax::porc::proto::Stream_Kind_DATA);
pax_columns->Append(
is_vec_
? BuildEncodingVecNonFixedColumn(
data_buffer, data_stream, len_stream, data_encoding,
column_types_[index] ==
pax::porc::proto::Type_Kind::Type_Kind_VECBPCHAR,
column_types_[index] ==
pax::porc::proto::Type_Kind::Type_Kind_VECNOHEADER)
: BuildEncodingNonFixedColumn(
data_buffer, data_stream, len_stream, data_encoding,
column_types_[index] ==
pax::porc::proto::Type_Kind::Type_Kind_BPCHAR));
break;
}
case (pax::porc::proto::Type_Kind::Type_Kind_BOOLEAN): {
pax_columns->Append(BuildEncodingBitPackedColumn(
data_buffer, stripe_footer.streams(streams_index++),
stripe_footer.pax_col_encodings(index), is_vec_));
break;
}
case (pax::porc::proto::Type_Kind::Type_Kind_BYTE):
pax_columns->Append(BuildEncodingColumn<int8>(
data_buffer, stripe_footer.streams(streams_index++),
stripe_footer.pax_col_encodings(index), is_vec_));
break;
case (pax::porc::proto::Type_Kind::Type_Kind_SHORT):
pax_columns->Append(BuildEncodingColumn<int16>(
data_buffer, stripe_footer.streams(streams_index++),
stripe_footer.pax_col_encodings(index), is_vec_));
break;
case (pax::porc::proto::Type_Kind::Type_Kind_INT): {
pax_columns->Append(BuildEncodingColumn<int32>(
data_buffer, stripe_footer.streams(streams_index++),
stripe_footer.pax_col_encodings(index), is_vec_));
break;
}
case (pax::porc::proto::Type_Kind::Type_Kind_LONG): {
pax_columns->Append(BuildEncodingColumn<int64>(
data_buffer, stripe_footer.streams(streams_index++),
stripe_footer.pax_col_encodings(index), is_vec_));
break;
}
case (pax::porc::proto::Type_Kind::Type_Kind_DECIMAL): {
const pax::porc::proto::Stream &len_stream =
stripe_footer.streams(streams_index++);
const pax::porc::proto::Stream &data_stream =
stripe_footer.streams(streams_index++);
const ColumnEncoding &data_encoding =
stripe_footer.pax_col_encodings(index);
Assert(len_stream.kind() == pax::porc::proto::Stream_Kind_OFFSET);
Assert(data_stream.kind() == pax::porc::proto::Stream_Kind_DATA);
pax_columns->Append(BuildEncodingDecimalColumn(
data_buffer, data_stream, len_stream, data_encoding));
break;
}
case (pax::porc::proto::Type_Kind::Type_Kind_VECDECIMAL): {
pax_columns->Append(BuildVecEncodingDecimalColumn(
data_buffer, stripe_footer.streams(streams_index++),
stripe_footer.pax_col_encodings(index), is_vec_));
break;
}
default:
// should't be here
Assert(!"should't be here, non-implemented type");
break;
}
// fill nulls data buffer
Assert(pax_columns->GetColumns() > 0);
const auto &last_column = (*pax_columns)[pax_columns->GetColumns() - 1];
Assert(last_column);
last_column->SetRows(stripe_info.numberofrows());
if (has_null) {
Assert(non_null_bitmap);
last_column->SetBitmap(non_null_bitmap);
}
if (!column_attrs_[index].empty()) {
last_column->SetAttributes(column_attrs_[index]);
}
if (has_toast) {
Assert(toast_indexes);
last_column->SetToastIndexes(toast_indexes);
}
}
#ifdef ENABLE_DEBUG
auto storage_tyep_verify = is_vec_ ? PaxStorageFormat::kTypeStoragePorcVec
: PaxStorageFormat::kTypeStoragePorcNonVec;
Assert(storage_tyep_verify == pax_columns->GetStorageFormat());
for (size_t index = 0; index < column_types_.size(); index++) {
const auto &column_verify = (*pax_columns)[index];
if (column_verify) {
Assert(storage_tyep_verify == column_verify->GetStorageFormat());
}
}
#endif
AssertImply(!toast_file_, stripe_info.toastlength() == 0);
// deal the toast part
if (toast_file_ && stripe_info.toastlength() > 0) {
std::vector<std::pair<off64_t, size_t>> projection_no_combine, projection;
std::vector<size_t> column_ext_sizes;
size_t toast_file_size;
uint64 ext_total_size = 0;
off64_t curr_column_ext_offset = 0;
std::unique_ptr<DataBuffer<char>> external_toast_buffer;
Assert(stripe_info.numberoftoast() != 0);
Assert((size_t)stripe_info.exttoastlength_size() ==
pax_columns->GetColumns());
toast_file_size = toast_file_->FileLength();
Assert(ext_total_size <= toast_file_size);
for (int i = 0; i < stripe_info.exttoastlength_size(); i++) {
auto curr_column_ext_len = stripe_info.exttoastlength(i);
column_ext_sizes.emplace_back(curr_column_ext_len);
if ((*pax_columns)[i] && curr_column_ext_len) {
ext_total_size += curr_column_ext_len;
// must be sorted
projection_no_combine.emplace_back(
curr_column_ext_offset,
curr_column_ext_offset + curr_column_ext_len);
}
curr_column_ext_offset += curr_column_ext_len;
}
#ifdef ENABLE_DEBUG
// verify the external toast datums
pax_columns->VerifyAllExternalToasts(column_ext_sizes);
#endif
for (size_t i = 0; i < projection_no_combine.size();) {
uint64 t = projection_no_combine[i].second;
uint64 j = i + 1;
while (j < projection_no_combine.size() &&
(size_t)projection_no_combine[j].first == t) {
t = projection_no_combine[j].second;
j++;
}
projection.emplace_back(
std::make_pair(projection_no_combine[i].first, t));
i = j;
}
external_toast_buffer = std::make_unique<DataBuffer<char>>(ext_total_size);
for (const auto &range : projection) {
CBDB_CHECK(
external_toast_buffer->Available() >= (range.second - range.first) &&
range.second <= toast_file_size &&
(range.second - range.first) <= stripe_info.toastlength(),
cbdb::CException::ExType::kExTypeInvalidExternalToast,
fmt("Failed to parse current external toast, [file size=%lu, total "
"size=%lu, stripe toast len=%lu, remain len=%lu, toast "
"offset=%ld, toast end pos=%lu], \n pax file: %s, \n toast file: "
"%s",
toast_file_size, ext_total_size, stripe_info.toastlength(),
external_toast_buffer->Available(), range.second, range.first,
file_->DebugString().c_str(),
toast_file_->DebugString().c_str()));
toast_file_->PReadN(external_toast_buffer->GetAvailableBuffer(),
range.second - range.first,
stripe_info.toastoffset() + range.first);
external_toast_buffer->Brush(range.second - range.first);
}
Assert(external_toast_buffer->Available() == 0);
pax_columns->SetExternalToastDataBuffer(std::move(external_toast_buffer),
column_ext_sizes);
}
Assert(streams_size == streams_index);
return pax_columns;
}