// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "reader.h"

#include <algorithm>
#include <cstring>
#include <memory>
#include <unordered_set>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/extension_type.h"
#include "arrow/io/memory.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/parallel.h"
#include "arrow/util/range.h"
#include "arrow/util/tracing_internal.h"
#include "Storages/ch_parquet/arrow/reader_internal.h"
#include "Storages/ch_parquet/arrow/column_reader.h"
#include "parquet/exception.h"
#include "parquet/file_reader.h"
#include "parquet/metadata.h"
#include "parquet/properties.h"
#include "parquet/schema.h"

using arrow::Array;
using arrow::ArrayData;
using arrow::BooleanArray;
using arrow::ChunkedArray;
using arrow::DataType;
using arrow::ExtensionType;
using arrow::Field;
using arrow::Future;
using arrow::Int32Array;
using arrow::ListArray;
using arrow::MemoryPool;
using arrow::RecordBatchReader;
using arrow::ResizableBuffer;
using arrow::Result;
using arrow::Status;
using arrow::StructArray;
using arrow::Table;
using arrow::TimestampArray;

using arrow::internal::checked_cast;
using arrow::internal::Iota;

// Help reduce verbosity
using ParquetReader = ch_parquet::ParquetFileReader;

using ch_parquet::internal::RecordReader;

namespace bit_util = arrow::bit_util;


using parquet::ParquetFileReader;
using parquet::ArrowReaderProperties;
using parquet::PageReader;
using parquet::ColumnDescriptor;
using parquet::Buffer;
using parquet::arrow::SchemaManifest;

namespace ch_parquet {

namespace arrow {
    using namespace parquet::arrow;
namespace {

// Collapses a chunked array into the ArrayData of a single array.
//
// - zero chunks: returns the data of an empty (length 0) null array of the
//   chunked array's type;
// - one chunk: returns that chunk's data;
// - more than one chunk: returns NotImplemented (see ARROW-3762).
::arrow::Result<std::shared_ptr<ArrayData>> ChunksToSingle(const ChunkedArray& chunked) {
  const int chunk_count = chunked.num_chunks();
  if (chunk_count == 1) {
    return chunked.chunk(0)->data();
  }
  if (chunk_count == 0) {
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> empty_array,
                          ::arrow::MakeArrayOfNull(chunked.type(), 0));
    return empty_array->data();
  }
  // ARROW-3762(wesm): If item reader yields a chunked array, we reject as
  // this is not yet implemented
  return Status::NotImplemented(
      "Nested data conversions not implemented for chunked array outputs");
}

}  // namespace

// Internal extension of ColumnReader used by every reader in this file.
// Reading is split into two phases (LoadBatch, then BuildArray) and the raw
// definition/repetition levels are exposed so readers can be nested.
class ColumnReaderImpl : public ColumnReader {
 public:
  // Stores a pointer to the definition levels in `*data` and their count in `*length`.
  virtual Status GetDefLevels(const int16_t** data, int64_t* length) = 0;
  // Stores a pointer to the repetition levels in `*data` and their count in `*length`.
  virtual Status GetRepLevels(const int16_t** data, int64_t* length) = 0;
  // The Arrow field this reader produces.
  virtual const std::shared_ptr<Field> field() = 0;

  // Loads `batch_size` records, builds the resulting chunked array into `*out`,
  // and then validates every chunk of it. The first failing step's Status is
  // returned.
  ::arrow::Status NextBatch(int64_t batch_size,
                            std::shared_ptr<::arrow::ChunkedArray>* out) final {
    RETURN_NOT_OK(LoadBatch(batch_size));
    RETURN_NOT_OK(BuildArray(batch_size, out));
    for (const auto& chunk : (*out)->chunks()) {
      RETURN_NOT_OK(chunk->Validate());
    }
    return Status::OK();
  }

  // Phase 1: read up to `num_records` records into the reader's internal state.
  virtual ::arrow::Status LoadBatch(int64_t num_records) = 0;

  // Phase 2: turn the previously loaded data into a chunked array.
  // `length_upper_bound` is an upper bound on the length of the result.
  virtual ::arrow::Status BuildArray(int64_t length_upper_bound,
                                     std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
  // True if this reader, or any reader it wraps, is a repeated (list-like) reader.
  virtual bool IsOrHasRepeatedChild() const = 0;
};

namespace {

// Builds a shared unordered_set holding the distinct elements of `values`.
// The returned pointer is never null; an empty vector yields an empty set.
std::shared_ptr<std::unordered_set<int>> VectorToSharedSet(
    const std::vector<int>& values) {
  return std::make_shared<std::unordered_set<int>>(values.begin(), values.end());
}

// Forward declaration
Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>& context,
                 std::unique_ptr<ColumnReaderImpl>* out);

// ----------------------------------------------------------------------
// FileReaderImpl definition (several members are defined out of line)

// FileReader implementation that sits on top of a ParquetFileReader.
//
// It owns the underlying file reader, the Arrow reader properties and the
// SchemaManifest built from the file metadata by Init(). Most public entry
// points are thin overloads that expand "all columns" / "all row groups" into
// explicit index vectors and forward to a more general overload.
class FileReaderImpl : public FileReader {
 public:
  // `pool` is stored as a raw pointer and is not owned by this object.
  FileReaderImpl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
                 ArrowReaderProperties properties)
      : pool_(pool),
        reader_(std::move(reader)),
        reader_properties_(std::move(properties)) {}

  // Builds manifest_ from the file's schema and key/value metadata. The
  // manifest is used by GetFieldReader / GetFieldReaders, so this has to be
  // called before them.
  Status Init() {
    return SchemaManifest::Make(reader_->metadata()->schema(),
                                reader_->metadata()->key_value_metadata(),
                                reader_properties_, &manifest_);
  }

  // Returns a factory creating FileColumnIterators restricted to `row_groups`.
  // The vector is copied into the returned closure. The iterator is returned
  // as a raw, heap-allocated pointer; callers in this file wrap it in a
  // unique_ptr.
  FileColumnIteratorFactory SomeRowGroupsFactory(std::vector<int> row_groups) {
    return [row_groups](int i, ParquetFileReader* reader) {
      return new FileColumnIterator(i, reader, row_groups);
    };
  }

  // Same as SomeRowGroupsFactory, for every row group in the file.
  FileColumnIteratorFactory AllRowGroupsFactory() {
    return SomeRowGroupsFactory(Iota(reader_->metadata()->num_row_groups()));
  }

  // Returns Invalid unless 0 <= column < num_columns().
  Status BoundsCheckColumn(int column) {
    if (column < 0 || column >= this->num_columns()) {
      return Status::Invalid("Column index out of bounds (got ", column,
                             ", should be "
                             "between 0 and ",
                             this->num_columns() - 1, ")");
    }
    return Status::OK();
  }

  // Returns Invalid unless 0 <= row_group < num_row_groups().
  Status BoundsCheckRowGroup(int row_group) {
    // row group indices check
    if (row_group < 0 || row_group >= num_row_groups()) {
      return Status::Invalid("Some index in row_group_indices is ", row_group,
                             ", which is either < 0 or >= num_row_groups(",
                             num_row_groups(), ")");
    }
    return Status::OK();
  }

  // Bounds-checks every row group index, then every column index, returning
  // the first failure.
  Status BoundsCheck(const std::vector<int>& row_groups,
                     const std::vector<int>& column_indices) {
    for (int i : row_groups) {
      RETURN_NOT_OK(BoundsCheckRowGroup(i));
    }
    for (int i : column_indices) {
      RETURN_NOT_OK(BoundsCheckColumn(i));
    }
    return Status::OK();
  }

  // Defined out of line.
  std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override;

  // Reads the given leaf columns from every row group.
  Status ReadTable(const std::vector<int>& indices,
                   std::shared_ptr<Table>* out) override {
    return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out);
  }

  // Creates a reader for the top-level schema field `i`.
  //
  // `included_leaves` is stored in the reader context with filter_leaves set,
  // and `row_groups` selects which row groups the column iterators visit.
  // Returns Invalid if `i` is not a valid index into manifest_.schema_fields.
  Status GetFieldReader(int i,
                        const std::shared_ptr<std::unordered_set<int>>& included_leaves,
                        const std::vector<int>& row_groups,
                        std::unique_ptr<ColumnReaderImpl>* out) {
    // Should be covered by GetRecordBatchReader checks but
    // manifest_.schema_fields is a separate variable so be extra careful.
    if (ARROW_PREDICT_FALSE(i < 0 ||
                            static_cast<size_t>(i) >= manifest_.schema_fields.size())) {
      // The largest valid index is size() - 1. The subtraction is done in a
      // signed type so that an empty field list reports -1 (as
      // BoundsCheckColumn does) instead of wrapping around.
      return Status::Invalid("Column index out of bounds (got ", i,
                             ", should be "
                             "between 0 and ",
                             static_cast<int64_t>(manifest_.schema_fields.size()) - 1,
                             ")");
    }
    auto ctx = std::make_shared<ReaderContext>();
    ctx->reader = reader_.get();
    ctx->pool = pool_;
    ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
    ctx->filter_leaves = true;
    ctx->included_leaves = included_leaves;
    return GetReader(manifest_.schema_fields[i], ctx, out);
  }

