std::shared_ptr<Page> SerializedPageReader::NextPage()

in cpp-ch/local-engine/Storages/ch_parquet/arrow/column_reader.cc [438:576]


std::shared_ptr<Page> SerializedPageReader::NextPage() {
  ThriftDeserializer deserializer(properties_);

  // Loop here because there may be unhandled page types that we skip until
  // finding a page that we do know what to do with
  while (seen_num_values_ < total_num_values_) {
    uint32_t header_size = 0;
    uint32_t allowed_page_size = kDefaultPageHeaderSize;

    // Page headers can be very large because of page statistics
    // We try to deserialize a larger buffer progressively
    // until a maximum allowed header limit
    while (true) {
      PARQUET_ASSIGN_OR_THROW(auto view, stream_->Peek(allowed_page_size));
      if (view.size() == 0) {
        return std::shared_ptr<Page>(nullptr);
      }

      // This gets used, then set by DeserializeThriftMsg
      header_size = static_cast<uint32_t>(view.size());
      try {
        if (crypto_ctx_.meta_decryptor != nullptr) {
          UpdateDecryption(crypto_ctx_.meta_decryptor, encryption::kDictionaryPageHeader,
                           &data_page_header_aad_);
        }
        deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(view.data()),
                                        &header_size, &current_page_header_,
                                        crypto_ctx_.meta_decryptor);
        break;
      } catch (std::exception& e) {
        // Failed to deserialize. Double the allowed page header size and try again
        std::stringstream ss;
        ss << e.what();
        allowed_page_size *= 2;
        if (allowed_page_size > max_page_header_size_) {
          ss << "Deserializing page header failed.\n";
          throw ParquetException(ss.str());
        }
      }
    }
    // Advance the stream offset
    PARQUET_THROW_NOT_OK(stream_->Advance(header_size));

    int compressed_len = current_page_header_.compressed_page_size;
    int uncompressed_len = current_page_header_.uncompressed_page_size;
    if (compressed_len < 0 || uncompressed_len < 0) {
      throw ParquetException("Invalid page header");
    }

    EncodedStatistics data_page_statistics;
    if (ShouldSkipPage(&data_page_statistics)) {
      PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
      continue;
    }

    if (crypto_ctx_.data_decryptor != nullptr) {
      UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage,
                       &data_page_aad_);
    }

    // Read the compressed data page.
    PARQUET_ASSIGN_OR_THROW(auto page_buffer, stream_->Read(compressed_len));
    if (page_buffer->size() != compressed_len) {
      std::stringstream ss;
      ss << "Page was smaller (" << page_buffer->size() << ") than expected ("
         << compressed_len << ")";
      ParquetException::EofException(ss.str());
    }

    // Decrypt it if we need to
    if (crypto_ctx_.data_decryptor != nullptr) {
      PARQUET_THROW_NOT_OK(decryption_buffer_->Resize(
          compressed_len - crypto_ctx_.data_decryptor->CiphertextSizeDelta(),
          /*shrink_to_fit=*/false));
      compressed_len = crypto_ctx_.data_decryptor->Decrypt(
          page_buffer->data(), compressed_len, decryption_buffer_->mutable_data());

      page_buffer = decryption_buffer_;
    }

    // Uncompress and construct the pages to return.
    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
    if (page_type == PageType::DICTIONARY_PAGE) {
      crypto_ctx_.start_decrypt_with_dictionary_page = false;
      const format::DictionaryPageHeader& dict_header =
          current_page_header_.dictionary_page_header;
      bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;

      page_buffer =
          DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);

      return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
                                              LoadEnumSafe(&dict_header.encoding),
                                              is_sorted);
    } else if (page_type == PageType::DATA_PAGE) {
      ++page_ordinal_;
      const format::DataPageHeader& header = current_page_header_.data_page_header;
      page_buffer =
          DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);

      return std::make_shared<DataPageV1>(page_buffer, header.num_values,
                                          LoadEnumSafe(&header.encoding),
                                          LoadEnumSafe(&header.definition_level_encoding),
                                          LoadEnumSafe(&header.repetition_level_encoding),
                                          uncompressed_len, data_page_statistics);
    } else if (page_type == PageType::DATA_PAGE_V2) {
      ++page_ordinal_;
      const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;

      // Arrow prior to 3.0.0 set is_compressed to false but still compressed.
      bool is_compressed =
          (header.__isset.is_compressed ? header.is_compressed : false) ||
          always_compressed_;

      // Uncompress if needed
      int levels_byte_len;
      if (AddWithOverflow(header.definition_levels_byte_length,
                          header.repetition_levels_byte_length, &levels_byte_len)) {
        throw ParquetException("Levels size too large (corrupt file?)");
      }
      // DecompressIfNeeded doesn't take `is_compressed` into account as
      // it's page type-agnostic.
      if (is_compressed) {
        page_buffer = DecompressIfNeeded(std::move(page_buffer), compressed_len,
                                         uncompressed_len, levels_byte_len);
      }

      return std::make_shared<DataPageV2>(
          page_buffer, header.num_values, header.num_nulls, header.num_rows,
          LoadEnumSafe(&header.encoding), header.definition_levels_byte_length,
          header.repetition_levels_byte_length, uncompressed_len, is_compressed,
          data_page_statistics);
    } else {
      throw ParquetException(
          "Internal error, we have already skipped non-data pages in ShouldSkipPage()");
    }
  }
  return std::shared_ptr<Page>(nullptr);
}