std::unique_ptr<PaxColumns> OrcFormatReader::ReadStripe()

in contrib/pax_storage/src/cpp/storage/orc/orc_format_reader.cc [832:1135]


std::unique_ptr<PaxColumns> OrcFormatReader::ReadStripe(
    size_t group_index, const std::vector<bool> &proj_cols) {
  auto stripe_info = file_footer_.stripes(static_cast<int>(group_index));
  auto pax_columns = std::make_unique<PaxColumns>();
  std::shared_ptr<DataBuffer<char>> data_buffer;
  pax::porc::proto::StripeFooter stripe_footer;
  size_t streams_index = 0;
  size_t streams_size = 0;

  pax_columns->AddRows(stripe_info.numberofrows());

  if (unlikely(stripe_info.footerlength() == 0)) {
    return pax_columns;
  }

  if (reused_buffer_) {
    Assert(reused_buffer_->Capacity() >= 4);
    Assert(reused_buffer_->Used() == 0);
    if (reused_buffer_->Available() < stripe_info.footerlength()) {
      reused_buffer_->ReSize(
          reused_buffer_->Used() + stripe_info.footerlength(), 1.5);
    }
    data_buffer = std::make_shared<DataBuffer<char>>(
        reused_buffer_->GetBuffer(), reused_buffer_->Capacity(), false, false);

  } else {
    data_buffer =
        std::make_shared<DataBuffer<char>>(stripe_info.footerlength());
  }
  pax_columns->Set(data_buffer);
  pax_columns->SetStorageFormat(is_vec_
                                    ? PaxStorageFormat::kTypeStoragePorcVec
                                    : PaxStorageFormat::kTypeStoragePorcNonVec);

  /* `ReadStripeWithProjection` will read the column memory which filter by
   * `proj_cols`, and initialize `stripe_footer`
   *
   * Notice that: should catch `kExTypeIOError` then delete pax columns
   * But for now we will destroy memory context if exception happen.
   * And we don't have a decision that should we use `try...catch` at yet,
   * so it's ok that we just no catch here.
   */
  stripe_footer = ReadStripeWithProjection(data_buffer, stripe_info, proj_cols,
                                           group_index);

  streams_size = stripe_footer.streams_size();

  if (unlikely(streams_size == 0 && column_types_.empty())) {
    return pax_columns;
  }

  data_buffer->BrushBackAll();

  AssertImply(!proj_cols.empty(), column_types_.size() <= proj_cols.size());
  Assert(static_cast<size_t>(stripe_footer.pax_col_encodings_size()) <=
         column_types_.size());
  Assert(column_types_.size() == column_attrs_.size());

  for (size_t index = 0; index < column_types_.size(); index++) {
    /* Skip read current column, just move `streams_index` after
     * `Stream_Kind_DATA` but still need append nullptr into `PaxColumns` to
     * make sure sizeof pax_columns eq with column number
     */
    if (!proj_cols.empty() && !proj_cols[index]) {
      const pax::porc::proto::Stream *n_stream = nullptr;
      do {
        n_stream = &stripe_footer.streams(streams_index++);
      } while (n_stream->kind() !=
               ::pax::porc::proto::Stream_Kind::Stream_Kind_DATA);

      pax_columns->Append(nullptr);
      continue;
    }

    std::shared_ptr<Bitmap8> non_null_bitmap;
    bool has_null = stripe_info.colstats(index).hasnull();
    if (has_null) {
      const pax::porc::proto::Stream &non_null_stream =
          stripe_footer.streams(streams_index++);
      auto bm_nbytes = static_cast<uint32>(non_null_stream.length());
      auto bm_bytes =
          reinterpret_cast<uint8 *>(data_buffer->GetAvailableBuffer());

      Assert(non_null_stream.kind() == pax::porc::proto::Stream_Kind_PRESENT);
      non_null_bitmap =
          std::make_shared<Bitmap8>(BitmapRaw<uint8>(bm_bytes, bm_nbytes),
                                    BitmapTpl<uint8>::ReadOnlyRefBitmap);
      data_buffer->Brush(bm_nbytes);
    }

    std::shared_ptr<DataBuffer<int32>> toast_indexes;
    bool has_toast = stripe_info.colstats(index).hastoast();
    if (has_toast) {
      const pax::porc::proto::Stream &toast_stream =
          stripe_footer.streams(streams_index++);
      auto toast_nbytes = static_cast<uint32>(toast_stream.length());
      auto toast_n = static_cast<uint32>(toast_stream.column());

      Assert(toast_nbytes >= toast_n * sizeof(int32));
      Assert(toast_stream.kind() == pax::porc::proto::Stream_Kind_TOAST);
      toast_indexes = std::make_shared<DataBuffer<int32>>(
          reinterpret_cast<int32 *>(data_buffer->GetAvailableBuffer()),
          toast_n * sizeof(int32), false, false);
      toast_indexes->BrushAll();
      data_buffer->Brush(toast_nbytes);
    }

    switch (column_types_[index]) {
      case (pax::porc::proto::Type_Kind::Type_Kind_BPCHAR):
      case (pax::porc::proto::Type_Kind::Type_Kind_VECBPCHAR):
      case (pax::porc::proto::Type_Kind::Type_Kind_VECNOHEADER):
      case (pax::porc::proto::Type_Kind::Type_Kind_STRING): {
        const pax::porc::proto::Stream &len_stream =
            stripe_footer.streams(streams_index++);
        const pax::porc::proto::Stream &data_stream =
            stripe_footer.streams(streams_index++);
        const ColumnEncoding &data_encoding =
            stripe_footer.pax_col_encodings(index);

        Assert(len_stream.kind() == pax::porc::proto::Stream_Kind_OFFSET);
        Assert(data_stream.kind() == pax::porc::proto::Stream_Kind_DATA);

        pax_columns->Append(
            is_vec_
                ? BuildEncodingVecNonFixedColumn(
                      data_buffer, data_stream, len_stream, data_encoding,
                      column_types_[index] ==
                          pax::porc::proto::Type_Kind::Type_Kind_VECBPCHAR,
                      column_types_[index] ==
                          pax::porc::proto::Type_Kind::Type_Kind_VECNOHEADER)
                : BuildEncodingNonFixedColumn(
                      data_buffer, data_stream, len_stream, data_encoding,
                      column_types_[index] ==
                          pax::porc::proto::Type_Kind::Type_Kind_BPCHAR));
        break;
      }
      case (pax::porc::proto::Type_Kind::Type_Kind_BOOLEAN): {
        pax_columns->Append(BuildEncodingBitPackedColumn(
            data_buffer, stripe_footer.streams(streams_index++),
            stripe_footer.pax_col_encodings(index), is_vec_));
        break;
      }
      case (pax::porc::proto::Type_Kind::Type_Kind_BYTE):
        pax_columns->Append(BuildEncodingColumn<int8>(
            data_buffer, stripe_footer.streams(streams_index++),
            stripe_footer.pax_col_encodings(index), is_vec_));
        break;
      case (pax::porc::proto::Type_Kind::Type_Kind_SHORT):
        pax_columns->Append(BuildEncodingColumn<int16>(
            data_buffer, stripe_footer.streams(streams_index++),
            stripe_footer.pax_col_encodings(index), is_vec_));
        break;
      case (pax::porc::proto::Type_Kind::Type_Kind_INT): {
        pax_columns->Append(BuildEncodingColumn<int32>(
            data_buffer, stripe_footer.streams(streams_index++),
            stripe_footer.pax_col_encodings(index), is_vec_));
        break;
      }
      case (pax::porc::proto::Type_Kind::Type_Kind_LONG): {
        pax_columns->Append(BuildEncodingColumn<int64>(
            data_buffer, stripe_footer.streams(streams_index++),
            stripe_footer.pax_col_encodings(index), is_vec_));
        break;
      }
      case (pax::porc::proto::Type_Kind::Type_Kind_DECIMAL): {
        const pax::porc::proto::Stream &len_stream =
            stripe_footer.streams(streams_index++);
        const pax::porc::proto::Stream &data_stream =
            stripe_footer.streams(streams_index++);
        const ColumnEncoding &data_encoding =
            stripe_footer.pax_col_encodings(index);

        Assert(len_stream.kind() == pax::porc::proto::Stream_Kind_OFFSET);
        Assert(data_stream.kind() == pax::porc::proto::Stream_Kind_DATA);

        pax_columns->Append(BuildEncodingDecimalColumn(
            data_buffer, data_stream, len_stream, data_encoding));
        break;
      }
      case (pax::porc::proto::Type_Kind::Type_Kind_VECDECIMAL): {
        pax_columns->Append(BuildVecEncodingDecimalColumn(
            data_buffer, stripe_footer.streams(streams_index++),
            stripe_footer.pax_col_encodings(index), is_vec_));
        break;
      }
      default:
        // should't be here
        Assert(!"should't be here, non-implemented type");
        break;
    }

    // fill nulls data buffer
    Assert(pax_columns->GetColumns() > 0);
    const auto &last_column = (*pax_columns)[pax_columns->GetColumns() - 1];
    Assert(last_column);
    last_column->SetRows(stripe_info.numberofrows());
    if (has_null) {
      Assert(non_null_bitmap);
      last_column->SetBitmap(non_null_bitmap);
    }

    if (!column_attrs_[index].empty()) {
      last_column->SetAttributes(column_attrs_[index]);
    }

    if (has_toast) {
      Assert(toast_indexes);
      last_column->SetToastIndexes(toast_indexes);
    }
  }

#ifdef ENABLE_DEBUG
  auto storage_tyep_verify = is_vec_ ? PaxStorageFormat::kTypeStoragePorcVec
                                     : PaxStorageFormat::kTypeStoragePorcNonVec;

  Assert(storage_tyep_verify == pax_columns->GetStorageFormat());
  for (size_t index = 0; index < column_types_.size(); index++) {
    const auto &column_verify = (*pax_columns)[index];
    if (column_verify) {
      Assert(storage_tyep_verify == column_verify->GetStorageFormat());
    }
  }
#endif

  AssertImply(!toast_file_, stripe_info.toastlength() == 0);
  // deal the toast part
  if (toast_file_ && stripe_info.toastlength() > 0) {
    std::vector<std::pair<off64_t, size_t>> projection_no_combine, projection;
    std::vector<size_t> column_ext_sizes;
    size_t toast_file_size;
    uint64 ext_total_size = 0;
    off64_t curr_column_ext_offset = 0;
    std::unique_ptr<DataBuffer<char>> external_toast_buffer;

    Assert(stripe_info.numberoftoast() != 0);
    Assert((size_t)stripe_info.exttoastlength_size() ==
           pax_columns->GetColumns());
    toast_file_size = toast_file_->FileLength();
    Assert(ext_total_size <= toast_file_size);

    for (int i = 0; i < stripe_info.exttoastlength_size(); i++) {
      auto curr_column_ext_len = stripe_info.exttoastlength(i);
      column_ext_sizes.emplace_back(curr_column_ext_len);
      if ((*pax_columns)[i] && curr_column_ext_len) {
        ext_total_size += curr_column_ext_len;
        // must be sorted
        projection_no_combine.emplace_back(
            curr_column_ext_offset,
            curr_column_ext_offset + curr_column_ext_len);
      }

      curr_column_ext_offset += curr_column_ext_len;
    }

#ifdef ENABLE_DEBUG
    // verify the external toast datums
    pax_columns->VerifyAllExternalToasts(column_ext_sizes);
#endif

    for (size_t i = 0; i < projection_no_combine.size();) {
      uint64 t = projection_no_combine[i].second;
      uint64 j = i + 1;
      while (j < projection_no_combine.size() &&
             (size_t)projection_no_combine[j].first == t) {
        t = projection_no_combine[j].second;
        j++;
      }
      projection.emplace_back(
          std::make_pair(projection_no_combine[i].first, t));
      i = j;
    }

    external_toast_buffer = std::make_unique<DataBuffer<char>>(ext_total_size);

    for (const auto &range : projection) {
      CBDB_CHECK(
          external_toast_buffer->Available() >= (range.second - range.first) &&
              range.second <= toast_file_size &&
              (range.second - range.first) <= stripe_info.toastlength(),
          cbdb::CException::ExType::kExTypeInvalidExternalToast,
          fmt("Failed to parse current external toast, [file size=%lu, total "
              "size=%lu, stripe toast len=%lu, remain len=%lu, toast "
              "offset=%ld, toast end pos=%lu], \n pax file: %s, \n toast file: "
              "%s",
              toast_file_size, ext_total_size, stripe_info.toastlength(),
              external_toast_buffer->Available(), range.second, range.first,
              file_->DebugString().c_str(),
              toast_file_->DebugString().c_str()));

      toast_file_->PReadN(external_toast_buffer->GetAvailableBuffer(),
                          range.second - range.first,
                          stripe_info.toastoffset() + range.first);
      external_toast_buffer->Brush(range.second - range.first);
    }

    Assert(external_toast_buffer->Available() == 0);

    pax_columns->SetExternalToastDataBuffer(std::move(external_toast_buffer),
                                            column_ext_sizes);
  }

  Assert(streams_size == streams_index);
  return pax_columns;
}