  // Creates one reader per schema field that contains any of the leaf columns
  // in `column_indices`, and builds the schema made of those readers' fields
  // (carrying manifest_.schema_metadata).
  Status GetFieldReaders(const std::vector<int>& column_indices,
                         const std::vector<int>& row_groups,
                         std::vector<std::shared_ptr<ColumnReaderImpl>>* out,
                         std::shared_ptr<::arrow::Schema>* out_schema) {
    // We only need to read schema fields which have columns indicated
    // in the indices vector
    ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
                          manifest_.GetFieldIndices(column_indices));

    auto included_leaves = VectorToSharedSet(column_indices);

    out->resize(field_indices.size());
    ::arrow::FieldVector out_fields(field_indices.size());
    for (size_t i = 0; i < out->size(); ++i) {
      std::unique_ptr<ColumnReaderImpl> reader;
      RETURN_NOT_OK(
          GetFieldReader(field_indices[i], included_leaves, row_groups, &reader));

      out_fields[i] = reader->field();
      out->at(i) = std::move(reader);
    }

    *out_schema = ::arrow::schema(std::move(out_fields), manifest_.schema_metadata);
    return Status::OK();
  }

  // Defined out of line.
  Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
                   std::unique_ptr<ColumnReader>* out);

  // Creates a reader for leaf column `i` that spans every row group.
  Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) override {
    return GetColumn(i, AllRowGroupsFactory(), out);
  }

  // Converts the Parquet schema (plus key/value metadata) to an Arrow schema.
  Status GetSchema(std::shared_ptr<::arrow::Schema>* out) override {
    return FromParquetSchema(reader_->metadata()->schema(), reader_properties_,
                             reader_->metadata()->key_value_metadata(), out);
  }

  // Reads the top-level schema field `i`, with all leaves included, from every
  // row group.
  // NOTE(review): `i` is a schema field index here but is forwarded to
  // ReadColumn, which uses it as a column chunk index when counting records;
  // see the TODO in ReadColumn.
  Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) override {
    auto included_leaves = VectorToSharedSet(Iota(reader_->metadata()->num_columns()));
    std::vector<int> row_groups = Iota(reader_->metadata()->num_row_groups());

    std::unique_ptr<ColumnReaderImpl> reader;
    RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader));

    return ReadColumn(i, row_groups, reader.get(), out);
  }

  // Sums num_values() of column chunk `i` over `row_groups` and requests that
  // many records from `reader` in a single NextBatch call. The body sits
  // between the BEGIN/END_PARQUET_CATCH_EXCEPTIONS macros because the
  // metadata accessors can throw.
  Status ReadColumn(int i, const std::vector<int>& row_groups, ColumnReader* reader,
                    std::shared_ptr<ChunkedArray>* out) {
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    // TODO(wesm): This calculation doesn't make much sense when we have repeated
    // schema nodes
    int64_t records_to_read = 0;
    for (auto row_group : row_groups) {
      // Can throw exception
      records_to_read +=
          reader_->metadata()->RowGroup(row_group)->ColumnChunk(i)->num_values();
    }
#ifdef ARROW_WITH_OPENTELEMETRY
    std::string column_name = reader_->metadata()->schema()->Column(i)->name();
    std::string phys_type =
        TypeToString(reader_->metadata()->schema()->Column(i)->physical_type());
    ::arrow::util::tracing::Span span;
    START_SPAN(span, "parquet::arrow::read_column",
               {{"parquet.arrow.columnindex", i},
                {"parquet.arrow.columnname", column_name},
                {"parquet.arrow.physicaltype", phys_type},
                {"parquet.arrow.records_to_read", records_to_read}});
#endif
    return reader->NextBatch(records_to_read, out);
    END_PARQUET_CATCH_EXCEPTIONS
  }

  // Reads leaf column `i` from the given row groups using a freshly created
  // column reader.
  Status ReadColumn(int i, const std::vector<int>& row_groups,
                    std::shared_ptr<ChunkedArray>* out) {
    std::unique_ptr<ColumnReader> flat_column_reader;
    RETURN_NOT_OK(GetColumn(i, SomeRowGroupsFactory(row_groups), &flat_column_reader));
    return ReadColumn(i, row_groups, flat_column_reader.get(), out);
  }

  // Reads leaf column `i` from every row group.
  Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) override {
    return ReadColumn(i, Iota(reader_->metadata()->num_row_groups()), out);
  }

  // Reads every column from every row group.
  Status ReadTable(std::shared_ptr<Table>* table) override {
    return ReadTable(Iota(reader_->metadata()->num_columns()), table);
  }

  // Defined out of line.
  Status ReadRowGroups(const std::vector<int>& row_groups,
                       const std::vector<int>& indices,
                       std::shared_ptr<Table>* table) override;

  // Helper method used by ReadRowGroups - read the given row groups/columns, skipping
  // bounds checks and pre-buffering. Takes a shared_ptr to self to keep the reader
  // alive in async contexts.
  Future<std::shared_ptr<Table>> DecodeRowGroups(
      std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
      const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor);

  // Reads every column from the given row groups.
  Status ReadRowGroups(const std::vector<int>& row_groups,
                       std::shared_ptr<Table>* table) override {
    return ReadRowGroups(row_groups, Iota(reader_->metadata()->num_columns()), table);
  }

  // Reads the given columns from a single row group.
  Status ReadRowGroup(int row_group_index, const std::vector<int>& column_indices,
                      std::shared_ptr<Table>* out) override {
    return ReadRowGroups({row_group_index}, column_indices, out);
  }

  // Reads every column from a single row group.
  Status ReadRowGroup(int i, std::shared_ptr<Table>* table) override {
    return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
  }

  // Defined out of line.
  Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
                              const std::vector<int>& column_indices,
                              std::unique_ptr<RecordBatchReader>* out) override;

  // Record batch reader over the given row groups and every column.
  Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
                              std::unique_ptr<RecordBatchReader>* out) override {
    return GetRecordBatchReader(row_group_indices,
                                Iota(reader_->metadata()->num_columns()), out);
  }

  // Record batch reader over every row group and every column.
  Status GetRecordBatchReader(std::unique_ptr<RecordBatchReader>* out) override {
    return GetRecordBatchReader(Iota(num_row_groups()),
                                Iota(reader_->metadata()->num_columns()), out);
  }

  // Defined out of line.
  ::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
  GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
                          const std::vector<int> row_group_indices,
                          const std::vector<int> column_indices,
                          ::arrow::internal::Executor* cpu_executor,
                          int64_t rows_to_readahead) override;

  // Number of leaf columns reported by the file metadata.
  int num_columns() const { return reader_->metadata()->num_columns(); }

  // Non-owning access to the underlying file reader.
  ParquetFileReader* parquet_reader() const override { return reader_.get(); }

  // Number of row groups reported by the file metadata.
  int num_row_groups() const override { return reader_->metadata()->num_row_groups(); }

  void set_use_threads(bool use_threads) override {
    reader_properties_.set_use_threads(use_threads);
  }

  void set_batch_size(int64_t batch_size) override {
    reader_properties_.set_batch_size(batch_size);
  }

  const ArrowReaderProperties& properties() const override { return reader_properties_; }

  const SchemaManifest& manifest() const override { return manifest_; }

  // Stores the result of ScanFileContents over `columns` in `*num_rows`;
  // exceptions thrown by the scan are handled by the surrounding
  // BEGIN/END_PARQUET_CATCH_EXCEPTIONS macros.
  Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
                      int64_t* num_rows) override {
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    *num_rows = ScanFileContents(columns, column_batch_size, reader_.get());
    return Status::OK();
    END_PARQUET_CATCH_EXCEPTIONS
  }

  // Memory pool handed to the column readers (not owned).
  MemoryPool* pool_;
  // Underlying Parquet file reader (owned).
  std::unique_ptr<ParquetFileReader> reader_;
  ArrowReaderProperties reader_properties_;

  // Populated by Init().
  SchemaManifest manifest_;
};

class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
 public:
  RowGroupRecordBatchReader(::arrow::RecordBatchIterator batches,
                            std::shared_ptr<::arrow::Schema> schema)
      : batches_(std::move(batches)), schema_(std::move(schema)) {}

  ~RowGroupRecordBatchReader() override {}

  Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
    return batches_.Next().Value(out);
  }

  std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }

 private:
  ::arrow::Iterator<std::shared_ptr<::arrow::RecordBatch>> batches_;
  std::shared_ptr<::arrow::Schema> schema_;
};

