in cpp/src/parquet/arrow/reader.cc [837:979]
// Builds the column reader for one node of the schema tree, recursing into its
// children.
//
// \param[in] field the schema node to build a reader for
// \param[in] arrow_field the Arrow field describing the node; for extension types
//   this function recurses with a copy of the field carrying the storage type
// \param[in] ctx shared reader context; decides which leaf columns are included and
//   creates the per-column iterators
// \param[out] out receives the reader, or nullptr when every leaf below this node
//   was pruned (i.e. excluded by ctx->IncludesLeaf)
// \return Status::OK on success (including the "fully pruned" nullptr case), or an
//   error Status when the schema node cannot be handled.
//
// When only some of the columns below a nested node are included, the field stored
// in the resulting reader is rebuilt from the types that were actually read, so it
// may differ from `arrow_field`.
Status GetReader(const SchemaField& field, const std::shared_ptr<Field>& arrow_field,
                 const std::shared_ptr<ReaderContext>& ctx,
                 std::unique_ptr<ColumnReaderImpl>* out) {
  // NOTE(review): presumably this macro pair turns Parquet exceptions thrown below
  // into a returned Status -- confirm against the macro definition.
  BEGIN_PARQUET_CATCH_EXCEPTIONS
  auto type_id = arrow_field->type()->id();
  if (type_id == ::arrow::Type::EXTENSION) {
    // Read the storage type first, then wrap the resulting reader.
    auto storage_field = arrow_field->WithType(
        checked_cast<const ExtensionType&>(*arrow_field->type()).storage_type());
    RETURN_NOT_OK(GetReader(field, storage_field, ctx, out));
    if (*out) {
      auto storage_type = (*out)->field()->type();
      // A partially pruned storage type cannot back the extension type.
      if (!storage_type->Equals(storage_field->type())) {
        return Status::Invalid(
            "Due to column pruning only part of an extension's storage type was loaded.  "
            "An extension type cannot be created without all of its fields");
      }
      *out = std::make_unique<ExtensionReader>(arrow_field, std::move(*out));
    }
    return Status::OK();
  }
  if (field.children.size() == 0) {
    if (!field.is_leaf()) {
      return Status::Invalid("Parquet non-leaf node has no children");
    }
    if (!ctx->IncludesLeaf(field.column_index)) {
      // This leaf column was not requested: signal "pruned" to the caller.
      *out = nullptr;
      return Status::OK();
    }
    std::unique_ptr<FileColumnIterator> input(
        ctx->iterator_factory(field.column_index, ctx->reader));
    *out = std::make_unique<LeafReader>(ctx, arrow_field, std::move(input),
                                        field.level_info);
  } else if (type_id == ::arrow::Type::LIST || type_id == ::arrow::Type::MAP ||
             type_id == ::arrow::Type::FIXED_SIZE_LIST ||
             type_id == ::arrow::Type::LARGE_LIST) {
    auto list_field = arrow_field;
    auto child = &field.children[0];
    std::unique_ptr<ColumnReaderImpl> child_reader;
    RETURN_NOT_OK(GetReader(*child, ctx, &child_reader));
    if (child_reader == nullptr) {
      // Everything below the list was pruned, so the list itself is pruned.
      *out = nullptr;
      return Status::OK();
    }
    // The type actually read and the type in the schema might not be equal if
    // column pruning occurred further down the stack.
    const std::shared_ptr<DataType> reader_child_type = child_reader->field()->type();
    // This should really never happen but was raised as a question on the code
    // review, this should be pretty cheap check so leave it in.
    if (ARROW_PREDICT_FALSE(list_field->type()->num_fields() != 1)) {
      return Status::Invalid("expected exactly one child field for: ",
                             list_field->ToString());
    }
    const DataType& schema_child_type = *(list_field->type()->field(0)->type());
    if (type_id == ::arrow::Type::MAP) {
      if (reader_child_type->num_fields() != 2 ||
          !reader_child_type->field(0)->type()->Equals(
              *schema_child_type.field(0)->type())) {
        // Either the key or the value was completely filtered out, or the key was
        // partially pruned. Keeping the column as a map no longer makes sense, so
        // fall back to a plain list of whatever the child reader produces.
        list_field = list_field->WithType(::arrow::list(child_reader->field()));
      } else if (!reader_child_type->field(1)->type()->Equals(
                     *schema_child_type.field(1)->type())) {
        // Only the value was partially pruned: still a map, with a narrower value.
        list_field = list_field->WithType(std::make_shared<::arrow::MapType>(
            reader_child_type->field(
                0),  // field 0 is unchanged based on previous if statement
            reader_child_type->field(1)));
      }
      // Map types are list<struct<key, value>> so use ListReader
      // for reconstruction.
      *out = std::make_unique<ListReader<int32_t>>(ctx, list_field, field.level_info,
                                                   std::move(child_reader));
    } else if (type_id == ::arrow::Type::LIST) {
      if (!reader_child_type->Equals(schema_child_type)) {
        list_field = list_field->WithType(::arrow::list(reader_child_type));
      }
      *out = std::make_unique<ListReader<int32_t>>(ctx, list_field, field.level_info,
                                                   std::move(child_reader));
    } else if (type_id == ::arrow::Type::LARGE_LIST) {
      if (!reader_child_type->Equals(schema_child_type)) {
        list_field = list_field->WithType(::arrow::large_list(reader_child_type));
      }
      *out = std::make_unique<ListReader<int64_t>>(ctx, list_field, field.level_info,
                                                   std::move(child_reader));
    } else if (type_id == ::arrow::Type::FIXED_SIZE_LIST) {
      if (!reader_child_type->Equals(schema_child_type)) {
        // Rebuild the fixed-size list around the pruned child, keeping its size.
        auto& fixed_list_type =
            checked_cast<const ::arrow::FixedSizeListType&>(*list_field->type());
        int32_t list_size = fixed_list_type.list_size();
        list_field =
            list_field->WithType(::arrow::fixed_size_list(reader_child_type, list_size));
      }
      *out = std::make_unique<FixedSizeListReader>(ctx, list_field, field.level_info,
                                                   std::move(child_reader));
    } else {
      return Status::UnknownError("Unknown list type: ", field.field->ToString());
    }
  } else if (type_id == ::arrow::Type::STRUCT) {
    // Each schema child is matched positionally against the fields of the Arrow
    // struct type below. Like the list check above this should never fail, but it
    // is cheap and prevents indexing past the end of the Arrow type's field list.
    if (ARROW_PREDICT_FALSE(arrow_field->type()->num_fields() <
                            static_cast<int>(field.children.size()))) {
      return Status::Invalid("expected at least ", field.children.size(),
                             " child fields for: ", arrow_field->ToString());
    }
    std::vector<std::shared_ptr<Field>> child_fields;
    int arrow_field_idx = 0;
    std::vector<std::unique_ptr<ColumnReaderImpl>> child_readers;
    for (const auto& child : field.children) {
      // Position of this child in the Arrow struct type. It advances for every
      // child, including the ones that turn out to be pruned.
      const int child_idx = arrow_field_idx++;
      std::unique_ptr<ColumnReaderImpl> child_reader;
      RETURN_NOT_OK(GetReader(child, ctx, &child_reader));
      if (!child_reader) {
        // Everything below this child was pruned, so it is left out of the struct.
        continue;
      }
      std::shared_ptr<::arrow::Field> child_field = child.field;
      const DataType& reader_child_type = *child_reader->field()->type();
      const DataType& schema_child_type = *arrow_field->type()->field(child_idx)->type();
      // These might not be equal if column pruning occurred.
      if (!schema_child_type.Equals(reader_child_type)) {
        child_field = child_field->WithType(child_reader->field()->type());
      }
      child_fields.push_back(child_field);
      child_readers.emplace_back(std::move(child_reader));
    }
    if (child_fields.empty()) {
      // If all children were pruned, then we do not try to read this field.
      *out = nullptr;
      return Status::OK();
    }
    // Struct field restricted to the children that are actually being read.
    auto filtered_field =
        ::arrow::field(arrow_field->name(), ::arrow::struct_(child_fields),
                       arrow_field->nullable(), arrow_field->metadata());
    *out = std::make_unique<StructReader>(ctx, filtered_field, field.level_info,
                                          std::move(child_readers));
  } else {
    return Status::Invalid("Unsupported nested type: ", arrow_field->ToString());
  }
  return Status::OK();
  END_PARQUET_CATCH_EXCEPTIONS
}