cpp-ch/local-engine/Storages/Parquet/ParquetMeta.cpp (194 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "ParquetMeta.h"

#include <algorithm>
#include <atomic>
#include <cassert>
#include <functional>
#include <iterator>
#include <memory>
#include <numeric>
#include <string>
#include <vector>

#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
#include <Processors/Formats/Impl/ArrowFieldIndexUtil.h>
#include <Storages/Parquet/ArrowUtils.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/schema.h>
#include <parquet/metadata.h>

namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
}

using namespace DB;

namespace local_engine
{

/// Wraps `read_buffer` as an Arrow file and opens a Parquet file reader on top of it
/// using the default reader properties.
std::unique_ptr<parquet::ParquetFileReader> ParquetMetaBuilder::openInputParquetFile(ReadBuffer & read_buffer)
{
    const FormatSettings format_settings{
        .seekable_read = true,
    };

    /// asArrowFile() requires a cancellation flag; nothing in this function ever sets it.
    std::atomic<int> is_stopped{0};
    auto arrow_file = asArrowFile(read_buffer, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);

    return parquet::ParquetFileReader::Open(arrow_file, parquet::default_reader_properties(), nullptr);
}

/// Reads only the schema of the Parquet file behind `read_buffer` and returns it as a Block header.
///
/// The buffer is expected to be a SeekableReadBuffer; this is checked with an assert only,
/// so it is not enforced in builds where asserts are compiled out.
Block ParquetMetaBuilder::collectFileSchema(const ContextPtr & context, ReadBuffer & read_buffer)
{
    assert(dynamic_cast<SeekableReadBuffer *>(&read_buffer) != nullptr);

    FormatSettings format_settings = getFormatSettings(context);
    ParquetMetaBuilder metaBuilder{
        .case_insensitive = format_settings.parquet.case_insensitive_column_matching,
        .allow_missing_columns = false,
        .collectPageIndex = false,
        .collectSchema = true};

    metaBuilder.build(read_buffer);
    return metaBuilder.fileHeader;
}

/// Maps the columns of `header` onto Parquet column indices of the file described by `metadata`.
///
/// The per-header-column index lists produced by ArrowFieldIndexUtil::findRequiredIndices are
/// flattened into one vector, in the iteration order of the returned mapping.
/// An Arrow error while converting the Parquet schema is raised through THROW_ARROW_NOT_OK.
std::vector<Int32> ParquetMetaBuilder::pruneColumn(
    const Block & header, const parquet::FileMetaData & metadata, bool case_insensitive, bool allow_missing_columns)
{
    std::shared_ptr<arrow::Schema> schema;
    THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata.schema(), &schema));

    ArrowFieldIndexUtil field_util(case_insensitive, allow_missing_columns);
    auto index_mapping = field_util.findRequiredIndices(header, *schema, metadata);

    std::vector<Int32> column_indices;
    for (const auto & [clickhouse_header_index, parquet_indexes] : index_mapping)
        for (auto parquet_index : parquet_indexes)
            column_indices.push_back(parquet_index);
    return column_indices;
}

/// Builds a ColumnIndexStore for one row group, keyed by column name.
///
/// For every entry of `column_indices` the column index and offset index are fetched from
/// `rowGroupPageIndex` and wrapped via ColumnIndex::create. When `case_insensitive` is set the
/// key is the lower-cased column name, otherwise the name as stored in the column descriptor.
///
/// NOTE(review): GetColumnIndex()/GetOffsetIndex() results are forwarded unchecked; confirm that
/// ColumnIndex::create copes with a column that has no page index.
std::unique_ptr<ColumnIndexStore> ParquetMetaBuilder::collectColumnIndex(
    const parquet::RowGroupMetaData & rgMeta,
    parquet::RowGroupPageIndexReader & rowGroupPageIndex,
    const std::vector<Int32> & column_indices,
    bool case_insensitive)
{
    auto result = std::make_unique<ColumnIndexStore>();
    ColumnIndexStore & column_index_store = *result;
    column_index_store.reserve(column_indices.size());

    for (auto const column_index : column_indices)
    {
        const auto * col_desc = rgMeta.schema()->Column(column_index);
        const auto col_index = rowGroupPageIndex.GetColumnIndex(column_index);
        const auto offset_index = rowGroupPageIndex.GetOffsetIndex(column_index);
        const std::string columnName = case_insensitive ? boost::to_lower_copy(col_desc->name()) : col_desc->name();
        column_index_store[columnName] = ColumnIndex::create(col_desc, col_index, offset_index);
    }
    return result;
}