// ColumnChunkReader for one (row group, column) pair. All work is delegated to
// the FileReaderImpl, which is held by raw pointer and not owned.
class ColumnChunkReaderImpl : public ColumnChunkReader {
 public:
  ColumnChunkReaderImpl(FileReaderImpl* impl, int row_group_index, int column_index)
      : impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {}

  // Reads this column restricted to this reader's single row group.
  Status Read(std::shared_ptr<::arrow::ChunkedArray>* out) override {
    const std::vector<int> single_row_group{row_group_index_};
    return impl_->ReadColumn(column_index_, single_row_group, out);
  }

 private:
  FileReaderImpl* impl_;
  int column_index_;
  int row_group_index_;
};

// RowGroupReader for a single row group. All work is delegated to the
// FileReaderImpl, which is held by raw pointer and not owned.
class RowGroupReaderImpl : public RowGroupReader {
 public:
  RowGroupReaderImpl(FileReaderImpl* impl, int row_group_index)
      : impl_(impl), row_group_index_(row_group_index) {}

  // Creates a reader for column `column_index` within this row group.
  std::shared_ptr<ColumnChunkReader> Column(int column_index) override {
    auto chunk_reader =
        std::make_shared<ColumnChunkReaderImpl>(impl_, row_group_index_, column_index);
    return chunk_reader;
  }

  // Reads the selected columns of this row group into a table.
  Status ReadTable(const std::vector<int>& column_indices,
                   std::shared_ptr<::arrow::Table>* out) override {
    return impl_->ReadRowGroup(row_group_index_, column_indices, out);
  }

  // Reads every column of this row group into a table.
  Status ReadTable(std::shared_ptr<::arrow::Table>* out) override {
    return impl_->ReadRowGroup(row_group_index_, out);
  }

 private:
  FileReaderImpl* impl_;
  int row_group_index_;
};

// ----------------------------------------------------------------------
// Column reader implementations

// Reader for primitive arrays and for the primitive children of nested arrays.
// It pulls pages from a FileColumnIterator through a RecordReader and converts
// the decoded values into a ChunkedArray with TransferColumnData.
class LeafReader : public ColumnReaderImpl {
 public:
  // Creates the record reader for the column described by `input` and
  // positions it on the first column chunk.
  LeafReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
             std::unique_ptr<FileColumnIterator> input,
             ::parquet::internal::LevelInfo leaf_info)
      : ctx_(std::move(ctx)),
        field_(std::move(field)),
        input_(std::move(input)),
        descr_(input_->descr()) {
    const bool is_dictionary_type =
        field_->type()->id() == ::arrow::Type::DICTIONARY;
    record_reader_ =
        RecordReader::Make(descr_, leaf_info, ctx_->pool, is_dictionary_type);
    NextRowGroup();
  }

  // Exposes the record reader's definition levels up to its current level position.
  Status GetDefLevels(const int16_t** data, int64_t* length) final {
    *length = record_reader_->levels_position();
    *data = record_reader_->def_levels();
    return Status::OK();
  }

  // Exposes the record reader's repetition levels up to its current level position.
  Status GetRepLevels(const int16_t** data, int64_t* length) final {
    *length = record_reader_->levels_position();
    *data = record_reader_->rep_levels();
    return Status::OK();
  }

  bool IsOrHasRepeatedChild() const final { return false; }

  // Reads up to `records_to_read` records, moving on to the next column chunk
  // whenever a read returns no records, and stops early once the record reader
  // reports no more data. The result is stored for BuildArray.
  Status LoadBatch(int64_t records_to_read) final {
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    out_ = nullptr;
    record_reader_->Reset();
    // Pre-allocation gives much better performance for flat columns
    record_reader_->Reserve(records_to_read);
    while (records_to_read > 0 && record_reader_->HasMoreData()) {
      const int64_t records_read = record_reader_->ReadRecords(records_to_read);
      if (records_read == 0) {
        // Nothing came back from the current chunk: switch to the next one.
        NextRowGroup();
      } else {
        records_to_read -= records_read;
      }
    }
    RETURN_NOT_OK(
        TransferColumnData(record_reader_.get(), field_, descr_, ctx_->pool, &out_));
    return Status::OK();
    END_PARQUET_CATCH_EXCEPTIONS
  }

  // Hands out the array produced by the last LoadBatch; the length bound is unused.
  ::arrow::Status BuildArray(int64_t /*length_upper_bound*/,
                             std::shared_ptr<::arrow::ChunkedArray>* out) final {
    *out = out_;
    return Status::OK();
  }

  const std::shared_ptr<Field> field() override { return field_; }

 private:
  // Feeds the page reader of the iterator's next column chunk to the record reader.
  void NextRowGroup() {
    record_reader_->SetPageReader(input_->NextChunk());
  }

  std::shared_ptr<ChunkedArray> out_;
  std::shared_ptr<ReaderContext> ctx_;
  std::shared_ptr<Field> field_;
  std::unique_ptr<FileColumnIterator> input_;
  const ColumnDescriptor* descr_;
  std::shared_ptr<RecordReader> record_reader_;
};

// Column reader for extension arrays. Everything is delegated to a reader for
// the storage type; only BuildArray differs, wrapping the storage array in the
// extension type of `field_`.
class ExtensionReader : public ColumnReaderImpl {
 public:
  ExtensionReader(std::shared_ptr<Field> field,
                  std::unique_ptr<ColumnReaderImpl> storage_reader)
      : field_(std::move(field)), storage_reader_(std::move(storage_reader)) {}

  // Levels are those of the storage reader.
  Status GetDefLevels(const int16_t** data, int64_t* length) override {
    return storage_reader_->GetDefLevels(data, length);
  }

  Status GetRepLevels(const int16_t** data, int64_t* length) override {
    return storage_reader_->GetRepLevels(data, length);
  }

  Status LoadBatch(int64_t number_of_records) final {
    return storage_reader_->LoadBatch(number_of_records);
  }

  // Builds the storage array, then wraps it with the extension type.
  Status BuildArray(int64_t length_upper_bound,
                    std::shared_ptr<ChunkedArray>* out) override {
    std::shared_ptr<ChunkedArray> storage_chunks;
    RETURN_NOT_OK(storage_reader_->BuildArray(length_upper_bound, &storage_chunks));
    const auto& extension_type = field_->type();
    *out = ExtensionType::WrapArray(extension_type, storage_chunks);
    return Status::OK();
  }

  bool IsOrHasRepeatedChild() const final {
    return storage_reader_->IsOrHasRepeatedChild();
  }

  const std::shared_ptr<Field> field() override { return field_; }

 private:
  std::shared_ptr<Field> field_;
  std::unique_ptr<ColumnReaderImpl> storage_reader_;
};

