Status GetReader()

in cpp/src/parquet/arrow/reader.cc [837:979]


// Builds the column reader for `field`, whose desired Arrow representation is
// described by `arrow_field`.
//
// On success `*out` holds the new reader, or nullptr when nothing underneath
// `field` is to be read (the leaf is not included by `ctx`, or every child
// reader came back null). When only some nested children come back, the reader
// is created with a field whose type matches what the child readers report.
//
// Returns Status::Invalid for a childless non-leaf node, a list-like type that
// does not have exactly one child field, a partially loaded extension storage
// type, or a nested type other than list/map/struct.
Status GetReader(const SchemaField& field, const std::shared_ptr<Field>& arrow_field,
                 const std::shared_ptr<ReaderContext>& ctx,
                 std::unique_ptr<ColumnReaderImpl>* out) {
  BEGIN_PARQUET_CATCH_EXCEPTIONS

  const auto kind = arrow_field->type()->id();

  // Extension types: build a reader for the storage type, then wrap it.
  if (kind == ::arrow::Type::EXTENSION) {
    const auto& extension_type =
        checked_cast<const ExtensionType&>(*arrow_field->type());
    auto storage_field = arrow_field->WithType(extension_type.storage_type());
    RETURN_NOT_OK(GetReader(field, storage_field, ctx, out));
    if (*out == nullptr) {
      // No storage reader was produced, so there is nothing to wrap.
      return Status::OK();
    }
    auto loaded_storage_type = (*out)->field()->type();
    if (!loaded_storage_type->Equals(storage_field->type())) {
      return Status::Invalid(
          "Due to column pruning only part of an extension's storage type was loaded.  "
          "An extension type cannot be created without all of its fields");
    }
    *out = std::make_unique<ExtensionReader>(arrow_field, std::move(*out));
    return Status::OK();
  }

  // Nodes without children must be leaves; they are read directly from a column.
  if (field.children.size() == 0) {
    if (!field.is_leaf()) {
      return Status::Invalid("Parquet non-leaf node has no children");
    }
    if (!ctx->IncludesLeaf(field.column_index)) {
      *out = nullptr;
      return Status::OK();
    }
    std::unique_ptr<FileColumnIterator> column_iterator(
        ctx->iterator_factory(field.column_index, ctx->reader));
    *out = std::make_unique<LeafReader>(ctx, arrow_field, std::move(column_iterator),
                                        field.level_info);
    return Status::OK();
  }

  const bool is_list_like =
      kind == ::arrow::Type::LIST || kind == ::arrow::Type::MAP ||
      kind == ::arrow::Type::FIXED_SIZE_LIST || kind == ::arrow::Type::LARGE_LIST;

  if (is_list_like) {
    // List-like nodes are rebuilt from the reader of their first child.
    std::unique_ptr<ColumnReaderImpl> item_reader;
    RETURN_NOT_OK(GetReader(field.children[0], ctx, &item_reader));
    if (!item_reader) {
      *out = nullptr;
      return Status::OK();
    }

    // The type reported by the child reader can differ from the declared child
    // type when column pruning happened further down the tree.
    const std::shared_ptr<DataType> loaded_item_type = item_reader->field()->type();

    // Cheap sanity check: a list-like Arrow type is expected to carry exactly
    // one child field.
    if (ARROW_PREDICT_FALSE(arrow_field->type()->num_fields() != 1)) {
      return Status::Invalid("expected exactly one child field for: ",
                             arrow_field->ToString());
    }
    const DataType& declared_item_type = *(arrow_field->type()->field(0)->type());

    // Field handed to the reader; replaced below if the child type changed.
    auto result_field = arrow_field;

    switch (kind) {
      case ::arrow::Type::MAP: {
        const bool key_unchanged = loaded_item_type->num_fields() == 2 &&
                                   loaded_item_type->field(0)->type()->Equals(
                                       *declared_item_type.field(0)->type());
        if (!key_unchanged) {
          // Either the key or the value was filtered out completely, or the key
          // was only partially loaded: keeping a map type no longer makes
          // sense, so fall back to a list of whatever the child reader yields.
          result_field = result_field->WithType(::arrow::list(item_reader->field()));
        } else if (!loaded_item_type->field(1)->type()->Equals(
                       *declared_item_type.field(1)->type())) {
          // Only the value type changed; field 0 is known to be unchanged here.
          result_field = result_field->WithType(std::make_shared<::arrow::MapType>(
              loaded_item_type->field(0), loaded_item_type->field(1)));
        }
        // Maps are list<struct<key, value>>, so ListReader reconstructs them.
        *out = std::make_unique<ListReader<int32_t>>(ctx, result_field, field.level_info,
                                                     std::move(item_reader));
        break;
      }
      case ::arrow::Type::LIST: {
        if (!loaded_item_type->Equals(declared_item_type)) {
          result_field = result_field->WithType(::arrow::list(loaded_item_type));
        }
        *out = std::make_unique<ListReader<int32_t>>(ctx, result_field, field.level_info,
                                                     std::move(item_reader));
        break;
      }
      case ::arrow::Type::LARGE_LIST: {
        if (!loaded_item_type->Equals(declared_item_type)) {
          result_field = result_field->WithType(::arrow::large_list(loaded_item_type));
        }
        *out = std::make_unique<ListReader<int64_t>>(ctx, result_field, field.level_info,
                                                     std::move(item_reader));
        break;
      }
      case ::arrow::Type::FIXED_SIZE_LIST: {
        if (!loaded_item_type->Equals(declared_item_type)) {
          // Keep the declared list size while swapping in the loaded child type.
          const int32_t list_size =
              checked_cast<const ::arrow::FixedSizeListType&>(*result_field->type())
                  .list_size();
          result_field = result_field->WithType(
              ::arrow::fixed_size_list(loaded_item_type, list_size));
        }
        *out = std::make_unique<FixedSizeListReader>(ctx, result_field, field.level_info,
                                                     std::move(item_reader));
        break;
      }
      default:
        return Status::UnknownError("Unknown list type: ", field.field->ToString());
    }
    return Status::OK();
  }

  if (kind != ::arrow::Type::STRUCT) {
    return Status::Invalid("Unsupported nested type: ", arrow_field->ToString());
  }

  // Struct: build a reader per child and keep only the children that produced one.
  std::vector<std::shared_ptr<Field>> kept_fields;
  std::vector<std::unique_ptr<ColumnReaderImpl>> kept_readers;
  const int num_children = static_cast<int>(field.children.size());
  for (int child_idx = 0; child_idx < num_children; ++child_idx) {
    const auto& child = field.children[child_idx];
    std::unique_ptr<ColumnReaderImpl> member_reader;
    RETURN_NOT_OK(GetReader(child, ctx, &member_reader));
    if (!member_reader) {
      // No reader was produced for this child, so it is left out of the struct.
      continue;
    }
    // The declared type sits at the same position in the Arrow struct type as
    // the child does in `field.children`.
    const DataType& loaded_member_type = *member_reader->field()->type();
    const DataType& declared_member_type =
        *arrow_field->type()->field(child_idx)->type();
    std::shared_ptr<::arrow::Field> member_field = child.field;
    // The two types might not be equal if column pruning occurred.
    if (!declared_member_type.Equals(loaded_member_type)) {
      member_field = member_field->WithType(member_reader->field()->type());
    }
    kept_fields.push_back(std::move(member_field));
    kept_readers.push_back(std::move(member_reader));
  }
  if (kept_fields.empty()) {
    *out = nullptr;
    return Status::OK();
  }
  auto struct_field =
      ::arrow::field(arrow_field->name(), ::arrow::struct_(kept_fields),
                     arrow_field->nullable(), arrow_field->metadata());
  *out = std::make_unique<StructReader>(ctx, struct_field, field.level_info,
                                        std::move(kept_readers));
  return Status::OK();

  END_PARQUET_CATCH_EXCEPTIONS
}