// cpp-ch/local-engine/Storages/ch_parquet/arrow/reader.cc
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "reader.h"
#include <algorithm>
#include <cstring>
#include <memory>
#include <unordered_set>
#include <utility>
#include <vector>
#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/extension_type.h"
#include "arrow/io/memory.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/parallel.h"
#include "arrow/util/range.h"
#include "arrow/util/tracing_internal.h"
#include "Storages/ch_parquet/arrow/reader_internal.h"
#include "Storages/ch_parquet/arrow/column_reader.h"
#include "parquet/exception.h"
#include "parquet/file_reader.h"
#include "parquet/metadata.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
using arrow::Array;
using arrow::ArrayData;
using arrow::BooleanArray;
using arrow::ChunkedArray;
using arrow::DataType;
using arrow::ExtensionType;
using arrow::Field;
using arrow::Future;
using arrow::Int32Array;
using arrow::ListArray;
using arrow::MemoryPool;
using arrow::RecordBatchReader;
using arrow::ResizableBuffer;
using arrow::Result;
using arrow::Status;
using arrow::StructArray;
using arrow::Table;
using arrow::TimestampArray;
using arrow::internal::checked_cast;
using arrow::internal::Iota;
// Help reduce verbosity
using ParquetReader = ch_parquet::ParquetFileReader;
using ch_parquet::internal::RecordReader;
namespace bit_util = arrow::bit_util;
using parquet::ParquetFileReader;
using parquet::ArrowReaderProperties;
using parquet::PageReader;
using parquet::ColumnDescriptor;
using parquet::Buffer;
using parquet::arrow::SchemaManifest;
namespace ch_parquet {
namespace arrow {
using namespace parquet::arrow;
namespace {
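// Collapse a ChunkedArray into a single ArrayData: zero chunks become an
// empty all-null array of the column's type, a single chunk is returned
// as-is, and multiple chunks are rejected (see ARROW-3762 below).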
::arrow::Result<std::shared_ptr<ArrayData>> ChunksToSingle(const ChunkedArray& chunked) {
switch (chunked.num_chunks()) {
case 0: {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> array,
::arrow::MakeArrayOfNull(chunked.type(), 0));
return array->data();
}
case 1:
return chunked.chunk(0)->data();
default:
// ARROW-3762(wesm): If item reader yields a chunked array, we reject as
// this is not yet implemented
return Status::NotImplemented(
"Nested data conversions not implemented for chunked array outputs");
}
}
} // namespace
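// Internal interface shared by the concrete column readers defined below
// (LeafReader, ExtensionReader, ListReader, StructReader). NextBatch() drives
// a two-phase read: LoadBatch() decodes values and rep/def levels from the
// file, BuildArray() assembles them into an Arrow ChunkedArray, and every
// resulting chunk is validated before being handed back to the caller.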
class ColumnReaderImpl : public ColumnReader {
public:
virtual Status GetDefLevels(const int16_t** data, int64_t* length) = 0;
virtual Status GetRepLevels(const int16_t** data, int64_t* length) = 0;
virtual const std::shared_ptr<Field> field() = 0;
::arrow::Status NextBatch(int64_t batch_size,
std::shared_ptr<::arrow::ChunkedArray>* out) final {
RETURN_NOT_OK(LoadBatch(batch_size));
RETURN_NOT_OK(BuildArray(batch_size, out));
for (int x = 0; x < (*out)->num_chunks(); x++) {
RETURN_NOT_OK((*out)->chunk(x)->Validate());
}
return Status::OK();
}
virtual ::arrow::Status LoadBatch(int64_t num_records) = 0;
virtual ::arrow::Status BuildArray(int64_t length_upper_bound,
std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
virtual bool IsOrHasRepeatedChild() const = 0;
};
namespace {
std::shared_ptr<std::unordered_set<int>> VectorToSharedSet(
const std::vector<int>& values) {
std::shared_ptr<std::unordered_set<int>> result(new std::unordered_set<int>());
result->insert(values.begin(), values.end());
return result;
}
// Forward declaration
Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>& context,
std::unique_ptr<ColumnReaderImpl>* out);
// ----------------------------------------------------------------------
// FileReaderImpl
class FileReaderImpl : public FileReader {
public:
FileReaderImpl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
ArrowReaderProperties properties)
: pool_(pool),
reader_(std::move(reader)),
reader_properties_(std::move(properties)) {}
Status Init() {
return SchemaManifest::Make(reader_->metadata()->schema(),
reader_->metadata()->key_value_metadata(),
reader_properties_, &manifest_);
}
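// Factories producing per-column FileColumnIterators over either a
// caller-supplied subset of row groups or every row group in the file.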
FileColumnIteratorFactory SomeRowGroupsFactory(std::vector<int> row_groups) {
return [row_groups](int i, ParquetFileReader* reader) {
return new FileColumnIterator(i, reader, row_groups);
};
}
FileColumnIteratorFactory AllRowGroupsFactory() {
return SomeRowGroupsFactory(Iota(reader_->metadata()->num_row_groups()));
}
Status BoundsCheckColumn(int column) {
if (column < 0 || column >= this->num_columns()) {
return Status::Invalid("Column index out of bounds (got ", column,
", should be "
"between 0 and ",
this->num_columns() - 1, ")");
}
return Status::OK();
}
Status BoundsCheckRowGroup(int row_group) {
// row group indices check
if (row_group < 0 || row_group >= num_row_groups()) {
return Status::Invalid("Some index in row_group_indices is ", row_group,
", which is either < 0 or >= num_row_groups(",
num_row_groups(), ")");
}
return Status::OK();
}
Status BoundsCheck(const std::vector<int>& row_groups,
const std::vector<int>& column_indices) {
for (int i : row_groups) {
RETURN_NOT_OK(BoundsCheckRowGroup(i));
}
for (int i : column_indices) {
RETURN_NOT_OK(BoundsCheckColumn(i));
}
return Status::OK();
}
std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override;
Status ReadTable(const std::vector<int>& indices,
std::shared_ptr<Table>* out) override {
return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out);
}
Status GetFieldReader(int i,
const std::shared_ptr<std::unordered_set<int>>& included_leaves,
const std::vector<int>& row_groups,
std::unique_ptr<ColumnReaderImpl>* out) {
// Should be covered by GetRecordBatchReader checks but
// manifest_.schema_fields is a separate variable so be extra careful.
if (ARROW_PREDICT_FALSE(i < 0 ||
static_cast<size_t>(i) >= manifest_.schema_fields.size())) {
return Status::Invalid("Column index out of bounds (got ", i,
", should be "
"between 0 and ",
manifest_.schema_fields.size(), ")");
}
auto ctx = std::make_shared<ReaderContext>();
ctx->reader = reader_.get();
ctx->pool = pool_;
ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
ctx->filter_leaves = true;
ctx->included_leaves = included_leaves;
return GetReader(manifest_.schema_fields[i], ctx, out);
}
Status GetFieldReaders(const std::vector<int>& column_indices,
const std::vector<int>& row_groups,
std::vector<std::shared_ptr<ColumnReaderImpl>>* out,
std::shared_ptr<::arrow::Schema>* out_schema) {
// We only need to read schema fields which have columns indicated
// in the indices vector
ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
manifest_.GetFieldIndices(column_indices));
auto included_leaves = VectorToSharedSet(column_indices);
out->resize(field_indices.size());
::arrow::FieldVector out_fields(field_indices.size());
for (size_t i = 0; i < out->size(); ++i) {
std::unique_ptr<ColumnReaderImpl> reader;
RETURN_NOT_OK(
GetFieldReader(field_indices[i], included_leaves, row_groups, &reader));
out_fields[i] = reader->field();
out->at(i) = std::move(reader);
}
*out_schema = ::arrow::schema(std::move(out_fields), manifest_.schema_metadata);
return Status::OK();
}
Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
std::unique_ptr<ColumnReader>* out);
Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) override {
return GetColumn(i, AllRowGroupsFactory(), out);
}
Status GetSchema(std::shared_ptr<::arrow::Schema>* out) override {
return FromParquetSchema(reader_->metadata()->schema(), reader_properties_,
reader_->metadata()->key_value_metadata(), out);
}
Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) override {
auto included_leaves = VectorToSharedSet(Iota(reader_->metadata()->num_columns()));
std::vector<int> row_groups = Iota(reader_->metadata()->num_row_groups());
std::unique_ptr<ColumnReaderImpl> reader;
RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader));
return ReadColumn(i, row_groups, reader.get(), out);
}
Status ReadColumn(int i, const std::vector<int>& row_groups, ColumnReader* reader,
std::shared_ptr<ChunkedArray>* out) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
// TODO(wesm): This calculation doesn't make much sense when we have repeated
// schema nodes
int64_t records_to_read = 0;
for (auto row_group : row_groups) {
// Can throw exception
records_to_read +=
reader_->metadata()->RowGroup(row_group)->ColumnChunk(i)->num_values();
}
#ifdef ARROW_WITH_OPENTELEMETRY
std::string column_name = reader_->metadata()->schema()->Column(i)->name();
std::string phys_type =
TypeToString(reader_->metadata()->schema()->Column(i)->physical_type());
::arrow::util::tracing::Span span;
START_SPAN(span, "parquet::arrow::read_column",
{{"parquet.arrow.columnindex", i},
{"parquet.arrow.columnname", column_name},
{"parquet.arrow.physicaltype", phys_type},
{"parquet.arrow.records_to_read", records_to_read}});
#endif
return reader->NextBatch(records_to_read, out);
END_PARQUET_CATCH_EXCEPTIONS
}
Status ReadColumn(int i, const std::vector<int>& row_groups,
std::shared_ptr<ChunkedArray>* out) {
std::unique_ptr<ColumnReader> flat_column_reader;
RETURN_NOT_OK(GetColumn(i, SomeRowGroupsFactory(row_groups), &flat_column_reader));
return ReadColumn(i, row_groups, flat_column_reader.get(), out);
}
Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) override {
return ReadColumn(i, Iota(reader_->metadata()->num_row_groups()), out);
}
Status ReadTable(std::shared_ptr<Table>* table) override {
return ReadTable(Iota(reader_->metadata()->num_columns()), table);
}
Status ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices,
std::shared_ptr<Table>* table) override;
// Helper method used by ReadRowGroups - read the given row groups/columns, skipping
// bounds checks and pre-buffering. Takes a shared_ptr to self to keep the reader
// alive in async contexts.
Future<std::shared_ptr<Table>> DecodeRowGroups(
std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor);
Status ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<Table>* table) override {
return ReadRowGroups(row_groups, Iota(reader_->metadata()->num_columns()), table);
}
Status ReadRowGroup(int row_group_index, const std::vector<int>& column_indices,
std::shared_ptr<Table>* out) override {
return ReadRowGroups({row_group_index}, column_indices, out);
}
Status ReadRowGroup(int i, std::shared_ptr<Table>* table) override {
return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
}
Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
const std::vector<int>& column_indices,
std::unique_ptr<RecordBatchReader>* out) override;
Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
std::unique_ptr<RecordBatchReader>* out) override {
return GetRecordBatchReader(row_group_indices,
Iota(reader_->metadata()->num_columns()), out);
}
Status GetRecordBatchReader(std::unique_ptr<RecordBatchReader>* out) override {
return GetRecordBatchReader(Iota(num_row_groups()),
Iota(reader_->metadata()->num_columns()), out);
}
::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
const std::vector<int> row_group_indices,
const std::vector<int> column_indices,
::arrow::internal::Executor* cpu_executor,
int64_t rows_to_readahead) override;
int num_columns() const { return reader_->metadata()->num_columns(); }
ParquetFileReader* parquet_reader() const override { return reader_.get(); }
int num_row_groups() const override { return reader_->metadata()->num_row_groups(); }
void set_use_threads(bool use_threads) override {
reader_properties_.set_use_threads(use_threads);
}
void set_batch_size(int64_t batch_size) override {
reader_properties_.set_batch_size(batch_size);
}
const ArrowReaderProperties& properties() const override { return reader_properties_; }
const SchemaManifest& manifest() const override { return manifest_; }
Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
int64_t* num_rows) override {
BEGIN_PARQUET_CATCH_EXCEPTIONS
*num_rows = ScanFileContents(columns, column_batch_size, reader_.get());
return Status::OK();
END_PARQUET_CATCH_EXCEPTIONS
}
MemoryPool* pool_;
std::unique_ptr<ParquetFileReader> reader_;
ArrowReaderProperties reader_properties_;
SchemaManifest manifest_;
};
class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
public:
RowGroupRecordBatchReader(::arrow::RecordBatchIterator batches,
std::shared_ptr<::arrow::Schema> schema)
: batches_(std::move(batches)), schema_(std::move(schema)) {}
~RowGroupRecordBatchReader() override {}
Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
return batches_.Next().Value(out);
}
std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
private:
::arrow::Iterator<std::shared_ptr<::arrow::RecordBatch>> batches_;
std::shared_ptr<::arrow::Schema> schema_;
};
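// ColumnChunkReaderImpl and RowGroupReaderImpl are thin per-column-chunk and
// per-row-group views over a FileReaderImpl. They hold a non-owning pointer
// to the parent reader, which must outlive them.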
class ColumnChunkReaderImpl : public ColumnChunkReader {
public:
ColumnChunkReaderImpl(FileReaderImpl* impl, int row_group_index, int column_index)
: impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {}
Status Read(std::shared_ptr<::arrow::ChunkedArray>* out) override {
return impl_->ReadColumn(column_index_, {row_group_index_}, out);
}
private:
FileReaderImpl* impl_;
int column_index_;
int row_group_index_;
};
class RowGroupReaderImpl : public RowGroupReader {
public:
RowGroupReaderImpl(FileReaderImpl* impl, int row_group_index)
: impl_(impl), row_group_index_(row_group_index) {}
std::shared_ptr<ColumnChunkReader> Column(int column_index) override {
return std::make_shared<ColumnChunkReaderImpl>(impl_, row_group_index_, column_index);
}
Status ReadTable(const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out) override {
return impl_->ReadRowGroup(row_group_index_, column_indices, out);
}
Status ReadTable(std::shared_ptr<::arrow::Table>* out) override {
return impl_->ReadRowGroup(row_group_index_, out);
}
private:
FileReaderImpl* impl_;
int row_group_index_;
};
// ----------------------------------------------------------------------
// Column reader implementations
// Leaf reader is for primitive arrays and primitive children of nested arrays
class LeafReader : public ColumnReaderImpl {
public:
LeafReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
std::unique_ptr<FileColumnIterator> input,
::parquet::internal::LevelInfo leaf_info)
: ctx_(std::move(ctx)),
field_(std::move(field)),
input_(std::move(input)),
descr_(input_->descr()) {
record_reader_ = RecordReader::Make(
descr_, leaf_info, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY);
NextRowGroup();
}
Status GetDefLevels(const int16_t** data, int64_t* length) final {
*data = record_reader_->def_levels();
*length = record_reader_->levels_position();
return Status::OK();
}
Status GetRepLevels(const int16_t** data, int64_t* length) final {
*data = record_reader_->rep_levels();
*length = record_reader_->levels_position();
return Status::OK();
}
bool IsOrHasRepeatedChild() const final { return false; }
Status LoadBatch(int64_t records_to_read) final {
BEGIN_PARQUET_CATCH_EXCEPTIONS
out_ = nullptr;
record_reader_->Reset();
// Pre-allocation gives much better performance for flat columns
record_reader_->Reserve(records_to_read);
while (records_to_read > 0) {
if (!record_reader_->HasMoreData()) {
break;
}
int64_t records_read = record_reader_->ReadRecords(records_to_read);
records_to_read -= records_read;
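// A read of zero records means the current row group is exhausted; move on
// to the next row group and retry.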
if (records_read == 0) {
NextRowGroup();
}
}
RETURN_NOT_OK(
TransferColumnData(record_reader_.get(), field_, descr_, ctx_->pool, &out_));
return Status::OK();
END_PARQUET_CATCH_EXCEPTIONS
}
::arrow::Status BuildArray(int64_t length_upper_bound,
std::shared_ptr<::arrow::ChunkedArray>* out) final {
*out = out_;
return Status::OK();
}
const std::shared_ptr<Field> field() override { return field_; }
private:
std::shared_ptr<ChunkedArray> out_;
void NextRowGroup() {
std::unique_ptr<PageReader> page_reader = input_->NextChunk();
record_reader_->SetPageReader(std::move(page_reader));
}
std::shared_ptr<ReaderContext> ctx_;
std::shared_ptr<Field> field_;
std::unique_ptr<FileColumnIterator> input_;
const ColumnDescriptor* descr_;
std::shared_ptr<RecordReader> record_reader_;
};
// Column reader for extension arrays
class ExtensionReader : public ColumnReaderImpl {
public:
ExtensionReader(std::shared_ptr<Field> field,
std::unique_ptr<ColumnReaderImpl> storage_reader)
: field_(std::move(field)), storage_reader_(std::move(storage_reader)) {}
Status GetDefLevels(const int16_t** data, int64_t* length) override {
return storage_reader_->GetDefLevels(data, length);
}
Status GetRepLevels(const int16_t** data, int64_t* length) override {
return storage_reader_->GetRepLevels(data, length);
}
Status LoadBatch(int64_t number_of_records) final {
return storage_reader_->LoadBatch(number_of_records);
}
Status BuildArray(int64_t length_upper_bound,
std::shared_ptr<ChunkedArray>* out) override {
std::shared_ptr<ChunkedArray> storage;
RETURN_NOT_OK(storage_reader_->BuildArray(length_upper_bound, &storage));
*out = ExtensionType::WrapArray(field_->type(), storage);
return Status::OK();
}
bool IsOrHasRepeatedChild() const final {
return storage_reader_->IsOrHasRepeatedChild();
}
const std::shared_ptr<Field> field() override { return field_; }
private:
std::shared_ptr<Field> field_;
std::unique_ptr<ColumnReaderImpl> storage_reader_;
};
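// Reader for list-like types: LIST and MAP use 32-bit offsets, LARGE_LIST
// uses 64-bit offsets (selected via IndexType), and FIXED_SIZE_LIST is
// handled by the subclass further below.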
template <typename IndexType>
class ListReader : public ColumnReaderImpl {
public:
ListReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
::parquet::internal::LevelInfo level_info,
std::unique_ptr<ColumnReaderImpl> child_reader)
: ctx_(std::move(ctx)),
field_(std::move(field)),
level_info_(level_info),
item_reader_(std::move(child_reader)) {}
Status GetDefLevels(const int16_t** data, int64_t* length) override {
return item_reader_->GetDefLevels(data, length);
}
Status GetRepLevels(const int16_t** data, int64_t* length) override {
return item_reader_->GetRepLevels(data, length);
}
bool IsOrHasRepeatedChild() const final { return true; }
Status LoadBatch(int64_t number_of_records) final {
return item_reader_->LoadBatch(number_of_records);
}
virtual ::arrow::Result<std::shared_ptr<ChunkedArray>> AssembleArray(
std::shared_ptr<ArrayData> data) {
if (field_->type()->id() == ::arrow::Type::MAP) {
// Error out if data is not map-compliant instead of aborting in MakeArray below
RETURN_NOT_OK(::arrow::MapArray::ValidateChildData(data->child_data));
}
std::shared_ptr<Array> result = ::arrow::MakeArray(data);
return std::make_shared<ChunkedArray>(result);
}
Status BuildArray(int64_t length_upper_bound,
std::shared_ptr<ChunkedArray>* out) override {
const int16_t* def_levels;
const int16_t* rep_levels;
int64_t num_levels;
RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
std::shared_ptr<ResizableBuffer> validity_buffer;
::parquet::internal::ValidityBitmapInputOutput validity_io;
validity_io.values_read_upper_bound = length_upper_bound;
if (field_->nullable()) {
ARROW_ASSIGN_OR_RAISE(validity_buffer,
AllocateResizableBuffer(
bit_util::BytesForBits(length_upper_bound), ctx_->pool));
validity_io.valid_bits = validity_buffer->mutable_data();
}
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<ResizableBuffer> offsets_buffer,
AllocateResizableBuffer(
sizeof(IndexType) * std::max(int64_t{1}, length_upper_bound + 1),
ctx_->pool));
// Ensure zero initialization in case we have reached a zero length list (and
// because first entry is always zero).
IndexType* offset_data = reinterpret_cast<IndexType*>(offsets_buffer->mutable_data());
offset_data[0] = 0;
BEGIN_PARQUET_CATCH_EXCEPTIONS
::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
level_info_, &validity_io, offset_data);
END_PARQUET_CATCH_EXCEPTIONS
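// offset_data[values_read] is the total number of child items consumed by
// the reconstructed lists, which bounds the length of the child array built
// next.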
RETURN_NOT_OK(item_reader_->BuildArray(offset_data[validity_io.values_read], out));
// Resize to actual number of elements returned.
RETURN_NOT_OK(
offsets_buffer->Resize((validity_io.values_read + 1) * sizeof(IndexType)));
if (validity_buffer != nullptr) {
RETURN_NOT_OK(
validity_buffer->Resize(bit_util::BytesForBits(validity_io.values_read)));
validity_buffer->ZeroPadding();
}
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, ChunksToSingle(**out));
std::vector<std::shared_ptr<Buffer>> buffers{
validity_io.null_count > 0 ? validity_buffer : nullptr, offsets_buffer};
auto data = std::make_shared<ArrayData>(
field_->type(),
/*length=*/validity_io.values_read, std::move(buffers),
std::vector<std::shared_ptr<ArrayData>>{item_chunk}, validity_io.null_count);
ARROW_ASSIGN_OR_RAISE(*out, AssembleArray(std::move(data)));
return Status::OK();
}
const std::shared_ptr<Field> field() override { return field_; }
private:
std::shared_ptr<ReaderContext> ctx_;
std::shared_ptr<Field> field_;
::parquet::internal::LevelInfo level_info_;
std::unique_ptr<ColumnReaderImpl> item_reader_;
};
class PARQUET_NO_EXPORT FixedSizeListReader : public ListReader<int32_t> {
public:
FixedSizeListReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
::parquet::internal::LevelInfo level_info,
std::unique_ptr<ColumnReaderImpl> child_reader)
: ListReader(std::move(ctx), std::move(field), level_info,
std::move(child_reader)) {}
::arrow::Result<std::shared_ptr<ChunkedArray>> AssembleArray(
std::shared_ptr<ArrayData> data) final {
DCHECK_EQ(data->buffers.size(), 2);
DCHECK_EQ(field()->type()->id(), ::arrow::Type::FIXED_SIZE_LIST);
const auto& type = checked_cast<::arrow::FixedSizeListType&>(*field()->type());
const int32_t* offsets = reinterpret_cast<const int32_t*>(data->buffers[1]->data());
for (int x = 1; x <= data->length; x++) {
int32_t size = offsets[x] - offsets[x - 1];
if (size != type.list_size()) {
return Status::Invalid("Expected all lists to be of size=", type.list_size(),
" but index ", x, " had size=", size);
}
}
data->buffers.resize(1);
std::shared_ptr<Array> result = ::arrow::MakeArray(data);
return std::make_shared<ChunkedArray>(result);
}
};
class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
public:
explicit StructReader(std::shared_ptr<ReaderContext> ctx,
std::shared_ptr<Field> filtered_field,
::parquet::internal::LevelInfo level_info,
std::vector<std::unique_ptr<ColumnReaderImpl>> children)
: ctx_(std::move(ctx)),
filtered_field_(std::move(filtered_field)),
level_info_(level_info),
children_(std::move(children)) {
// The children may be a mix of repeated and non-repeated fields. Prefer a
// non-repeated child when one exists, since it is guaranteed to need the
// fewest levels to reconstruct the nullability bitmap.
auto result = std::find_if(children_.begin(), children_.end(),
[](const std::unique_ptr<ColumnReaderImpl>& child) {
return !child->IsOrHasRepeatedChild();
});
if (result != children_.end()) {
def_rep_level_child_ = result->get();
has_repeated_child_ = false;
} else if (!children_.empty()) {
def_rep_level_child_ = children_.front().get();
has_repeated_child_ = true;
}
}
bool IsOrHasRepeatedChild() const final { return has_repeated_child_; }
Status LoadBatch(int64_t records_to_read) override {
for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
RETURN_NOT_OK(reader->LoadBatch(records_to_read));
}
return Status::OK();
}
Status BuildArray(int64_t length_upper_bound,
std::shared_ptr<ChunkedArray>* out) override;
Status GetDefLevels(const int16_t** data, int64_t* length) override;
Status GetRepLevels(const int16_t** data, int64_t* length) override;
const std::shared_ptr<Field> field() override { return filtered_field_; }
private:
const std::shared_ptr<ReaderContext> ctx_;
const std::shared_ptr<Field> filtered_field_;
const ::parquet::internal::LevelInfo level_info_;
const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
ColumnReaderImpl* def_rep_level_child_ = nullptr;
bool has_repeated_child_;
};
Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
*data = nullptr;
if (children_.size() == 0) {
*length = 0;
return Status::Invalid("StructReader had no children");
}
// This method should only be called when this struct or one of its parents
// is optional/repeated or has a repeated child, meaning all children must
// have rep/def levels associated with them.
RETURN_NOT_OK(def_rep_level_child_->GetDefLevels(data, length));
return Status::OK();
}
Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
*data = nullptr;
if (children_.size() == 0) {
*length = 0;
return Status::Invalid("StructReader had no childre");
}
// This method should only be called when this struct or one of its parents
// are optional/repeated or it has repeated child.
// Meaning all children must have rep/def levels associated
// with them.
RETURN_NOT_OK(def_rep_level_child_->GetRepLevels(data, length));
return Status::OK();
}
Status StructReader::BuildArray(int64_t length_upper_bound,
std::shared_ptr<ChunkedArray>* out) {
std::vector<std::shared_ptr<ArrayData>> children_array_data;
std::shared_ptr<ResizableBuffer> null_bitmap;
::parquet::internal::ValidityBitmapInputOutput validity_io;
validity_io.values_read_upper_bound = length_upper_bound;
// This simplifies accounting below.
validity_io.values_read = length_upper_bound;
BEGIN_PARQUET_CATCH_EXCEPTIONS
const int16_t* def_levels;
const int16_t* rep_levels;
int64_t num_levels;
if (has_repeated_child_) {
ARROW_ASSIGN_OR_RAISE(
null_bitmap,
AllocateResizableBuffer(bit_util::BytesForBits(length_upper_bound), ctx_->pool));
validity_io.valid_bits = null_bitmap->mutable_data();
RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels));
RETURN_NOT_OK(GetRepLevels(&rep_levels, &num_levels));
DefRepLevelsToBitmap(def_levels, rep_levels, num_levels, level_info_, &validity_io);
} else if (filtered_field_->nullable()) {
ARROW_ASSIGN_OR_RAISE(
null_bitmap,
AllocateResizableBuffer(bit_util::BytesForBits(length_upper_bound), ctx_->pool));
validity_io.valid_bits = null_bitmap->mutable_data();
RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels));
DefLevelsToBitmap(def_levels, num_levels, level_info_, &validity_io);
}
// Ensure all values are initialized.
if (null_bitmap) {
RETURN_NOT_OK(null_bitmap->Resize(bit_util::BytesForBits(validity_io.values_read)));
null_bitmap->ZeroPadding();
}
END_PARQUET_CATCH_EXCEPTIONS
// Gather children arrays and def levels
for (auto& child : children_) {
std::shared_ptr<ChunkedArray> field;
RETURN_NOT_OK(child->BuildArray(validity_io.values_read, &field));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> array_data, ChunksToSingle(*field));
children_array_data.push_back(std::move(array_data));
}
if (!filtered_field_->nullable() && !has_repeated_child_) {
validity_io.values_read = children_array_data.front()->length;
}
std::vector<std::shared_ptr<Buffer>> buffers{validity_io.null_count > 0 ? null_bitmap
: nullptr};
auto data =
std::make_shared<ArrayData>(filtered_field_->type(),
/*length=*/validity_io.values_read, std::move(buffers),
std::move(children_array_data));
std::shared_ptr<Array> result = ::arrow::MakeArray(data);
*out = std::make_shared<ChunkedArray>(result);
return Status::OK();
}
// ----------------------------------------------------------------------
// File reader implementation
Status GetReader(const SchemaField& field, const std::shared_ptr<Field>& arrow_field,
const std::shared_ptr<ReaderContext>& ctx,
std::unique_ptr<ColumnReaderImpl>* out) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
auto type_id = arrow_field->type()->id();
if (type_id == ::arrow::Type::EXTENSION) {
auto storage_field = arrow_field->WithType(
checked_cast<const ExtensionType&>(*arrow_field->type()).storage_type());
RETURN_NOT_OK(GetReader(field, storage_field, ctx, out));
*out = std::make_unique<ExtensionReader>(arrow_field, std::move(*out));
return Status::OK();
}
if (field.children.size() == 0) {
if (!field.is_leaf()) {
return Status::Invalid("Parquet non-leaf node has no children");
}
if (!ctx->IncludesLeaf(field.column_index)) {
*out = nullptr;
return Status::OK();
}
std::unique_ptr<FileColumnIterator> input(
ctx->iterator_factory(field.column_index, ctx->reader));
*out = std::make_unique<LeafReader>(ctx, arrow_field, std::move(input),
field.level_info);
} else if (type_id == ::arrow::Type::LIST || type_id == ::arrow::Type::MAP ||
type_id == ::arrow::Type::FIXED_SIZE_LIST ||
type_id == ::arrow::Type::LARGE_LIST) {
auto list_field = arrow_field;
auto child = &field.children[0];
std::unique_ptr<ColumnReaderImpl> child_reader;
RETURN_NOT_OK(GetReader(*child, ctx, &child_reader));
if (child_reader == nullptr) {
*out = nullptr;
return Status::OK();
}
// These two types might not be equal if column pruning occurred
// further down the stack.
const std::shared_ptr<DataType> reader_child_type = child_reader->field()->type();
// This should never really happen, but it was raised as a question during
// code review; the check is cheap, so leave it in.
if (ARROW_PREDICT_FALSE(list_field->type()->num_fields() != 1)) {
return Status::Invalid("expected exactly one child field for: ",
list_field->ToString());
}
const DataType& schema_child_type = *(list_field->type()->field(0)->type());
if (type_id == ::arrow::Type::MAP) {
if (reader_child_type->num_fields() != 2 ||
!reader_child_type->field(0)->type()->Equals(
*schema_child_type.field(0)->type())) {
// This case applies if either the key or the value was completely filtered
// out (so we can take the type as-is), or the key was partially filtered,
// in which case keeping it as a map no longer makes sense.
list_field = list_field->WithType(::arrow::list(child_reader->field()));
} else if (!reader_child_type->field(1)->type()->Equals(
*schema_child_type.field(1)->type())) {
list_field = list_field->WithType(std::make_shared<::arrow::MapType>(
reader_child_type->field(
0), // field 0 is unchanged based on the previous if statement
reader_child_type->field(1)));
}
// Map types are list<struct<key, value>> so use ListReader
// for reconstruction.
*out = std::make_unique<ListReader<int32_t>>(ctx, list_field, field.level_info,
std::move(child_reader));
} else if (type_id == ::arrow::Type::LIST) {
if (!reader_child_type->Equals(schema_child_type)) {
list_field = list_field->WithType(::arrow::list(reader_child_type));
}
*out = std::make_unique<ListReader<int32_t>>(ctx, list_field, field.level_info,
std::move(child_reader));
} else if (type_id == ::arrow::Type::LARGE_LIST) {
if (!reader_child_type->Equals(schema_child_type)) {
list_field = list_field->WithType(::arrow::large_list(reader_child_type));
}
*out = std::make_unique<ListReader<int64_t>>(ctx, list_field, field.level_info,
std::move(child_reader));
} else if (type_id == ::arrow::Type::FIXED_SIZE_LIST) {
if (!reader_child_type->Equals(schema_child_type)) {
auto& fixed_list_type =
checked_cast<const ::arrow::FixedSizeListType&>(*list_field->type());
int32_t list_size = fixed_list_type.list_size();
list_field =
list_field->WithType(::arrow::fixed_size_list(reader_child_type, list_size));
}
*out = std::make_unique<FixedSizeListReader>(ctx, list_field, field.level_info,
std::move(child_reader));
} else {
return Status::UnknownError("Unknown list type: ", field.field->ToString());
}
} else if (type_id == ::arrow::Type::STRUCT) {
std::vector<std::shared_ptr<Field>> child_fields;
int arrow_field_idx = 0;
std::vector<std::unique_ptr<ColumnReaderImpl>> child_readers;
for (const auto& child : field.children) {
std::unique_ptr<ColumnReaderImpl> child_reader;
RETURN_NOT_OK(GetReader(child, ctx, &child_reader));
if (!child_reader) {
arrow_field_idx++;
// If all children were pruned, then we do not try to read this field
continue;
}
std::shared_ptr<::arrow::Field> child_field = child.field;
const DataType& reader_child_type = *child_reader->field()->type();
const DataType& schema_child_type =
*arrow_field->type()->field(arrow_field_idx++)->type();
// These might not be equal if column pruning occurred.
if (!schema_child_type.Equals(reader_child_type)) {
child_field = child_field->WithType(child_reader->field()->type());
}
child_fields.push_back(child_field);
child_readers.emplace_back(std::move(child_reader));
}
if (child_fields.empty()) {
*out = nullptr;
return Status::OK();
}
auto filtered_field =
::arrow::field(arrow_field->name(), ::arrow::struct_(child_fields),
arrow_field->nullable(), arrow_field->metadata());
*out = std::make_unique<StructReader>(ctx, filtered_field, field.level_info,
std::move(child_readers));
} else {
return Status::Invalid("Unsupported nested type: ", arrow_field->ToString());
}
return Status::OK();
END_PARQUET_CATCH_EXCEPTIONS
}
Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>& ctx,
std::unique_ptr<ColumnReaderImpl>* out) {
return GetReader(field, field.field, ctx, out);
}
} // namespace
Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::unique_ptr<RecordBatchReader>* out) {
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
if (reader_properties_.pre_buffer()) {
// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
BEGIN_PARQUET_CATCH_EXCEPTIONS
reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(),
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}
std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> batch_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema));
if (readers.empty()) {
// Just generate all batches right now; they're cheap since they have no columns.
int64_t batch_size = properties().batch_size();
auto max_sized_batch =
::arrow::RecordBatch::Make(batch_schema, batch_size, ::arrow::ArrayVector{});
::arrow::RecordBatchVector batches;
for (int row_group : row_groups) {
int64_t num_rows = parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
batches.insert(batches.end(), num_rows / batch_size, max_sized_batch);
if (int64_t trailing_rows = num_rows % batch_size) {
batches.push_back(max_sized_batch->Slice(0, trailing_rows));
}
}
*out = std::make_unique<RowGroupRecordBatchReader>(
::arrow::MakeVectorIterator(std::move(batches)), std::move(batch_schema));
return Status::OK();
}
int64_t num_rows = 0;
for (int row_group : row_groups) {
num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
}
using ::arrow::RecordBatchIterator;
// NB: This lambda will be invoked outside the scope of this call to
// `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` by value.
// `this` is a non-owning pointer so we are relying on the parent FileReader outliving
// this RecordBatchReader.
::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
[readers, batch_schema, num_rows,
this]() mutable -> ::arrow::Result<RecordBatchIterator> {
::arrow::ChunkedArrayVector columns(readers.size());
// don't reserve more rows than necessary
int64_t batch_size = std::min(properties().batch_size(), num_rows);
num_rows -= batch_size;
RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
reader_properties_.use_threads(), static_cast<int>(readers.size()),
[&](int i) { return readers[i]->NextBatch(batch_size, &columns[i]); }));
for (const auto& column : columns) {
if (column == nullptr || column->length() == 0) {
return ::arrow::IterationTraits<RecordBatchIterator>::End();
}
}
// auto table = ::arrow::Table::Make(batch_schema, std::move(columns));
// auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
//
// // NB: explicitly preserve table so that table_reader doesn't outlive it
// return ::arrow::MakeFunctionIterator(
// [table, table_reader] { return table_reader->Next(); });
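// Instead of routing through TableBatchReader as in the commented-out path
// above, build one RecordBatch per call directly from the first chunk of
// every column. This appears to assume each reader returned its batch as a
// single chunk; any additional chunks would be ignored here.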
std::vector<std::shared_ptr<Array>> arrays;
for (const auto& column : columns) {
arrays.emplace_back(column->chunk(0));
}
return ::arrow::MakeVectorIterator<std::shared_ptr<::arrow::RecordBatch>>(
{(::arrow::RecordBatch::Make(batch_schema, batch_size, std::move(arrays)))});
});
*out = std::make_unique<RowGroupRecordBatchReader>(
::arrow::MakeFlattenIterator(std::move(batches)), std::move(batch_schema));
return Status::OK();
}
/// Given a file reader and a list of row groups, this is a generator of record
/// batch generators (where each sub-generator is the contents of a single row group).
class RowGroupGenerator {
public:
using RecordBatchGenerator =
::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
struct ReadRequest {
::arrow::Future<RecordBatchGenerator> read;
int64_t num_rows;
};
explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
::arrow::internal::Executor* cpu_executor,
std::vector<int> row_groups, std::vector<int> column_indices,
int64_t min_rows_in_flight)
: arrow_reader_(std::move(arrow_reader)),
cpu_executor_(cpu_executor),
row_groups_(std::move(row_groups)),
column_indices_(std::move(column_indices)),
min_rows_in_flight_(min_rows_in_flight),
rows_in_flight_(0),
index_(0),
readahead_index_(0) {}
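// Yield the read for the next row group, topping up the readahead queue
// first. rows_in_flight_ tracks how many rows are currently buffered ahead
// of the consumer.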
::arrow::Future<RecordBatchGenerator> operator()() {
if (index_ >= row_groups_.size()) {
return ::arrow::AsyncGeneratorEnd<RecordBatchGenerator>();
}
index_++;
FillReadahead();
DCHECK(!in_flight_reads_.empty());
ReadRequest next = std::move(in_flight_reads_.front());
in_flight_reads_.pop();
rows_in_flight_ -= next.num_rows;
return next.read;
}
private:
void FillReadahead() {
if (min_rows_in_flight_ == 0) {
// No readahead, fetch the batch when it is asked for
FetchNext();
} else {
while (readahead_index_ < row_groups_.size() &&
rows_in_flight_ < min_rows_in_flight_) {
FetchNext();
}
}
}
void FetchNext() {
size_t row_group_index = readahead_index_++;
int row_group = row_groups_[row_group_index];
std::vector<int> column_indices = column_indices_;
auto reader = arrow_reader_;
int64_t num_rows =
reader->parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
rows_in_flight_ += num_rows;
::arrow::Future<RecordBatchGenerator> row_group_read;
if (!reader->properties().pre_buffer()) {
row_group_read = SubmitRead(cpu_executor_, reader, row_group, column_indices);
} else {
auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
row_group_read =
ready.Then([this, reader, row_group,
column_indices = std::move(
column_indices)]() -> ::arrow::Future<RecordBatchGenerator> {
return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
});
}
in_flight_reads_.push({std::move(row_group_read), num_rows});
}
// Synchronous fallback for when pre-buffer isn't enabled.
//
// Making the Parquet reader truly asynchronous requires heavy refactoring, so the
// generator piggybacks on ReadRangeCache. The lazy ReadRangeCache can be used for
// async I/O without forcing readahead.
static ::arrow::Future<RecordBatchGenerator> SubmitRead(
::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
const int row_group, const std::vector<int>& column_indices) {
if (!cpu_executor) {
return ReadOneRowGroup(cpu_executor, self, row_group, column_indices);
}
// If we have an executor, then force transfer (even if I/O was complete)
return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self,
row_group, column_indices));
}
static ::arrow::Future<RecordBatchGenerator> ReadOneRowGroup(
::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
const int row_group, const std::vector<int>& column_indices) {
// Skips bound checks/pre-buffering, since we've done that already
const int64_t batch_size = self->properties().batch_size();
return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
.Then([batch_size](const std::shared_ptr<Table>& table)
-> ::arrow::Result<RecordBatchGenerator> {
::arrow::TableBatchReader table_reader(*table);
table_reader.set_chunksize(batch_size);
ARROW_ASSIGN_OR_RAISE(auto batches, table_reader.ToRecordBatches());
return ::arrow::MakeVectorGenerator(std::move(batches));
});
}
std::shared_ptr<FileReaderImpl> arrow_reader_;
::arrow::internal::Executor* cpu_executor_;
std::vector<int> row_groups_;
std::vector<int> column_indices_;
int64_t min_rows_in_flight_;
std::queue<ReadRequest> in_flight_reads_;
int64_t rows_in_flight_;
size_t index_;
size_t readahead_index_;
};
::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
const std::vector<int> row_group_indices,
const std::vector<int> column_indices,
::arrow::internal::Executor* cpu_executor,
int64_t rows_to_readahead) {
RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices));
if (rows_to_readahead < 0) {
return Status::Invalid("rows_to_readahead must be > 0");
}
if (reader_properties_.pre_buffer()) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(),
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}
::arrow::AsyncGenerator<RowGroupGenerator::RecordBatchGenerator> row_group_generator =
RowGroupGenerator(::arrow::internal::checked_pointer_cast<FileReaderImpl>(reader),
cpu_executor, row_group_indices, column_indices,
rows_to_readahead);
::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>> concatenated =
::arrow::MakeConcatenatedGenerator(std::move(row_group_generator));
WRAP_ASYNC_GENERATOR(std::move(concatenated));
return concatenated;
}
Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
std::unique_ptr<ColumnReader>* out) {
RETURN_NOT_OK(BoundsCheckColumn(i));
auto ctx = std::make_shared<ReaderContext>();
ctx->reader = reader_.get();
ctx->pool = pool_;
ctx->iterator_factory = iterator_factory;
ctx->filter_leaves = false;
std::unique_ptr<ColumnReaderImpl> result;
RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result));
*out = std::move(result);
return Status::OK();
}
Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::shared_ptr<Table>* out) {
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
if (reader_properties_.pre_buffer()) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
parquet_reader()->PreBuffer(row_groups, column_indices,
reader_properties_.io_context(),
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}
auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
/*cpu_executor=*/nullptr);
ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
return Status::OK();
}
Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor) {
// `self` is used solely to keep `this` alive in an async context - but we use this
// in a sync context too so use `this` over `self`
std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> result_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
// OptionalParallelForAsync requires an executor
if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();
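// Decode each column (in parallel when use_threads is enabled), then
// assemble the results into a validated Table once all reads complete.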
auto read_column = [row_groups, self, this](size_t i,
std::shared_ptr<ColumnReaderImpl> reader)
-> ::arrow::Result<std::shared_ptr<::arrow::ChunkedArray>> {
std::shared_ptr<::arrow::ChunkedArray> column;
RETURN_NOT_OK(ReadColumn(static_cast<int>(i), row_groups, reader.get(), &column));
return column;
};
auto make_table = [result_schema, row_groups, self,
this](const ::arrow::ChunkedArrayVector& columns)
-> ::arrow::Result<std::shared_ptr<Table>> {
int64_t num_rows = 0;
if (!columns.empty()) {
num_rows = columns[0]->length();
} else {
for (int i : row_groups) {
num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
}
}
auto table = Table::Make(std::move(result_schema), columns, num_rows);
RETURN_NOT_OK(table->Validate());
return table;
};
return ::arrow::internal::OptionalParallelForAsync(reader_properties_.use_threads(),
std::move(readers), read_column,
cpu_executor)
.Then(std::move(make_table));
}
std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) {
return std::make_shared<RowGroupReaderImpl>(this, row_group_index);
}
// ----------------------------------------------------------------------
// Public factory functions
Status FileReader::GetRecordBatchReader(std::shared_ptr<RecordBatchReader>* out) {
std::unique_ptr<RecordBatchReader> tmp;
RETURN_NOT_OK(GetRecordBatchReader(&tmp));
out->reset(tmp.release());
return Status::OK();
}
Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
std::shared_ptr<RecordBatchReader>* out) {
std::unique_ptr<RecordBatchReader> tmp;
RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, &tmp));
out->reset(tmp.release());
return Status::OK();
}
Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
const std::vector<int>& column_indices,
std::shared_ptr<RecordBatchReader>* out) {
std::unique_ptr<RecordBatchReader> tmp;
RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, column_indices, &tmp));
out->reset(tmp.release());
return Status::OK();
}
Status FileReader::Make(::arrow::MemoryPool* pool,
std::unique_ptr<ParquetFileReader> reader,
const ArrowReaderProperties& properties,
std::unique_ptr<FileReader>* out) {
*out = std::make_unique<FileReaderImpl>(pool, std::move(reader), properties);
return static_cast<FileReaderImpl*>(out->get())->Init();
}
Status FileReader::Make(::arrow::MemoryPool* pool,
std::unique_ptr<ParquetFileReader> reader,
std::unique_ptr<FileReader>* out) {
return Make(pool, std::move(reader), default_arrow_reader_properties(), out);
}
FileReaderBuilder::FileReaderBuilder()
: pool_(::arrow::default_memory_pool()),
properties_(default_arrow_reader_properties()) {}
Status FileReaderBuilder::Open(std::shared_ptr<::arrow::io::RandomAccessFile> file,
const ReaderProperties& properties,
std::shared_ptr<FileMetaData> metadata) {
PARQUET_CATCH_NOT_OK(raw_reader_ = ParquetReader::Open(std::move(file), properties,
std::move(metadata)));
return Status::OK();
}
Status FileReaderBuilder::OpenFile(const std::string& path, bool memory_map,
const ReaderProperties& properties,
std::shared_ptr<FileMetaData> metadata) {
PARQUET_CATCH_NOT_OK(raw_reader_ = ParquetReader::OpenFile(path, memory_map, properties,
std::move(metadata)));
return Status::OK();
}
FileReaderBuilder* FileReaderBuilder::memory_pool(::arrow::MemoryPool* pool) {
pool_ = pool;
return this;
}
FileReaderBuilder* FileReaderBuilder::properties(
const ArrowReaderProperties& arg_properties) {
properties_ = arg_properties;
return this;
}
Status FileReaderBuilder::Build(std::unique_ptr<FileReader>* out) {
return FileReader::Make(pool_, std::move(raw_reader_), properties_, out);
}
Result<std::unique_ptr<FileReader>> FileReaderBuilder::Build() {
std::unique_ptr<FileReader> out;
RETURN_NOT_OK(FileReader::Make(pool_, std::move(raw_reader_), properties_, &out));
return out;
}
Status OpenFile(std::shared_ptr<::arrow::io::RandomAccessFile> file, MemoryPool* pool,
std::unique_ptr<FileReader>* reader) {
FileReaderBuilder builder;
RETURN_NOT_OK(builder.Open(std::move(file)));
return builder.memory_pool(pool)->Build(reader);
}
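// Minimal usage sketch (error handling elided; the input file comes from the
// caller):
//
//   std::shared_ptr<::arrow::io::RandomAccessFile> input = /* open a file */;
//   std::unique_ptr<FileReader> reader;
//   RETURN_NOT_OK(OpenFile(input, ::arrow::default_memory_pool(), &reader));
//   std::shared_ptr<::arrow::Table> table;
//   RETURN_NOT_OK(reader->ReadTable(&table));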
namespace internal {
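// Read every row group of the file, fully validating each resulting table,
// and fold any failures into the returned Status; intended for fuzzing
// harnesses.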
Status FuzzReader(std::unique_ptr<FileReader> reader) {
auto st = Status::OK();
for (int i = 0; i < reader->num_row_groups(); ++i) {
std::shared_ptr<Table> table;
auto row_group_status = reader->ReadRowGroup(i, &table);
if (row_group_status.ok()) {
row_group_status &= table->ValidateFull();
}
st &= row_group_status;
}
return st;
}
Status FuzzReader(const uint8_t* data, int64_t size) {
auto buffer = std::make_shared<::arrow::Buffer>(data, size);
auto file = std::make_shared<::arrow::io::BufferReader>(buffer);
FileReaderBuilder builder;
RETURN_NOT_OK(builder.Open(std::move(file)));
std::unique_ptr<FileReader> reader;
RETURN_NOT_OK(builder.Build(&reader));
return FuzzReader(std::move(reader));
}
} // namespace internal
} // namespace arrow
} // namespace ch_parquet