// Reader for list-like columns (its AssembleArray also handles MAP).
// `IndexType` is the element type of the offsets buffer (FixedSizeListReader
// below instantiates it with int32_t). Loading and level access are delegated
// to the single child (item) reader; BuildArray converts the child's
// definition/repetition levels into offsets and a validity bitmap.
template <typename IndexType>
class ListReader : public ColumnReaderImpl {
 public:
  ListReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
             ::parquet::internal::LevelInfo level_info,
             std::unique_ptr<ColumnReaderImpl> child_reader)
      : ctx_(std::move(ctx)),
        field_(std::move(field)),
        level_info_(level_info),
        item_reader_(std::move(child_reader)) {}

  // Levels are those of the item reader.
  Status GetDefLevels(const int16_t** data, int64_t* length) override {
    return item_reader_->GetDefLevels(data, length);
  }

  Status GetRepLevels(const int16_t** data, int64_t* length) override {
    return item_reader_->GetRepLevels(data, length);
  }

  bool IsOrHasRepeatedChild() const final { return true; }

  Status LoadBatch(int64_t number_of_records) final {
    return item_reader_->LoadBatch(number_of_records);
  }

  // Turns the assembled ArrayData into a single-chunk ChunkedArray. For MAP
  // fields the child data is validated first so that malformed data yields an
  // error Status. Overridden by FixedSizeListReader.
  virtual ::arrow::Result<std::shared_ptr<ChunkedArray>> AssembleArray(
      std::shared_ptr<ArrayData> data) {
    if (field_->type()->id() == ::arrow::Type::MAP) {
      // Error out if data is not map-compliant instead of aborting in MakeArray below
      RETURN_NOT_OK(::arrow::MapArray::ValidateChildData(data->child_data));
    }
    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
    return std::make_shared<ChunkedArray>(result);
  }

  // Builds the list array for the batch loaded by LoadBatch.
  //
  // Steps: fetch the item reader's levels, allocate validity (only if the field
  // is nullable) and offsets buffers sized from `length_upper_bound`, convert
  // the levels into offsets/validity, build the child array, shrink the buffers
  // to the number of lists actually produced, and assemble the result.
  // Returns NotImplemented (via ChunksToSingle) if the child yields more than
  // one chunk.
  Status BuildArray(int64_t length_upper_bound,
                    std::shared_ptr<ChunkedArray>* out) override {
    const int16_t* def_levels;
    const int16_t* rep_levels;
    int64_t num_levels;
    // Both calls write num_levels; the value from the second call is the one used.
    RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
    RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));

    std::shared_ptr<ResizableBuffer> validity_buffer;
    ::parquet::internal::ValidityBitmapInputOutput validity_io;
    validity_io.values_read_upper_bound = length_upper_bound;
    if (field_->nullable()) {
      ARROW_ASSIGN_OR_RAISE(validity_buffer,
                            AllocateResizableBuffer(
                                bit_util::BytesForBits(length_upper_bound), ctx_->pool));
      validity_io.valid_bits = validity_buffer->mutable_data();
    }
    // One more offset than lists; at least one slot so offset_data[0] is valid.
    ARROW_ASSIGN_OR_RAISE(
        std::shared_ptr<ResizableBuffer> offsets_buffer,
        AllocateResizableBuffer(
            sizeof(IndexType) * std::max(int64_t{1}, length_upper_bound + 1),
            ctx_->pool));
    // Ensure zero initialization in case we have reached a zero length list (and
    // because first entry is always zero).
    IndexType* offset_data = reinterpret_cast<IndexType*>(offsets_buffer->mutable_data());
    offset_data[0] = 0;
    // NOTE(review): DefRepLevelsToList presumably fills offset_data and the
    // validity bitmap and sets validity_io.values_read / null_count, which are
    // read below; confirm against its declaration.
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
                                            level_info_, &validity_io, offset_data);
    END_PARQUET_CATCH_EXCEPTIONS

    // The last offset is passed as the child's length upper bound.
    RETURN_NOT_OK(item_reader_->BuildArray(offset_data[validity_io.values_read], out));

    // Resize to actual number of elements returned.
    RETURN_NOT_OK(
        offsets_buffer->Resize((validity_io.values_read + 1) * sizeof(IndexType)));
    if (validity_buffer != nullptr) {
      RETURN_NOT_OK(
          validity_buffer->Resize(bit_util::BytesForBits(validity_io.values_read)));
      validity_buffer->ZeroPadding();
    }
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, ChunksToSingle(**out));

    // The validity buffer is only attached when at least one null was counted.
    std::vector<std::shared_ptr<Buffer>> buffers{
        validity_io.null_count > 0 ? validity_buffer : nullptr, offsets_buffer};
    auto data = std::make_shared<ArrayData>(
        field_->type(),
        /*length=*/validity_io.values_read, std::move(buffers),
        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, validity_io.null_count);

    ARROW_ASSIGN_OR_RAISE(*out, AssembleArray(std::move(data)));
    return Status::OK();
  }

  const std::shared_ptr<Field> field() override { return field_; }

 private:
  std::shared_ptr<ReaderContext> ctx_;
  std::shared_ptr<Field> field_;
  ::parquet::internal::LevelInfo level_info_;
  std::unique_ptr<ColumnReaderImpl> item_reader_;
};

// ListReader specialisation for FIXED_SIZE_LIST fields. It reuses the
// offset-based list reconstruction of ListReader<int32_t> and then checks that
// every list has exactly the size declared by the type before dropping the
// offsets buffer.
class PARQUET_NO_EXPORT FixedSizeListReader : public ListReader<int32_t> {
 public:
  FixedSizeListReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
                      ::parquet::internal::LevelInfo level_info,
                      std::unique_ptr<ColumnReaderImpl> child_reader)
      : ListReader(std::move(ctx), std::move(field), level_info,
                   std::move(child_reader)) {}

  // Validates the list sizes encoded in the offsets buffer (`data->buffers[1]`)
  // and builds the fixed-size list array.
  //
  // Returns Invalid if any list's size differs from the type's list_size().
  // On success the offsets buffer is removed from `data`, leaving only the
  // validity buffer.
  ::arrow::Result<std::shared_ptr<ChunkedArray>> AssembleArray(
      std::shared_ptr<ArrayData> data) final {
    DCHECK_EQ(data->buffers.size(), 2);
    DCHECK_EQ(field()->type()->id(), ::arrow::Type::FIXED_SIZE_LIST);
    const auto& type = checked_cast<::arrow::FixedSizeListType&>(*field()->type());
    const int32_t* offsets = reinterpret_cast<const int32_t*>(data->buffers[1]->data());
    // data->length is 64-bit; the index uses the same width so the comparison
    // is not mixed-width and the increment cannot overflow at the boundary.
    for (int64_t x = 1; x <= data->length; x++) {
      int32_t size = offsets[x] - offsets[x - 1];
      if (size != type.list_size()) {
        return Status::Invalid("Expected all lists to be of size=", type.list_size(),
                               " but index ", x, " had size=", size);
      }
    }
    data->buffers.resize(1);
    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
    return std::make_shared<ChunkedArray>(result);
  }
};

// Reader for struct columns. It owns one reader per (non-pruned) child field
// and assembles their arrays into a struct array, taking the
// definition/repetition levels for the struct's validity from one selected
// child.
class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
 public:
  // Picks the child whose levels will be used to reconstruct this struct's
  // validity: the first child that neither is nor contains a repeated reader
  // if there is one, otherwise the first child. With no children,
  // def_rep_level_child_ stays null and has_repeated_child_ stays false.
  explicit StructReader(std::shared_ptr<ReaderContext> ctx,
                        std::shared_ptr<Field> filtered_field,
                        ::parquet::internal::LevelInfo level_info,
                        std::vector<std::unique_ptr<ColumnReaderImpl>> children)
      : ctx_(std::move(ctx)),
        filtered_field_(std::move(filtered_field)),
        level_info_(level_info),
        children_(std::move(children)) {
    // There could be a mix of children: some might be repeated, some might not.
    // If possible, use one that isn't repeated, since that is guaranteed to have
    // the least number of levels needed to reconstruct a nullable bitmap.
    auto result = std::find_if(children_.begin(), children_.end(),
                               [](const std::unique_ptr<ColumnReaderImpl>& child) {
                                 return !child->IsOrHasRepeatedChild();
                               });
    if (result != children_.end()) {
      def_rep_level_child_ = result->get();
      has_repeated_child_ = false;
    } else if (!children_.empty()) {
      def_rep_level_child_ = children_.front().get();
      has_repeated_child_ = true;
    }
  }

  // True only when every child is, or contains, a repeated reader.
  bool IsOrHasRepeatedChild() const final { return has_repeated_child_; }

  // Loads the same number of records into every child, stopping at the first
  // failure.
  Status LoadBatch(int64_t records_to_read) override {
    for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
      RETURN_NOT_OK(reader->LoadBatch(records_to_read));
    }
    return Status::OK();
  }
  Status BuildArray(int64_t length_upper_bound,
                    std::shared_ptr<ChunkedArray>* out) override;
  Status GetDefLevels(const int16_t** data, int64_t* length) override;
  Status GetRepLevels(const int16_t** data, int64_t* length) override;
  const std::shared_ptr<Field> field() override { return filtered_field_; }

 private:
  const std::shared_ptr<ReaderContext> ctx_;
  const std::shared_ptr<Field> filtered_field_;
  const ::parquet::internal::LevelInfo level_info_;
  const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
  // Child supplying the def/rep levels; null only when there are no children.
  ColumnReaderImpl* def_rep_level_child_ = nullptr;
  // Default-initialized so that IsOrHasRepeatedChild() never reads an
  // indeterminate value when the reader is constructed without children.
  bool has_repeated_child_ = false;
};

// Fetches the definition levels from the child selected in the constructor.
// With no children, `*data` is null, `*length` is 0 and Invalid is returned.
Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
  *data = nullptr;
  if (children_.empty()) {
    *length = 0;
    return Status::Invalid("StructReader had no children");
  }

  // Callers are expected to ask for levels only when this struct, one of its
  // ancestors or one of its children is optional/repeated, in which case every
  // child carries rep/def levels.
  ColumnReaderImpl* const level_source = def_rep_level_child_;
  RETURN_NOT_OK(level_source->GetDefLevels(data, length));
  return Status::OK();
}

// Fetches the repetition levels from the child selected in the constructor.
// With no children, `*data` is null, `*length` is 0 and Invalid is returned.
Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
  *data = nullptr;
  if (children_.empty()) {
    *length = 0;
    // Message spelling fixed ("childre" -> "children") to match GetDefLevels.
    return Status::Invalid("StructReader had no children");
  }

  // This method should only be called when this struct or one of its parents
  // are optional/repeated or it has a repeated child.
  // Meaning all children must have rep/def levels associated
  // with them.
  RETURN_NOT_OK(def_rep_level_child_->GetRepLevels(data, length));
  return Status::OK();
}