/// Fills `fileHeader` from the Parquet schema when `collectSchema` is enabled; otherwise a no-op.
ParquetMetaBuilder & ParquetMetaBuilder::buildSchema(const parquet::FileMetaData & file_meta)
{
    if (!collectSchema)
        return *this;

    std::shared_ptr<arrow::Schema> schema;
    THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(file_meta.schema(), &schema));
    fileHeader = ArrowColumnToCHColumn::arrowSchemaToCHHeader(*schema, file_meta.key_value_metadata(), "Parquet", false, true);
    return *this;
}

/// Appends to `readRowGroups` one RowGroupInformation per row group accepted by
/// `should_include_row_group`.
///
/// The predicate receives the byte offset of the row group's midpoint, computed as the start
/// offset of its first column chunk plus half of its compressed size.
/// `rowStartIndexOffset` of each accepted group is the sum of `num_rows` of all row groups that
/// precede it in the file, whether or not those were accepted.
ParquetMetaBuilder & ParquetMetaBuilder::buildRequiredRowGroups(
    const parquet::FileMetaData & file_meta, const std::function<bool(UInt64)> & should_include_row_group)
{
    Int32 total_row_groups = file_meta.num_row_groups();
    readRowGroups.reserve(total_row_groups);

    /// A column chunk starts at its dictionary page when one exists and precedes the first data page.
    auto get_column_start_offset = [](const parquet::ColumnChunkMetaData & metadata_) -> Int64
    {
        Int64 offset = metadata_.data_page_offset();
        if (metadata_.has_dictionary_page() && offset > metadata_.dictionary_page_offset())
            offset = metadata_.dictionary_page_offset();
        return offset;
    };

    UInt64 rowStartIndexOffset = 0;
    for (int i = 0; i < total_row_groups; ++i)
    {
        const auto row_group_meta = file_meta.RowGroup(i);
        Int64 start_offset = get_column_start_offset(*row_group_meta->ColumnChunk(0));

        /// When the row group reports a compressed size of zero, fall back to summing its column chunks.
        Int64 total_bytes = row_group_meta->total_compressed_size();
        if (!total_bytes)
            for (int j = 0; j < row_group_meta->num_columns(); ++j)
                total_bytes += row_group_meta->ColumnChunk(j)->total_compressed_size();

        const UInt64 midpoint_offset = static_cast<UInt64>(start_offset + total_bytes / 2);
        if (should_include_row_group(midpoint_offset))
        {
            RowGroupInformation info;
            info.index = i;
            info.num_rows = row_group_meta->num_rows();
            info.start = row_group_meta->file_offset();
            /// NOTE(review): this stores the size as reported by the metadata, not the summed
            /// fallback `total_bytes` used for the midpoint above - confirm that is intended.
            info.total_compressed_size = row_group_meta->total_compressed_size();
            info.total_size = row_group_meta->total_byte_size();
            info.rowStartIndexOffset = rowStartIndexOffset;
            readRowGroups.emplace_back(std::move(info));
        }
        rowStartIndexOffset += row_group_meta->num_rows();
    }
    return *this;
}

/// When `collectSkipRowGroup` is enabled, appends to `skipRowGroups` the indices of all row
/// groups in the file that are not present in `readRowGroups`.
ParquetMetaBuilder & ParquetMetaBuilder::buildSkipRowGroup(const parquet::FileMetaData & file_meta)
{
    if (!collectSkipRowGroup)
        return *this;

    Int32 total_row_groups = file_meta.num_row_groups();
    std::vector<Int32> total_row_group_indices(total_row_groups);
    std::iota(total_row_group_indices.begin(), total_row_group_indices.end(), 0);

    std::vector<Int32> required_row_group_indices(readRowGroups.size());
    for (size_t i = 0; i < readRowGroups.size(); ++i)
        required_row_group_indices[i] = readRowGroups[i].index;

    /// set_difference needs both inputs sorted. buildRequiredRowGroups() appends indices in
    /// ascending order; presumably nothing else reorders readRowGroups - verify against callers.
    std::ranges::set_difference(total_row_group_indices, required_row_group_indices, std::back_inserter(skipRowGroups));
    return *this;
}