// Builds the struct array for the batch previously loaded by LoadBatch.
//
// A validity bitmap is computed from the selected child's levels when the
// struct has only repeated children (def + rep levels) or when the field is
// nullable (def levels only). Each child's array is then built and collapsed
// to a single chunk; ChunksToSingle returns NotImplemented for multi-chunk
// children. The result is written to `*out` as a single-chunk array.
Status StructReader::BuildArray(int64_t length_upper_bound,
                                std::shared_ptr<ChunkedArray>* out) {
  std::vector<std::shared_ptr<ArrayData>> children_array_data;
  std::shared_ptr<ResizableBuffer> null_bitmap;

  ::parquet::internal::ValidityBitmapInputOutput validity_io;
  validity_io.values_read_upper_bound = length_upper_bound;
  // This simplifies accounting below.
  validity_io.values_read = length_upper_bound;

  BEGIN_PARQUET_CATCH_EXCEPTIONS
  const int16_t* def_levels;
  const int16_t* rep_levels;
  int64_t num_levels;

  if (has_repeated_child_) {
    // Every child is repeated: both level kinds are needed for the bitmap.
    ARROW_ASSIGN_OR_RAISE(
        null_bitmap,
        AllocateResizableBuffer(bit_util::BytesForBits(length_upper_bound), ctx_->pool));
    validity_io.valid_bits = null_bitmap->mutable_data();
    RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels));
    RETURN_NOT_OK(GetRepLevels(&rep_levels, &num_levels));
    DefRepLevelsToBitmap(def_levels, rep_levels, num_levels, level_info_, &validity_io);
  } else if (filtered_field_->nullable()) {
    // A non-repeated child exists: definition levels alone are used.
    ARROW_ASSIGN_OR_RAISE(
        null_bitmap,
        AllocateResizableBuffer(bit_util::BytesForBits(length_upper_bound), ctx_->pool));
    validity_io.valid_bits = null_bitmap->mutable_data();
    RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels));
    DefLevelsToBitmap(def_levels, num_levels, level_info_, &validity_io);
  }

  // Ensure all values are initialized.
  if (null_bitmap) {
    RETURN_NOT_OK(null_bitmap->Resize(bit_util::BytesForBits(validity_io.values_read)));
    null_bitmap->ZeroPadding();
  }

  END_PARQUET_CATCH_EXCEPTIONS
  // Gather children arrays and def levels
  for (auto& child : children_) {
    std::shared_ptr<ChunkedArray> field;
    RETURN_NOT_OK(child->BuildArray(validity_io.values_read, &field));
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> array_data, ChunksToSingle(*field));
    children_array_data.push_back(std::move(array_data));
  }

  // No bitmap was computed in this case, so values_read still holds the upper
  // bound; take the actual length from the first child instead.
  // NOTE(review): front() assumes at least one child; confirm callers never
  // construct a StructReader without children.
  if (!filtered_field_->nullable() && !has_repeated_child_) {
    validity_io.values_read = children_array_data.front()->length;
  }

  // The bitmap is only attached when at least one null was counted.
  std::vector<std::shared_ptr<Buffer>> buffers{validity_io.null_count > 0 ? null_bitmap
                                                                          : nullptr};
  auto data =
      std::make_shared<ArrayData>(filtered_field_->type(),
                                  /*length=*/validity_io.values_read, std::move(buffers),
                                  std::move(children_array_data));
  std::shared_ptr<Array> result = ::arrow::MakeArray(data);

  *out = std::make_shared<ChunkedArray>(result);
  return Status::OK();
}

// ----------------------------------------------------------------------
// File reader implementation

/// \brief Recursively build the column reader for one schema field.
///
/// \param[in] field manifest node describing the Parquet column or group
/// \param[in] arrow_field Arrow field the produced reader should expose; for
///   extension types the function recurses with the storage type substituted
/// \param[in] ctx shared reader context (leaf selection, iterator factory, ...)
/// \param[out] out the constructed reader, or nullptr when the leaf is not included
///   by `ctx` or every child reader of a nested field turned out to be nullptr
/// \return Status::Invalid for a childless non-leaf node, a list-like type that does
///   not have exactly one child field, or an unsupported nested type;
///   Status::UnknownError for an unrecognized list type; any error from a recursive
///   call is propagated
Status GetReader(const SchemaField& field, const std::shared_ptr<Field>& arrow_field,
                 const std::shared_ptr<ReaderContext>& ctx,
                 std::unique_ptr<ColumnReaderImpl>* out) {
  BEGIN_PARQUET_CATCH_EXCEPTIONS

  auto type_id = arrow_field->type()->id();

  if (type_id == ::arrow::Type::EXTENSION) {
    // Read the column as its storage type, then re-wrap it as the extension type.
    auto storage_field = arrow_field->WithType(
        checked_cast<const ExtensionType&>(*arrow_field->type()).storage_type());
    RETURN_NOT_OK(GetReader(field, storage_field, ctx, out));
    // The storage reader is nullptr when the underlying leaves were pruned. Propagate
    // that instead of wrapping a null reader, so that the nullptr checks performed by
    // the list and struct branches below keep treating this field as pruned.
    if (*out != nullptr) {
      *out = std::make_unique<ExtensionReader>(arrow_field, std::move(*out));
    }
    return Status::OK();
  }

  if (field.children.size() == 0) {
    if (!field.is_leaf()) {
      return Status::Invalid("Parquet non-leaf node has no children");
    }
    if (!ctx->IncludesLeaf(field.column_index)) {
      // Leaf was not selected: signal "pruned" to the caller.
      *out = nullptr;
      return Status::OK();
    }
    std::unique_ptr<FileColumnIterator> input(
        ctx->iterator_factory(field.column_index, ctx->reader));
    *out = std::make_unique<LeafReader>(ctx, arrow_field, std::move(input),
                                        field.level_info);
  } else if (type_id == ::arrow::Type::LIST || type_id == ::arrow::Type::MAP ||
             type_id == ::arrow::Type::FIXED_SIZE_LIST ||
             type_id == ::arrow::Type::LARGE_LIST) {
    auto list_field = arrow_field;
    auto child = &field.children[0];
    std::unique_ptr<ColumnReaderImpl> child_reader;
    RETURN_NOT_OK(GetReader(*child, ctx, &child_reader));
    if (child_reader == nullptr) {
      // The only child was pruned, so the list itself is pruned as well.
      *out = nullptr;
      return Status::OK();
    }

    // These two types might not be equal if column pruning occurred
    // further down the stack.
    const std::shared_ptr<DataType> reader_child_type = child_reader->field()->type();
    // This should really never happen but was raised as a question on the code
    // review; this should be a pretty cheap check so leave it in.
    if (ARROW_PREDICT_FALSE(list_field->type()->num_fields() != 1)) {
      return Status::Invalid("expected exactly one child field for: ",
                             list_field->ToString());
    }
    const DataType& schema_child_type = *(list_field->type()->field(0)->type());
    if (type_id == ::arrow::Type::MAP) {
      if (reader_child_type->num_fields() != 2 ||
          !reader_child_type->field(0)->type()->Equals(
              *schema_child_type.field(0)->type())) {
        // This case applies if either key or value was completely filtered
        // out, so we can take the type as is, or the key was partially
        // filtered, so keeping it as a map no longer makes sense.
        list_field = list_field->WithType(::arrow::list(child_reader->field()));
      } else if (!reader_child_type->field(1)->type()->Equals(
                     *schema_child_type.field(1)->type())) {
        list_field = list_field->WithType(std::make_shared<::arrow::MapType>(
            reader_child_type->field(
                0),  // field 0 is unchanged based on previous if statement
            reader_child_type->field(1)));
      }
      // Map types are list<struct<key, value>> so use ListReader
      // for reconstruction.
      *out = std::make_unique<ListReader<int32_t>>(ctx, list_field, field.level_info,
                                                   std::move(child_reader));
    } else if (type_id == ::arrow::Type::LIST) {
      if (!reader_child_type->Equals(schema_child_type)) {
        list_field = list_field->WithType(::arrow::list(reader_child_type));
      }

      *out = std::make_unique<ListReader<int32_t>>(ctx, list_field, field.level_info,
                                                   std::move(child_reader));
    } else if (type_id == ::arrow::Type::LARGE_LIST) {
      if (!reader_child_type->Equals(schema_child_type)) {
        list_field = list_field->WithType(::arrow::large_list(reader_child_type));
      }

      *out = std::make_unique<ListReader<int64_t>>(ctx, list_field, field.level_info,
                                                   std::move(child_reader));
    } else if (type_id == ::arrow::Type::FIXED_SIZE_LIST) {
      if (!reader_child_type->Equals(schema_child_type)) {
        auto& fixed_list_type =
            checked_cast<const ::arrow::FixedSizeListType&>(*list_field->type());
        int32_t list_size = fixed_list_type.list_size();
        list_field =
            list_field->WithType(::arrow::fixed_size_list(reader_child_type, list_size));
      }

      *out = std::make_unique<FixedSizeListReader>(ctx, list_field, field.level_info,
                                                   std::move(child_reader));
    } else {
      return Status::UnknownError("Unknown list type: ", field.field->ToString());
    }
  } else if (type_id == ::arrow::Type::STRUCT) {
    std::vector<std::shared_ptr<Field>> child_fields;
    // Index into the *schema* struct type; advanced for pruned children too so that
    // it stays aligned with `field.children`.
    int arrow_field_idx = 0;
    std::vector<std::unique_ptr<ColumnReaderImpl>> child_readers;
    for (const auto& child : field.children) {
      std::unique_ptr<ColumnReaderImpl> child_reader;
      RETURN_NOT_OK(GetReader(child, ctx, &child_reader));
      if (!child_reader) {
        arrow_field_idx++;
        // If all children were pruned, then we do not try to read this field
        continue;
      }
      std::shared_ptr<::arrow::Field> child_field = child.field;
      const DataType& reader_child_type = *child_reader->field()->type();
      const DataType& schema_child_type =
          *arrow_field->type()->field(arrow_field_idx++)->type();
      // These might not be equal if column pruning occurred.
      if (!schema_child_type.Equals(reader_child_type)) {
        child_field = child_field->WithType(child_reader->field()->type());
      }
      child_fields.push_back(child_field);
      child_readers.emplace_back(std::move(child_reader));
    }
    if (child_fields.empty()) {
      // Every child was pruned: the struct itself is pruned.
      *out = nullptr;
      return Status::OK();
    }
    // Rebuild the struct field from the children that survived pruning.
    auto filtered_field =
        ::arrow::field(arrow_field->name(), ::arrow::struct_(child_fields),
                       arrow_field->nullable(), arrow_field->metadata());
    *out = std::make_unique<StructReader>(ctx, filtered_field, field.level_info,
                                          std::move(child_readers));
  } else {
    return Status::Invalid("Unsupported nested type: ", arrow_field->ToString());
  }
  return Status::OK();

  END_PARQUET_CATCH_EXCEPTIONS
}