/// When `collectPageIndex` is enabled, selects the columns of `fileHeader` as `readColumns` and
/// gives every selected row group a single row range built from its row count (no filtering).
///
/// Requires `collectSchema` so that `fileHeader` is populated; this is checked with an assert only.
ParquetMetaBuilder & ParquetMetaBuilder::buildAllRowRange(const parquet::FileMetaData & file_meta)
{
    if (!collectPageIndex)
        return *this;

    assert(collectSchema && fileHeader.columns() > 0 && "collectSchema must be true when collectPageIndex is true");
    readColumns = pruneColumn(fileHeader, file_meta, case_insensitive, allow_missing_columns);
    for (auto & row_group : readRowGroups)
        row_group.rowRanges = RowRanges::createSingle(row_group.num_rows);
    return *this;
}

/// When `collectPageIndex` is enabled, selects the columns of `readBlock` as `readColumns` and
/// computes the row ranges of every selected row group.
///
/// A row group gets a single range built from its row count when `column_index_filter` is null
/// or when no page index is available for it. Otherwise its column indexes are collected, the
/// filter computes the row ranges, and the collected store is kept in `columnIndexStore`.
ParquetMetaBuilder & ParquetMetaBuilder::buildRowRange(
    parquet::ParquetFileReader & reader,
    const parquet::FileMetaData & file_meta,
    const Block & readBlock,
    const ColumnIndexFilter * column_index_filter)
{
    if (!collectPageIndex)
        return *this;

    readColumns = pruneColumn(readBlock, file_meta, case_insensitive, allow_missing_columns);
    if (readRowGroups.empty())
        return *this;

    /// The page index reader belongs to the file, not to a row group, so fetch it once
    /// instead of once per loop iteration.
    const auto page_index_reader = reader.GetPageIndexReader();

    for (auto & row_group : readRowGroups)
    {
        const auto row_group_page_index = page_index_reader == nullptr ? nullptr : page_index_reader->RowGroup(row_group.index);
        if (column_index_filter == nullptr || row_group_page_index == nullptr)
        {
            row_group.rowRanges = RowRanges::createSingle(row_group.num_rows);
            continue;
        }

        /// Row-group metadata is only needed on the filtering path, so it is created here.
        const auto row_group_meta = file_meta.RowGroup(row_group.index);
        auto column_index = collectColumnIndex(*row_group_meta, *row_group_page_index, readColumns, case_insensitive);
        row_group.rowRanges = column_index_filter->calculateRowRanges(*column_index, row_group.num_rows);
        row_group.columnIndexStore = std::move(column_index);
    }
    return *this;
}

/// Opens the Parquet file behind `read_buffer`, stores its metadata in `fileMetaData` and runs
/// the build steps in order: required row groups, skipped row groups, schema, and row ranges
/// for the columns of `readBlock` (optionally narrowed by `column_index_filter`).
ParquetMetaBuilder & ParquetMetaBuilder::build(
    ReadBuffer & read_buffer,
    const Block & readBlock,
    const ColumnIndexFilter * column_index_filter,
    const std::function<bool(UInt64)> & should_include_row_group)
{
    auto reader = openInputParquetFile(read_buffer);
    fileMetaData = reader->metadata();
    return buildRequiredRowGroups(*fileMetaData, should_include_row_group)
        .buildSkipRowGroup(*fileMetaData)
        .buildSchema(*fileMetaData)
        .buildRowRange(*reader, *fileMetaData, readBlock, column_index_filter);
}

/// Same as the overload above, but without a read block or filter: row ranges are built for the
/// columns of the file's own schema and always cover each selected row group entirely.
ParquetMetaBuilder & ParquetMetaBuilder::build(ReadBuffer & read_buffer, const std::function<bool(UInt64)> & should_include_row_group)
{
    auto reader = openInputParquetFile(read_buffer);
    fileMetaData = reader->metadata();
    return buildRequiredRowGroups(*fileMetaData, should_include_row_group)
        .buildSkipRowGroup(*fileMetaData)
        .buildSchema(*fileMetaData)
        .buildAllRowRange(*fileMetaData);
}

}