/// Convenience overload: build the reader for `field` using the Arrow field stored
/// on the schema node itself as the target field.
Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>& ctx,
                 std::unique_ptr<ColumnReaderImpl>* out) {
  const auto& own_arrow_field = field.field;
  return GetReader(field, own_arrow_field, ctx, out);
}

}  // namespace

/// \brief Create a RecordBatchReader over the given row groups and columns.
///
/// \param[in] row_groups indices of the row groups to read
/// \param[in] column_indices indices of the leaf columns to read
/// \param[out] out receives a RowGroupRecordBatchReader yielding the batches
/// \return non-OK if the bounds check, pre-buffering or field reader creation fails
///
/// When no field readers are produced, column-less batches are generated eagerly.
/// Otherwise batches are produced lazily: each step asks every column reader for up
/// to `batch_size` rows and emits a single RecordBatch.
Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
                                            const std::vector<int>& column_indices,
                                            std::unique_ptr<RecordBatchReader>* out) {
  RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

  if (reader_properties_.pre_buffer()) {
    // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(),
                       reader_properties_.cache_options());
    END_PARQUET_CATCH_EXCEPTIONS
  }

  std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
  std::shared_ptr<::arrow::Schema> batch_schema;
  RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema));

  if (readers.empty()) {
    // Just generate all batches right now; they're cheap since they have no columns.
    // NOTE(review): assumes properties().batch_size() is non-zero (used as a divisor
    // below) - confirm it is validated where the properties are set.
    int64_t batch_size = properties().batch_size();
    auto max_sized_batch =
        ::arrow::RecordBatch::Make(batch_schema, batch_size, ::arrow::ArrayVector{});

    ::arrow::RecordBatchVector batches;

    for (int row_group : row_groups) {
      int64_t num_rows = parquet_reader()->metadata()->RowGroup(row_group)->num_rows();

      // As many full-sized batches as fit in this row group ...
      batches.insert(batches.end(), num_rows / batch_size, max_sized_batch);

      // ... plus one shorter batch for the remainder, if any.
      if (int64_t trailing_rows = num_rows % batch_size) {
        batches.push_back(max_sized_batch->Slice(0, trailing_rows));
      }
    }

    *out = std::make_unique<RowGroupRecordBatchReader>(
        ::arrow::MakeVectorIterator(std::move(batches)), std::move(batch_schema));

    return Status::OK();
  }

  // Total number of rows (per file metadata) across the requested row groups; the
  // lambda below counts this down to avoid requesting more rows than remain.
  int64_t num_rows = 0;
  for (int row_group : row_groups) {
    num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
  }

  using ::arrow::RecordBatchIterator;

  // NB: This lambda will be invoked outside the scope of this call to
  // `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` by value.
  // `this` is a non-owning pointer so we are relying on the parent FileReader outliving
  // this RecordBatchReader.
  ::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
      [readers, batch_schema, num_rows,
       this]() mutable -> ::arrow::Result<RecordBatchIterator> {
        ::arrow::ChunkedArrayVector columns(readers.size());

        // don't reserve more rows than necessary
        int64_t batch_size = std::min(properties().batch_size(), num_rows);
        num_rows -= batch_size;

        // Read the next `batch_size` rows of every column, optionally in parallel.
        RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
            reader_properties_.use_threads(), static_cast<int>(readers.size()),
            [&](int i) { return readers[i]->NextBatch(batch_size, &columns[i]); }));

        // Any missing or empty column terminates the iteration.
        for (const auto& column : columns) {
          if (column == nullptr || column->length() == 0) {
            return ::arrow::IterationTraits<RecordBatchIterator>::End();
          }
        }

        // Previous implementation, kept for reference: it went through a Table and a
        // TableBatchReader instead of building the RecordBatch directly.
//        auto table = ::arrow::Table::Make(batch_schema, std::move(columns));
//        auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
//
//        // NB: explicitly preserve table so that table_reader doesn't outlive it
//        return ::arrow::MakeFunctionIterator(
//            [table, table_reader] { return table_reader->Next(); });

        // NOTE(review): only chunk(0) of each column is used and the batch is declared
        // to have `batch_size` rows. This presumes NextBatch() yields exactly one chunk
        // holding exactly `batch_size` rows per column - confirm against the column
        // readers, otherwise rows are dropped or the batch length is inconsistent.
        std::vector<std::shared_ptr<Array>> arrays;
        for (const auto& column : columns) {
            arrays.emplace_back(column->chunk(0));
        }
        return ::arrow::MakeVectorIterator<std::shared_ptr<::arrow::RecordBatch>>(
            {(::arrow::RecordBatch::Make(batch_schema, batch_size, std::move(arrays)))});
    });


  // Flatten the iterator-of-iterators into a single stream of record batches.
  *out = std::make_unique<RowGroupRecordBatchReader>(
      ::arrow::MakeFlattenIterator(std::move(batches)), std::move(batch_schema));

  return Status::OK();
}

/// Given a file reader and a list of row groups, this is a generator of record
/// batch generators (where each sub-generator is the contents of a single row group).
///
/// Each call to operator() yields a future for the next row group's batches. Reads
/// are queued in `in_flight_reads_`; when `min_rows_in_flight` is non-zero, further
/// row groups are requested ahead of time until at least that many rows are queued.
class RowGroupGenerator {
 public:
  using RecordBatchGenerator =
      ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;

  /// A queued row group read together with the row count it contributes to
  /// `rows_in_flight_`.
  struct ReadRequest {
    ::arrow::Future<RecordBatchGenerator> read;
    int64_t num_rows;
  };

  /// \param arrow_reader reader used for decoding; shared ownership is kept by the
  ///   generator and by every read it starts
  /// \param cpu_executor executor used for decoding work; may be null
  /// \param row_groups row groups to read, in order
  /// \param column_indices leaf columns to read from every row group
  /// \param min_rows_in_flight readahead target in rows; 0 disables readahead
  explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
                             ::arrow::internal::Executor* cpu_executor,
                             std::vector<int> row_groups, std::vector<int> column_indices,
                             int64_t min_rows_in_flight)
      : arrow_reader_(std::move(arrow_reader)),
        cpu_executor_(cpu_executor),
        row_groups_(std::move(row_groups)),
        column_indices_(std::move(column_indices)),
        min_rows_in_flight_(min_rows_in_flight),
        rows_in_flight_(0),
        index_(0),
        readahead_index_(0) {}

  /// Return the future for the next row group, or the end marker once every row
  /// group has been handed out.
  ::arrow::Future<RecordBatchGenerator> operator()() {
    if (index_ >= row_groups_.size()) {
      return ::arrow::AsyncGeneratorEnd<RecordBatchGenerator>();
    }
    index_++;
    FillReadahead();
    // Verify the queue is non-empty *before* front(): calling front() on an empty
    // queue is undefined behavior, so checking afterwards would be too late.
    DCHECK(!in_flight_reads_.empty());
    ReadRequest next = std::move(in_flight_reads_.front());
    in_flight_reads_.pop();
    rows_in_flight_ -= next.num_rows;
    return next.read;
  }

 private:
  /// Start as many reads as the readahead policy asks for.
  void FillReadahead() {
    if (min_rows_in_flight_ == 0) {
      // No readahead, fetch the batch when it is asked for
      FetchNext();
    } else {
      while (readahead_index_ < row_groups_.size() &&
             rows_in_flight_ < min_rows_in_flight_) {
        FetchNext();
      }
    }
  }

  /// Start reading the next not-yet-requested row group and queue its future.
  void FetchNext() {
    size_t row_group_index = readahead_index_++;
    int row_group = row_groups_[row_group_index];
    std::vector<int> column_indices = column_indices_;
    auto reader = arrow_reader_;
    int64_t num_rows =
        reader->parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
    rows_in_flight_ += num_rows;
    ::arrow::Future<RecordBatchGenerator> row_group_read;
    if (!reader->properties().pre_buffer()) {
      row_group_read = SubmitRead(cpu_executor_, reader, row_group, column_indices);
    } else {
      auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
      if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
      // Capture the executor pointer by value rather than `this`: the continuation
      // may run after this generator object (a copyable functor) has been moved or
      // destroyed, in which case `this->cpu_executor_` would be a dangling access.
      row_group_read =
          ready.Then([cpu_executor = cpu_executor_, reader, row_group,
                      column_indices = std::move(
                          column_indices)]() -> ::arrow::Future<RecordBatchGenerator> {
            return ReadOneRowGroup(cpu_executor, reader, row_group, column_indices);
          });
    }
    in_flight_reads_.push({std::move(row_group_read), num_rows});
  }

  // Synchronous fallback for when pre-buffer isn't enabled.
  //
  // Making the Parquet reader truly asynchronous requires heavy refactoring, so the
  // generator piggybacks on ReadRangeCache. The lazy ReadRangeCache can be used for
  // async I/O without forcing readahead.
  static ::arrow::Future<RecordBatchGenerator> SubmitRead(
      ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
      const int row_group, const std::vector<int>& column_indices) {
    if (!cpu_executor) {
      return ReadOneRowGroup(cpu_executor, self, row_group, column_indices);
    }
    // If we have an executor, then force transfer (even if I/O was complete)
    return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self,
                                                    row_group, column_indices));
  }

  /// Decode one row group into a table and expose it as a generator of record
  /// batches of at most `batch_size` rows each.
  static ::arrow::Future<RecordBatchGenerator> ReadOneRowGroup(
      ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
      const int row_group, const std::vector<int>& column_indices) {
    // Skips bound checks/pre-buffering, since we've done that already
    const int64_t batch_size = self->properties().batch_size();
    return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
        .Then([batch_size](const std::shared_ptr<Table>& table)
                  -> ::arrow::Result<RecordBatchGenerator> {
          ::arrow::TableBatchReader table_reader(*table);
          table_reader.set_chunksize(batch_size);
          ARROW_ASSIGN_OR_RAISE(auto batches, table_reader.ToRecordBatches());
          return ::arrow::MakeVectorGenerator(std::move(batches));
        });
  }

  std::shared_ptr<FileReaderImpl> arrow_reader_;
  ::arrow::internal::Executor* cpu_executor_;
  std::vector<int> row_groups_;
  std::vector<int> column_indices_;
  // Readahead target in rows; 0 means "fetch only on demand".
  int64_t min_rows_in_flight_;
  // Reads that were started but not yet returned from operator().
  std::queue<ReadRequest> in_flight_reads_;
  // Sum of `num_rows` over the entries of `in_flight_reads_`.
  int64_t rows_in_flight_;
  // Number of row groups already returned from operator().
  size_t index_;
  // Index into `row_groups_` of the next row group to start reading.
  size_t readahead_index_;
};

/// \brief Create an async generator of record batches over the given row groups.
///
/// \param reader shared pointer that is down-cast to FileReaderImpl and handed to the
///   row group generator to keep the reader alive (presumably the pointer owning
///   this object - confirm at call sites)
/// \param row_group_indices row groups to read
/// \param column_indices leaf columns to read
/// \param cpu_executor executor for decoding work; forwarded as-is, may be null
/// \param rows_to_readahead readahead target in rows; must be >= 0, where 0 disables
///   readahead
/// \return the concatenated generator, or a non-OK status if the bounds check fails,
///   `rows_to_readahead` is negative, or pre-buffering raises
::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
                                        const std::vector<int> row_group_indices,
                                        const std::vector<int> column_indices,
                                        ::arrow::internal::Executor* cpu_executor,
                                        int64_t rows_to_readahead) {
  RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices));
  if (rows_to_readahead < 0) {
    // Zero is accepted (it means "no readahead"), so the message must say ">= 0".
    return Status::Invalid("rows_to_readahead must be >= 0");
  }
  if (reader_properties_.pre_buffer()) {
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(),
                       reader_properties_.cache_options());
    END_PARQUET_CATCH_EXCEPTIONS
  }
  // One sub-generator per row group ...
  ::arrow::AsyncGenerator<RowGroupGenerator::RecordBatchGenerator> row_group_generator =
      RowGroupGenerator(::arrow::internal::checked_pointer_cast<FileReaderImpl>(reader),
                        cpu_executor, row_group_indices, column_indices,
                        rows_to_readahead);
  // ... concatenated into a single stream of record batches.
  ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>> concatenated =
      ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator));
  WRAP_ASYNC_GENERATOR(std::move(concatenated));
  return concatenated;
}

/// Build a reader for top-level schema field `i`, pulling column data through
/// `iterator_factory`. Fails if `i` does not pass the column bounds check.
Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
                                 std::unique_ptr<ColumnReader>* out) {
  RETURN_NOT_OK(BoundsCheckColumn(i));

  // Fresh context for this column, with leaf filtering switched off.
  auto context = std::make_shared<ReaderContext>();
  context->filter_leaves = false;
  context->iterator_factory = iterator_factory;
  context->pool = pool_;
  context->reader = reader_.get();

  std::unique_ptr<ColumnReaderImpl> column_reader;
  RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], context, &column_reader));
  *out = std::move(column_reader);
  return Status::OK();
}

/// Read the selected columns of the selected row groups into a single Table.
/// The call is synchronous: it waits on the decode future before returning.
Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
                                     const std::vector<int>& column_indices,
                                     std::shared_ptr<Table>* out) {
  RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

  const bool pre_buffer_enabled = reader_properties_.pre_buffer();
  if (pre_buffer_enabled) {
    // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    parquet_reader()->PreBuffer(row_groups, column_indices,
                                reader_properties_.io_context(),
                                reader_properties_.cache_options());
    END_PARQUET_CATCH_EXCEPTIONS
  }

  // Neither a keep-alive `self` nor an explicit executor is supplied here.
  Future<std::shared_ptr<Table>> decoded =
      DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
                      /*cpu_executor=*/nullptr);
  ARROW_ASSIGN_OR_RAISE(*out, decoded.MoveResult());
  return Status::OK();
}

/// \brief Decode the selected columns of the selected row groups into a Table.
///
/// \param self optional owning pointer; it is only copied into the callbacks below
/// \param row_groups row groups to decode
/// \param column_indices leaf columns to decode
/// \param cpu_executor executor for the per-column work; when null the global CPU
///   thread pool is used
/// \return a future for the validated table; a failure to create the field readers
///   is returned through the RETURN_NOT_OK early exit
///
/// No bounds checking or pre-buffering is done in this function.
Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
    std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
    const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor) {
  // `self` is used solely to keep `this` alive in an async context - but we use this
  // in a sync context too so use `this` over `self`
  std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
  std::shared_ptr<::arrow::Schema> result_schema;
  RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
  // OptionalParallelForAsync requires an executor
  if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();

  // Per-column task: read column `i` across all requested row groups. `row_groups`
  // is captured by value because the task can run after this function has returned.
  auto read_column = [row_groups, self, this](size_t i,
                                              std::shared_ptr<ColumnReaderImpl> reader)
      -> ::arrow::Result<std::shared_ptr<::arrow::ChunkedArray>> {
    std::shared_ptr<::arrow::ChunkedArray> column;
    RETURN_NOT_OK(ReadColumn(static_cast<int>(i), row_groups, reader.get(), &column));
    return column;
  };
  // Final step: assemble the decoded columns into a table and validate it.
  auto make_table = [result_schema, row_groups, self,
                     this](const ::arrow::ChunkedArrayVector& columns)
      -> ::arrow::Result<std::shared_ptr<Table>> {
    int64_t num_rows = 0;
    if (!columns.empty()) {
      num_rows = columns[0]->length();
    } else {
      // No columns selected: take the row count from the file metadata instead.
      for (int i : row_groups) {
        num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
      }
    }
    auto table = Table::Make(std::move(result_schema), columns, num_rows);
    RETURN_NOT_OK(table->Validate());
    return table;
  };
  return ::arrow::internal::OptionalParallelForAsync(reader_properties_.use_threads(),
                                                     std::move(readers), read_column,
                                                     cpu_executor)
      .Then(std::move(make_table));
}

/// Create a reader scoped to the row group at `row_group_index`. The new reader is
/// constructed from a raw pointer to this FileReaderImpl.
std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) {
  auto group_reader = std::make_shared<RowGroupReaderImpl>(this, row_group_index);
  return group_reader;
}

// ----------------------------------------------------------------------
// Public factory functions

/// shared_ptr flavour of GetRecordBatchReader(): forwards to the unique_ptr overload
/// and transfers ownership of the result into `*out`.
Status FileReader::GetRecordBatchReader(std::shared_ptr<RecordBatchReader>* out) {
  std::unique_ptr<RecordBatchReader> unique_reader;
  RETURN_NOT_OK(GetRecordBatchReader(&unique_reader));
  RecordBatchReader* released = unique_reader.release();
  out->reset(released);
  return Status::OK();
}

/// shared_ptr flavour restricted to `row_group_indices`: forwards to the unique_ptr
/// overload and transfers ownership of the result into `*out`.
Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
                                        std::shared_ptr<RecordBatchReader>* out) {
  std::unique_ptr<RecordBatchReader> unique_reader;
  RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, &unique_reader));
  RecordBatchReader* released = unique_reader.release();
  out->reset(released);
  return Status::OK();
}

/// shared_ptr flavour restricted to `row_group_indices` and `column_indices`:
/// forwards to the unique_ptr overload and transfers ownership into `*out`.
Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
                                        const std::vector<int>& column_indices,
                                        std::shared_ptr<RecordBatchReader>* out) {
  std::unique_ptr<RecordBatchReader> unique_reader;
  RETURN_NOT_OK(
      GetRecordBatchReader(row_group_indices, column_indices, &unique_reader));
  RecordBatchReader* released = unique_reader.release();
  out->reset(released);
  return Status::OK();
}

/// Create a FileReader on top of an already-opened Parquet reader.
/// `*out` is assigned before initialization runs, so it holds the new object even
/// when the returned status is not OK.
Status FileReader::Make(::arrow::MemoryPool* pool,
                        std::unique_ptr<ParquetFileReader> reader,
                        const ArrowReaderProperties& properties,
                        std::unique_ptr<FileReader>* out) {
  auto impl = std::make_unique<FileReaderImpl>(pool, std::move(reader), properties);
  FileReaderImpl* const impl_ptr = impl.get();
  *out = std::move(impl);
  return impl_ptr->Init();
}

/// Same as the four-argument Make(), using default_arrow_reader_properties().
Status FileReader::Make(::arrow::MemoryPool* pool,
                        std::unique_ptr<ParquetFileReader> reader,
                        std::unique_ptr<FileReader>* out) {
  const ArrowReaderProperties& default_properties = default_arrow_reader_properties();
  return Make(pool, std::move(reader), default_properties, out);
}

/// Construct a builder preset with Arrow's default memory pool and the default
/// Arrow reader properties; both can be replaced via memory_pool() / properties().
FileReaderBuilder::FileReaderBuilder()
    : pool_(::arrow::default_memory_pool()),
      properties_(default_arrow_reader_properties()) {}

/// Open a low-level Parquet reader on `file` and store it in `raw_reader_` for a
/// later Build() call.
///
/// \param file random access file to read from (ownership is moved into the reader)
/// \param properties low-level reader properties forwarded to ParquetReader::Open
/// \param metadata file metadata forwarded to ParquetReader::Open
/// \return Status::OK() when opening completes; failures go through
///   PARQUET_CATCH_NOT_OK, which presumably converts Parquet exceptions into a
///   non-OK Status (the macro is defined outside this file - confirm)
Status FileReaderBuilder::Open(std::shared_ptr<::arrow::io::RandomAccessFile> file,
                               const ReaderProperties& properties,
                               std::shared_ptr<FileMetaData> metadata) {
  PARQUET_CATCH_NOT_OK(raw_reader_ = ParquetReader::Open(std::move(file), properties,
                                                         std::move(metadata)));
  return Status::OK();
}

/// Open a low-level Parquet reader on the file at `path` and store it in
/// `raw_reader_` for a later Build() call.
///
/// \param path path forwarded to ParquetReader::OpenFile
/// \param memory_map flag forwarded to ParquetReader::OpenFile
/// \param properties low-level reader properties forwarded to ParquetReader::OpenFile
/// \param metadata file metadata forwarded to ParquetReader::OpenFile
/// \return Status::OK() when opening completes; failures go through
///   PARQUET_CATCH_NOT_OK, which presumably converts Parquet exceptions into a
///   non-OK Status (the macro is defined outside this file - confirm)
Status FileReaderBuilder::OpenFile(const std::string& path, bool memory_map,
                                   const ReaderProperties& properties,
                                   std::shared_ptr<FileMetaData> metadata) {
  PARQUET_CATCH_NOT_OK(raw_reader_ = ParquetReader::OpenFile(path, memory_map, properties,
                                                             std::move(metadata)));
  return Status::OK();
}

/// Set the memory pool that Build() passes to FileReader::Make.
/// \return this builder, so that calls can be chained
FileReaderBuilder* FileReaderBuilder::memory_pool(::arrow::MemoryPool* pool) {
  pool_ = pool;
  return this;
}

/// Set the Arrow reader properties that Build() passes to FileReader::Make
/// (the argument is copied into the builder).
/// \return this builder, so that calls can be chained
FileReaderBuilder* FileReaderBuilder::properties(
    const ArrowReaderProperties& arg_properties) {
  properties_ = arg_properties;
  return this;
}

/// Create the FileReader from the previously opened raw reader, which is moved out
/// of the builder by this call.
Status FileReaderBuilder::Build(std::unique_ptr<FileReader>* out) {
  auto opened_reader = std::move(raw_reader_);
  return FileReader::Make(pool_, std::move(opened_reader), properties_, out);
}

/// Result-returning flavour of Build(): yields the constructed FileReader, or the
/// failure status reported while constructing it.
Result<std::unique_ptr<FileReader>> FileReaderBuilder::Build() {
  std::unique_ptr<FileReader> built_reader;
  RETURN_NOT_OK(Build(&built_reader));
  return built_reader;
}

/// One-shot helper: open `file` with a FileReaderBuilder, select `pool` as the
/// memory pool and build the FileReader into `*reader`.
Status OpenFile(std::shared_ptr<::arrow::io::RandomAccessFile> file, MemoryPool* pool,
                std::unique_ptr<FileReader>* reader) {
  FileReaderBuilder reader_builder;
  RETURN_NOT_OK(reader_builder.Open(std::move(file)));
  reader_builder.memory_pool(pool);
  return reader_builder.Build(reader);
}

namespace internal {

/// Read every row group of `reader` and fully validate each table that was read
/// successfully. All row groups are visited even after a failure; the statuses are
/// folded together with `&=` and the combined status is returned.
Status FuzzReader(std::unique_ptr<FileReader> reader) {
  Status combined = Status::OK();
  int row_group = 0;
  while (row_group < reader->num_row_groups()) {
    std::shared_ptr<Table> decoded;
    Status group_status = reader->ReadRowGroup(row_group, &decoded);
    if (group_status.ok()) {
      group_status &= decoded->ValidateFull();
    }
    combined &= group_status;
    ++row_group;
  }
  return combined;
}

/// Fuzzing entry point over raw bytes: treat `data[0, size)` as an in-memory Parquet
/// file, build a FileReader on it and run the reader-based FuzzReader overload.
Status FuzzReader(const uint8_t* data, int64_t size) {
  auto input_bytes = std::make_shared<::arrow::Buffer>(data, size);
  auto input_file = std::make_shared<::arrow::io::BufferReader>(input_bytes);

  FileReaderBuilder reader_builder;
  RETURN_NOT_OK(reader_builder.Open(std::move(input_file)));

  std::unique_ptr<FileReader> fuzz_target;
  RETURN_NOT_OK(reader_builder.Build(&fuzz_target));
  return FuzzReader(std::move(fuzz_target));
}

}  // namespace internal

}  // namespace arrow
}  // namespace parquet
