// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "encoding.h"

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <limits>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/array/builder_dict.h"
#include "arrow/stl_allocator.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_block_counter.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_stream_utils.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/bitmap_writer.h"
#include "arrow/util/byte_stream_split.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/hashing.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
#include "arrow/util/ubsan.h"
#include "arrow/visit_data_inline.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
#include "parquet/types.h"

namespace bit_util = arrow::bit_util;

using arrow::Status;
using arrow::VisitNullBitmapInline;
using arrow::internal::AddWithOverflow;
using arrow::internal::checked_cast;
using std::string_view;

template <typename T>
using ArrowPoolVector = std::vector<T, ::arrow::stl::allocator<T>>;

namespace ch_parquet {
using namespace parquet;
namespace {

constexpr int64_t kInMemoryDefaultCapacity = 1024;
// The Parquet spec isn't very clear whether ByteArray lengths are signed or
// unsigned, but the Java implementation uses signed ints.
constexpr size_t kMaxByteArraySize = std::numeric_limits<int32_t>::max();

class EncoderImpl : virtual public Encoder {
 public:
  EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, MemoryPool* pool)
      : descr_(descr),
        encoding_(encoding),
        pool_(pool),
        type_length_(descr ? descr->type_length() : -1) {}

  Encoding::type encoding() const override { return encoding_; }

  MemoryPool* memory_pool() const override { return pool_; }

 protected:
  // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
  const ColumnDescriptor* descr_;
  const Encoding::type encoding_;
  MemoryPool* pool_;

  /// Type length from descr
  int type_length_;
};

// ----------------------------------------------------------------------
// Plain encoder implementation

template <typename DType>
class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
 public:
  using T = typename DType::c_type;

  explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
      : EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}

  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }

  std::shared_ptr<Buffer> FlushValues() override {
    std::shared_ptr<Buffer> buffer;
    PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
    return buffer;
  }

  using TypedEncoder<DType>::Put;

  void Put(const T* buffer, int num_values) override;

  void Put(const ::arrow::Array& values) override;

  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    if (valid_bits != NULLPTR) {
      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
                                                                   this->memory_pool()));
      T* data = reinterpret_cast<T*>(buffer->mutable_data());
      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
          src, num_values, valid_bits, valid_bits_offset, data);
      Put(data, num_valid_values);
    } else {
      Put(src, num_values);
    }
  }

  void UnsafePutByteArray(const void* data, uint32_t length) {
    DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
    sink_.UnsafeAppend(&length, sizeof(uint32_t));
    sink_.UnsafeAppend(data, static_cast<int64_t>(length));
  }

  void Put(const ByteArray& val) {
    // Write the result to the output stream
    const int64_t increment = static_cast<int64_t>(val.len + sizeof(uint32_t));
    if (ARROW_PREDICT_FALSE(sink_.length() + increment > sink_.capacity())) {
      PARQUET_THROW_NOT_OK(sink_.Reserve(increment));
    }
    UnsafePutByteArray(val.ptr, val.len);
  }

 protected:
  template <typename ArrayType>
  void PutBinaryArray(const ArrayType& array) {
    const int64_t total_bytes =
        array.value_offset(array.length()) - array.value_offset(0);
    PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + array.length() * sizeof(uint32_t)));

    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
        *array.data(),
        [&](::std::string_view view) {
          if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
          }
          UnsafePutByteArray(view.data(), static_cast<uint32_t>(view.size()));
          return Status::OK();
        },
        []() { return Status::OK(); }));
  }

  ::arrow::BufferBuilder sink_;
};

template <typename DType>
void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
  if (num_values > 0) {
    PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
  }
}

template <>
inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
  for (int i = 0; i < num_values; ++i) {
    Put(src[i]);
  }
}

template <typename ArrayType>
void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) {
  if (values.type_id() != ArrayType::TypeClass::type_id) {
    std::string type_name = ArrayType::TypeClass::type_name();
    throw ParquetException("direct put to " + type_name + " from " +
                           values.type()->ToString() + " not supported");
  }

  using value_type = typename ArrayType::value_type;
  constexpr auto value_size = sizeof(value_type);
  auto raw_values = checked_cast<const ArrayType&>(values).raw_values();

  if (values.null_count() == 0) {
    // no nulls, just dump the data
    PARQUET_THROW_NOT_OK(sink->Append(raw_values, values.length() * value_size));
  } else {
    PARQUET_THROW_NOT_OK(
        sink->Reserve((values.length() - values.null_count()) * value_size));

    for (int64_t i = 0; i < values.length(); i++) {
      if (values.IsValid(i)) {
        sink->UnsafeAppend(&raw_values[i], value_size);
      }
    }
  }
}

template <>
void PlainEncoder<Int32Type>::Put(const ::arrow::Array& values) {
  DirectPutImpl<::arrow::Int32Array>(values, &sink_);
}

template <>
void PlainEncoder<Int64Type>::Put(const ::arrow::Array& values) {
  DirectPutImpl<::arrow::Int64Array>(values, &sink_);
}

template <>
void PlainEncoder<Int96Type>::Put(const ::arrow::Array& values) {
  ParquetException::NYI("direct put to Int96");
}

template <>
void PlainEncoder<FloatType>::Put(const ::arrow::Array& values) {
  DirectPutImpl<::arrow::FloatArray>(values, &sink_);
}

template <>
void PlainEncoder<DoubleType>::Put(const ::arrow::Array& values) {
  DirectPutImpl<::arrow::DoubleArray>(values, &sink_);
}

template <typename DType>
void PlainEncoder<DType>::Put(const ::arrow::Array& values) {
  ParquetException::NYI("direct put of " + values.type()->ToString());
}

void AssertBaseBinary(const ::arrow::Array& values) {
  if (!::arrow::is_base_binary_like(values.type_id())) {
    throw ParquetException("Only BaseBinaryArray and subclasses supported");
  }
}

template <>
inline void PlainEncoder<ByteArrayType>::Put(const ::arrow::Array& values) {
  AssertBaseBinary(values);

  if (::arrow::is_binary_like(values.type_id())) {
    PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
  } else {
    DCHECK(::arrow::is_large_binary_like(values.type_id()));
    PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
  }
}

void AssertFixedSizeBinary(const ::arrow::Array& values, int type_length) {
  if (values.type_id() != ::arrow::Type::FIXED_SIZE_BINARY &&
      values.type_id() != ::arrow::Type::DECIMAL) {
    throw ParquetException("Only FixedSizeBinaryArray and subclasses supported");
  }
  if (checked_cast<const ::arrow::FixedSizeBinaryType&>(*values.type()).byte_width() !=
      type_length) {
    throw ParquetException("Size mismatch: " + values.type()->ToString() +
                           " should have been " + std::to_string(type_length) + " wide");
  }
}

template <>
inline void PlainEncoder<FLBAType>::Put(const ::arrow::Array& values) {
  AssertFixedSizeBinary(values, descr_->type_length());
  const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);

  if (data.null_count() == 0) {
    // no nulls, just dump the data
    PARQUET_THROW_NOT_OK(
        sink_.Append(data.raw_values(), data.length() * data.byte_width()));
  } else {
    const int64_t total_bytes =
        data.length() * data.byte_width() - data.null_count() * data.byte_width();
    PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
    for (int64_t i = 0; i < data.length(); i++) {
      if (data.IsValid(i)) {
        sink_.UnsafeAppend(data.Value(i), data.byte_width());
      }
    }
  }
}

template <>
inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) {
  if (descr_->type_length() == 0) {
    return;
  }
  for (int i = 0; i < num_values; ++i) {
    // Write the result to the output stream
    DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL";
    PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length()));
  }
}

template <>
class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEncoder {
 public:
  explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
      : EncoderImpl(descr, Encoding::PLAIN, pool),
        bits_available_(kInMemoryDefaultCapacity * 8),
        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
        sink_(pool),
        bit_writer_(bits_buffer_->mutable_data(),
                    static_cast<int>(bits_buffer_->size())) {}

  int64_t EstimatedDataEncodedSize() override;
  std::shared_ptr<Buffer> FlushValues() override;

  void Put(const bool* src, int num_values) override;

  void Put(const std::vector<bool>& src, int num_values) override;

  void PutSpaced(const bool* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    if (valid_bits != NULLPTR) {
      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
                                                                   this->memory_pool()));
      T* data = reinterpret_cast<T*>(buffer->mutable_data());
      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
          src, num_values, valid_bits, valid_bits_offset, data);
      Put(data, num_valid_values);
    } else {
      Put(src, num_values);
    }
  }

  void Put(const ::arrow::Array& values) override {
    if (values.type_id() != ::arrow::Type::BOOL) {
      throw ParquetException("direct put to boolean from " + values.type()->ToString() +
                             " not supported");
    }

    const auto& data = checked_cast<const ::arrow::BooleanArray&>(values);
    if (data.null_count() == 0) {
      PARQUET_THROW_NOT_OK(sink_.Reserve(bit_util::BytesForBits(data.length())));
      // no nulls, just dump the data
      ::arrow::internal::CopyBitmap(data.data()->GetValues<uint8_t>(1), data.offset(),
                                    data.length(), sink_.mutable_data(), sink_.length());
    } else {
      auto n_valid = bit_util::BytesForBits(data.length() - data.null_count());
      PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid));
      ::arrow::internal::FirstTimeBitmapWriter writer(sink_.mutable_data(),
                                                      sink_.length(), n_valid);

      for (int64_t i = 0; i < data.length(); i++) {
        if (data.IsValid(i)) {
          if (data.Value(i)) {
            writer.Set();
          } else {
            writer.Clear();
          }
          writer.Next();
        }
      }
      writer.Finish();
    }
    sink_.UnsafeAdvance(data.length());
  }

 private:
  int bits_available_;
  std::shared_ptr<ResizableBuffer> bits_buffer_;
  ::arrow::BufferBuilder sink_;
  ::arrow::bit_util::BitWriter bit_writer_;

  template <typename SequenceType>
  void PutImpl(const SequenceType& src, int num_values);
};

template <typename SequenceType>
void PlainEncoder<BooleanType>::PutImpl(const SequenceType& src, int num_values) {
  int bit_offset = 0;
  if (bits_available_ > 0) {
    int bits_to_write = std::min(bits_available_, num_values);
    for (int i = 0; i < bits_to_write; i++) {
      bit_writer_.PutValue(src[i], 1);
    }
    bits_available_ -= bits_to_write;
    bit_offset = bits_to_write;

    if (bits_available_ == 0) {
      bit_writer_.Flush();
      PARQUET_THROW_NOT_OK(
          sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
      bit_writer_.Clear();
    }
  }

  int bits_remaining = num_values - bit_offset;
  while (bit_offset < num_values) {
    bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;

    int bits_to_write = std::min(bits_available_, bits_remaining);
    for (int i = bit_offset; i < bit_offset + bits_to_write; i++) {
      bit_writer_.PutValue(src[i], 1);
    }
    bit_offset += bits_to_write;
    bits_available_ -= bits_to_write;
    bits_remaining -= bits_to_write;

    if (bits_available_ == 0) {
      bit_writer_.Flush();
      PARQUET_THROW_NOT_OK(
          sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
      bit_writer_.Clear();
    }
  }
}

int64_t PlainEncoder<BooleanType>::EstimatedDataEncodedSize() {
  int64_t position = sink_.length();
  return position + bit_writer_.bytes_written();
}

std::shared_ptr<Buffer> PlainEncoder<BooleanType>::FlushValues() {
  if (bits_available_ > 0) {
    bit_writer_.Flush();
    PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
    bit_writer_.Clear();
    bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
  }

  std::shared_ptr<Buffer> buffer;
  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
  return buffer;
}

void PlainEncoder<BooleanType>::Put(const bool* src, int num_values) {
  PutImpl(src, num_values);
}

void PlainEncoder<BooleanType>::Put(const std::vector<bool>& src, int num_values) {
  PutImpl(src, num_values);
}

// ----------------------------------------------------------------------
// DictEncoder<T> implementations

template <typename DType>
struct DictEncoderTraits {
  using c_type = typename DType::c_type;
  using MemoTableType = ::arrow::internal::ScalarMemoTable<c_type>;
};

template <>
struct DictEncoderTraits<ByteArrayType> {
  using MemoTableType = ::arrow::internal::BinaryMemoTable<::arrow::BinaryBuilder>;
};

template <>
struct DictEncoderTraits<FLBAType> {
  using MemoTableType = ::arrow::internal::BinaryMemoTable<::arrow::BinaryBuilder>;
};

// Initially 1024 elements
static constexpr int32_t kInitialHashTableSize = 1 << 10;

/// See the dictionary encoding section of
/// https://github.com/Parquet/parquet-format.  The encoding supports
/// streaming encoding. Values are encoded as they are added while the
/// dictionary is being constructed. At any time, the buffered values
/// can be written out with the current dictionary size. More values
/// can then be added to the encoder, including new dictionary
/// entries.
template <typename DType>
class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
  using MemoTableType = typename DictEncoderTraits<DType>::MemoTableType;

 public:
  typedef typename DType::c_type T;

  explicit DictEncoderImpl(const ColumnDescriptor* desc, MemoryPool* pool)
      : EncoderImpl(desc, Encoding::PLAIN_DICTIONARY, pool),
        buffered_indices_(::arrow::stl::allocator<int32_t>(pool)),
        dict_encoded_size_(0),
        memo_table_(pool, kInitialHashTableSize) {}

  ~DictEncoderImpl() override { DCHECK(buffered_indices_.empty()); }

  int dict_encoded_size() override { return dict_encoded_size_; }

  int WriteIndices(uint8_t* buffer, int buffer_len) override {
    // Write bit width in first byte
    *buffer = static_cast<uint8_t>(bit_width());
    ++buffer;
    --buffer_len;

    ::arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());

    for (int32_t index : buffered_indices_) {
      if (!encoder.Put(index)) return -1;
    }
    encoder.Flush();

    ClearIndices();
    return 1 + encoder.len();
  }

  void set_type_length(int type_length) { this->type_length_ = type_length; }

  /// Returns a conservative estimate of the number of bytes needed to encode the buffered
  /// indices. Used to size the buffer passed to WriteIndices().
  int64_t EstimatedDataEncodedSize() override {
    // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
    // reserve
    // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
    // but not reserving them would cause the encoder to fail.
    return 1 +
           ::arrow::util::RleEncoder::MaxBufferSize(
               bit_width(), static_cast<int>(buffered_indices_.size())) +
           ::arrow::util::RleEncoder::MinBufferSize(bit_width());
  }

  /// The minimum bit width required to encode the currently buffered indices.
  int bit_width() const override {
    if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0;
    if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1;
    return bit_util::Log2(num_entries());
  }

  /// Encode value. Note that this does not actually write any data, just
  /// buffers the value's index to be written later.
  inline void Put(const T& value);

  // Not implemented for other data types
  inline void PutByteArray(const void* ptr, int32_t length);

  void Put(const T* src, int num_values) override {
    for (int32_t i = 0; i < num_values; i++) {
      Put(src[i]);
    }
  }

  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    ::arrow::internal::VisitSetBitRunsVoid(valid_bits, valid_bits_offset, num_values,
                                           [&](int64_t position, int64_t length) {
                                             for (int64_t i = 0; i < length; i++) {
                                               Put(src[i + position]);
                                             }
                                           });
  }

  using TypedEncoder<DType>::Put;

  void Put(const ::arrow::Array& values) override;
  void PutDictionary(const ::arrow::Array& values) override;

  template <typename ArrowType, typename T = typename ArrowType::c_type>
  void PutIndicesTyped(const ::arrow::Array& data) {
    auto values = data.data()->GetValues<T>(1);
    size_t buffer_position = buffered_indices_.size();
    buffered_indices_.resize(buffer_position +
                             static_cast<size_t>(data.length() - data.null_count()));
    ::arrow::internal::VisitSetBitRunsVoid(
        data.null_bitmap_data(), data.offset(), data.length(),
        [&](int64_t position, int64_t length) {
          for (int64_t i = 0; i < length; ++i) {
            buffered_indices_[buffer_position++] =
                static_cast<int32_t>(values[i + position]);
          }
        });
  }

  void PutIndices(const ::arrow::Array& data) override {
    switch (data.type()->id()) {
      case ::arrow::Type::UINT8:
      case ::arrow::Type::INT8:
        return PutIndicesTyped<::arrow::UInt8Type>(data);
      case ::arrow::Type::UINT16:
      case ::arrow::Type::INT16:
        return PutIndicesTyped<::arrow::UInt16Type>(data);
      case ::arrow::Type::UINT32:
      case ::arrow::Type::INT32:
        return PutIndicesTyped<::arrow::UInt32Type>(data);
      case ::arrow::Type::UINT64:
      case ::arrow::Type::INT64:
        return PutIndicesTyped<::arrow::UInt64Type>(data);
      default:
        throw ParquetException("Passed non-integer array to PutIndices");
    }
  }

  std::shared_ptr<Buffer> FlushValues() override {
    std::shared_ptr<ResizableBuffer> buffer =
        AllocateBuffer(this->pool_, EstimatedDataEncodedSize());
    int result_size = WriteIndices(buffer->mutable_data(),
                                   static_cast<int>(EstimatedDataEncodedSize()));
    PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false));
    return std::move(buffer);
  }

  /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
  /// dict_encoded_size() bytes.
  void WriteDict(uint8_t* buffer) override;

  /// The number of entries in the dictionary.
  int num_entries() const override { return memo_table_.size(); }

 private:
  /// Clears all the indices (but leaves the dictionary).
  void ClearIndices() { buffered_indices_.clear(); }

  /// Indices that have not yet be written out by WriteIndices().
  ArrowPoolVector<int32_t> buffered_indices_;

  template <typename ArrayType>
  void PutBinaryArray(const ArrayType& array) {
    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
        *array.data(),
        [&](::std::string_view view) {
          if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
          }
          PutByteArray(view.data(), static_cast<uint32_t>(view.size()));
          return Status::OK();
        },
        []() { return Status::OK(); }));
  }

  template <typename ArrayType>
  void PutBinaryDictionaryArray(const ArrayType& array) {
    DCHECK_EQ(array.null_count(), 0);
    for (int64_t i = 0; i < array.length(); i++) {
      auto v = array.GetView(i);
      if (ARROW_PREDICT_FALSE(v.size() > kMaxByteArraySize)) {
        throw ParquetException("Parquet cannot store strings with size 2GB or more");
      }
      dict_encoded_size_ += static_cast<int>(v.size() + sizeof(uint32_t));
      int32_t unused_memo_index;
      PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(
          v.data(), static_cast<int32_t>(v.size()), &unused_memo_index));
    }
  }

  /// The number of bytes needed to encode the dictionary.
  int dict_encoded_size_;

  MemoTableType memo_table_;
};

template <typename DType>
void DictEncoderImpl<DType>::WriteDict(uint8_t* buffer) {
  // For primitive types, only a memcpy
  DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size());
  memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer));
}

// ByteArray and FLBA already have the dictionary encoded in their data heaps
template <>
void DictEncoderImpl<ByteArrayType>::WriteDict(uint8_t* buffer) {
  memo_table_.VisitValues(0, [&buffer](const ::std::string_view& v) {
    uint32_t len = static_cast<uint32_t>(v.length());
    memcpy(buffer, &len, sizeof(len));
    buffer += sizeof(len);
    memcpy(buffer, v.data(), len);
    buffer += len;
  });
}

template <>
void DictEncoderImpl<FLBAType>::WriteDict(uint8_t* buffer) {
  memo_table_.VisitValues(0, [&](const ::std::string_view& v) {
    DCHECK_EQ(v.length(), static_cast<size_t>(type_length_));
    memcpy(buffer, v.data(), type_length_);
    buffer += type_length_;
  });
}

template <typename DType>
inline void DictEncoderImpl<DType>::Put(const T& v) {
  // Put() implementation for primitive types
  auto on_found = [](int32_t memo_index) {};
  auto on_not_found = [this](int32_t memo_index) {
    dict_encoded_size_ += static_cast<int>(sizeof(T));
  };

  int32_t memo_index;
  PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(v, on_found, on_not_found, &memo_index));
  buffered_indices_.push_back(memo_index);
}

template <typename DType>
inline void DictEncoderImpl<DType>::PutByteArray(const void* ptr, int32_t length) {
  DCHECK(false);
}

template <>
inline void DictEncoderImpl<ByteArrayType>::PutByteArray(const void* ptr,
                                                         int32_t length) {
  static const uint8_t empty[] = {0};

  auto on_found = [](int32_t memo_index) {};
  auto on_not_found = [&](int32_t memo_index) {
    dict_encoded_size_ += static_cast<int>(length + sizeof(uint32_t));
  };

  DCHECK(ptr != nullptr || length == 0);
  ptr = (ptr != nullptr) ? ptr : empty;
  int32_t memo_index;
  PARQUET_THROW_NOT_OK(
      memo_table_.GetOrInsert(ptr, length, on_found, on_not_found, &memo_index));
  buffered_indices_.push_back(memo_index);
}

template <>
inline void DictEncoderImpl<ByteArrayType>::Put(const ByteArray& val) {
  return PutByteArray(val.ptr, static_cast<int32_t>(val.len));
}

template <>
inline void DictEncoderImpl<FLBAType>::Put(const FixedLenByteArray& v) {
  static const uint8_t empty[] = {0};

  auto on_found = [](int32_t memo_index) {};
  auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; };

  DCHECK(v.ptr != nullptr || type_length_ == 0);
  const void* ptr = (v.ptr != nullptr) ? v.ptr : empty;
  int32_t memo_index;
  PARQUET_THROW_NOT_OK(
      memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found, &memo_index));
  buffered_indices_.push_back(memo_index);
}

template <>
void DictEncoderImpl<Int96Type>::Put(const ::arrow::Array& values) {
  ParquetException::NYI("Direct put to Int96");
}

template <>
void DictEncoderImpl<Int96Type>::PutDictionary(const ::arrow::Array& values) {
  ParquetException::NYI("Direct put to Int96");
}

template <typename DType>
void DictEncoderImpl<DType>::Put(const ::arrow::Array& values) {
  using ArrayType = typename ::arrow::CTypeTraits<typename DType::c_type>::ArrayType;
  const auto& data = checked_cast<const ArrayType&>(values);
  if (data.null_count() == 0) {
    // no nulls, just dump the data
    for (int64_t i = 0; i < data.length(); i++) {
      Put(data.Value(i));
    }
  } else {
    for (int64_t i = 0; i < data.length(); i++) {
      if (data.IsValid(i)) {
        Put(data.Value(i));
      }
    }
  }
}

template <>
void DictEncoderImpl<FLBAType>::Put(const ::arrow::Array& values) {
  AssertFixedSizeBinary(values, type_length_);
  const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);
  if (data.null_count() == 0) {
    // no nulls, just dump the data
    for (int64_t i = 0; i < data.length(); i++) {
      Put(FixedLenByteArray(data.Value(i)));
    }
  } else {
    std::vector<uint8_t> empty(type_length_, 0);
    for (int64_t i = 0; i < data.length(); i++) {
      if (data.IsValid(i)) {
        Put(FixedLenByteArray(data.Value(i)));
      }
    }
  }
}

template <>
void DictEncoderImpl<ByteArrayType>::Put(const ::arrow::Array& values) {
  AssertBaseBinary(values);
  if (::arrow::is_binary_like(values.type_id())) {
    PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
  } else {
    DCHECK(::arrow::is_large_binary_like(values.type_id()));
    PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
  }
}

template <typename DType>
void AssertCanPutDictionary(DictEncoderImpl<DType>* encoder, const ::arrow::Array& dict) {
  if (dict.null_count() > 0) {
    throw ParquetException("Inserted dictionary cannot cannot contain nulls");
  }

  if (encoder->num_entries() > 0) {
    throw ParquetException("Can only call PutDictionary on an empty DictEncoder");
  }
}

template <typename DType>
void DictEncoderImpl<DType>::PutDictionary(const ::arrow::Array& values) {
  AssertCanPutDictionary(this, values);

  using ArrayType = typename ::arrow::CTypeTraits<typename DType::c_type>::ArrayType;
  const auto& data = checked_cast<const ArrayType&>(values);

  dict_encoded_size_ += static_cast<int>(sizeof(typename DType::c_type) * data.length());
  for (int64_t i = 0; i < data.length(); i++) {
    int32_t unused_memo_index;
    PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(data.Value(i), &unused_memo_index));
  }
}

template <>
void DictEncoderImpl<FLBAType>::PutDictionary(const ::arrow::Array& values) {
  AssertFixedSizeBinary(values, type_length_);
  AssertCanPutDictionary(this, values);

  const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);

  dict_encoded_size_ += static_cast<int>(type_length_ * data.length());
  for (int64_t i = 0; i < data.length(); i++) {
    int32_t unused_memo_index;
    PARQUET_THROW_NOT_OK(
        memo_table_.GetOrInsert(data.Value(i), type_length_, &unused_memo_index));
  }
}

template <>
void DictEncoderImpl<ByteArrayType>::PutDictionary(const ::arrow::Array& values) {
  AssertBaseBinary(values);
  AssertCanPutDictionary(this, values);

  if (::arrow::is_binary_like(values.type_id())) {
    PutBinaryDictionaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
  } else {
    DCHECK(::arrow::is_large_binary_like(values.type_id()));
    PutBinaryDictionaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
  }
}

// ----------------------------------------------------------------------
// ByteStreamSplitEncoder<T> implementations

template <typename DType>
class ByteStreamSplitEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
 public:
  using T = typename DType::c_type;
  using TypedEncoder<DType>::Put;

  explicit ByteStreamSplitEncoder(
      const ColumnDescriptor* descr,
      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

  int64_t EstimatedDataEncodedSize() override;
  std::shared_ptr<Buffer> FlushValues() override;

  void Put(const T* buffer, int num_values) override;
  void Put(const ::arrow::Array& values) override;
  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override;

 protected:
  template <typename ArrowType>
  void PutImpl(const ::arrow::Array& values) {
    if (values.type_id() != ArrowType::type_id) {
      throw ParquetException(std::string() + "direct put to " + ArrowType::type_name() +
                             " from " + values.type()->ToString() + " not supported");
    }
    const auto& data = *values.data();
    PutSpaced(data.GetValues<typename ArrowType::c_type>(1),
              static_cast<int>(data.length), data.GetValues<uint8_t>(0, 0), data.offset);
  }

  ::arrow::BufferBuilder sink_;
  int64_t num_values_in_buffer_;
};

template <typename DType>
ByteStreamSplitEncoder<DType>::ByteStreamSplitEncoder(const ColumnDescriptor* descr,
                                                      ::arrow::MemoryPool* pool)
    : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool),
      sink_{pool},
      num_values_in_buffer_{0} {}

template <typename DType>
int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {
  return sink_.length();
}

template <typename DType>
std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
  std::shared_ptr<ResizableBuffer> output_buffer =
      AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
  uint8_t* output_buffer_raw = output_buffer->mutable_data();
  const uint8_t* raw_values = sink_.data();
  ::arrow::util::internal::ByteStreamSplitEncode<T>(raw_values, num_values_in_buffer_,
                                                    output_buffer_raw);
  sink_.Reset();
  num_values_in_buffer_ = 0;
  return std::move(output_buffer);
}

template <typename DType>
void ByteStreamSplitEncoder<DType>::Put(const T* buffer, int num_values) {
  if (num_values > 0) {
    PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
    num_values_in_buffer_ += num_values;
  }
}

template <>
void ByteStreamSplitEncoder<FloatType>::Put(const ::arrow::Array& values) {
  PutImpl<::arrow::FloatType>(values);
}

template <>
void ByteStreamSplitEncoder<DoubleType>::Put(const ::arrow::Array& values) {
  PutImpl<::arrow::DoubleType>(values);
}

template <typename DType>
void ByteStreamSplitEncoder<DType>::PutSpaced(const T* src, int num_values,
                                              const uint8_t* valid_bits,
                                              int64_t valid_bits_offset) {
  if (valid_bits != NULLPTR) {
    PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
                                                                 this->memory_pool()));
    T* data = reinterpret_cast<T*>(buffer->mutable_data());
    int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
        src, num_values, valid_bits, valid_bits_offset, data);
    Put(data, num_valid_values);
  } else {
    Put(src, num_values);
  }
}

class DecoderImpl : virtual public Decoder {
 public:
  void SetData(int num_values, const uint8_t* data, int len) override {
    num_values_ = num_values;
    data_ = data;
    len_ = len;
  }

  int values_left() const override { return num_values_; }
  Encoding::type encoding() const override { return encoding_; }

 protected:
  explicit DecoderImpl(const ColumnDescriptor* descr, Encoding::type encoding)
      : descr_(descr), encoding_(encoding), num_values_(0), data_(NULLPTR), len_(0) {}

  // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
  const ColumnDescriptor* descr_;

  const Encoding::type encoding_;
  int num_values_;
  const uint8_t* data_;
  int len_;
  int type_length_;
};

template <typename DType>
class PlainDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
 public:
  using T = typename DType::c_type;
  explicit PlainDecoder(const ColumnDescriptor* descr);

  int Decode(T* buffer, int max_values) override;

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<DType>::Accumulator* builder) override;

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<DType>::DictAccumulator* builder) override;
};

template <>
inline int PlainDecoder<Int96Type>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<Int96Type>::Accumulator* builder) {
  ParquetException::NYI("DecodeArrow not supported for Int96");
}

template <>
inline int PlainDecoder<Int96Type>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<Int96Type>::DictAccumulator* builder) {
  ParquetException::NYI("DecodeArrow not supported for Int96");
}

template <>
inline int PlainDecoder<BooleanType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<BooleanType>::DictAccumulator* builder) {
  ParquetException::NYI("dictionaries of BooleanType");
}

template <typename DType>
int PlainDecoder<DType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<DType>::Accumulator* builder) {
  using value_type = typename DType::c_type;

  constexpr int value_size = static_cast<int>(sizeof(value_type));
  int values_decoded = num_values - null_count;
  if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
    ParquetException::EofException();
  }

  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        builder->UnsafeAppend(::arrow::util::SafeLoadAs<value_type>(data_));
        data_ += sizeof(value_type);
      },
      [&]() { builder->UnsafeAppendNull(); });

  num_values_ -= values_decoded;
  len_ -= sizeof(value_type) * values_decoded;
  return values_decoded;
}

template <typename DType>
int PlainDecoder<DType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<DType>::DictAccumulator* builder) {
  using value_type = typename DType::c_type;

  constexpr int value_size = static_cast<int>(sizeof(value_type));
  int values_decoded = num_values - null_count;
  if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
    ParquetException::EofException();
  }

  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        PARQUET_THROW_NOT_OK(
            builder->Append(::arrow::util::SafeLoadAs<value_type>(data_)));
        data_ += sizeof(value_type);
      },
      [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });

  num_values_ -= values_decoded;
  len_ -= sizeof(value_type) * values_decoded;
  return values_decoded;
}

// Decode routine templated on C++ type rather than type enum
template <typename T>
inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
                       int type_length, T* out) {
  int64_t bytes_to_decode = num_values * static_cast<int64_t>(sizeof(T));
  if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) {
    ParquetException::EofException();
  }
  // If bytes_to_decode == 0, data could be null
  if (bytes_to_decode > 0) {
    memcpy(out, data, bytes_to_decode);
  }
  return static_cast<int>(bytes_to_decode);
}

template <typename DType>
PlainDecoder<DType>::PlainDecoder(const ColumnDescriptor* descr)
    : DecoderImpl(descr, Encoding::PLAIN) {
  if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
    type_length_ = descr_->type_length();
  } else {
    type_length_ = -1;
  }
}

// Template specialization for BYTE_ARRAY. The written values do not own their
// own data.

static inline int64_t ReadByteArray(const uint8_t* data, int64_t data_size,
                                    ByteArray* out) {
  if (ARROW_PREDICT_FALSE(data_size < 4)) {
    ParquetException::EofException();
  }
  const int32_t len = ::arrow::util::SafeLoadAs<int32_t>(data);
  if (len < 0) {
    throw ParquetException("Invalid BYTE_ARRAY value");
  }
  const int64_t consumed_length = static_cast<int64_t>(len) + 4;
  if (ARROW_PREDICT_FALSE(data_size < consumed_length)) {
    ParquetException::EofException();
  }
  *out = ByteArray{static_cast<uint32_t>(len), data + 4};
  return consumed_length;
}

template <>
inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values,
                                  int type_length, ByteArray* out) {
  int bytes_decoded = 0;
  for (int i = 0; i < num_values; ++i) {
    const auto increment = ReadByteArray(data, data_size, out + i);
    if (ARROW_PREDICT_FALSE(increment > INT_MAX - bytes_decoded)) {
      throw ParquetException("BYTE_ARRAY chunk too large");
    }
    data += increment;
    data_size -= increment;
    bytes_decoded += static_cast<int>(increment);
  }
  return bytes_decoded;
}

// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not
// own their own data.
template <>
inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size,
                                          int num_values, int type_length,
                                          FixedLenByteArray* out) {
  int64_t bytes_to_decode = static_cast<int64_t>(type_length) * num_values;
  if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) {
    ParquetException::EofException();
  }
  for (int i = 0; i < num_values; ++i) {
    out[i].ptr = data;
    data += type_length;
    data_size -= type_length;
  }
  return static_cast<int>(bytes_to_decode);
}

template <typename DType>
int PlainDecoder<DType>::Decode(T* buffer, int max_values) {
  max_values = std::min(max_values, num_values_);
  int bytes_consumed = DecodePlain<T>(data_, len_, max_values, type_length_, buffer);
  data_ += bytes_consumed;
  len_ -= bytes_consumed;
  num_values_ -= max_values;
  return max_values;
}

class PlainBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
 public:
  explicit PlainBooleanDecoder(const ColumnDescriptor* descr);
  void SetData(int num_values, const uint8_t* data, int len) override;

  // Two flavors of bool decoding
  int Decode(uint8_t* buffer, int max_values) override;
  int Decode(bool* buffer, int max_values) override;
  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<BooleanType>::Accumulator* out) override;

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<BooleanType>::DictAccumulator* out) override;

 private:
  std::unique_ptr<::arrow::bit_util::BitReader> bit_reader_;
};

PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr)
    : DecoderImpl(descr, Encoding::PLAIN) {}

void PlainBooleanDecoder::SetData(int num_values, const uint8_t* data, int len) {
  num_values_ = num_values;
  bit_reader_ = std::make_unique<bit_util::BitReader>(data, len);
}

int PlainBooleanDecoder::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<BooleanType>::Accumulator* builder) {
  int values_decoded = num_values - null_count;
  if (ARROW_PREDICT_FALSE(num_values_ < values_decoded)) {
    ParquetException::EofException();
  }

  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        bool value;
        ARROW_IGNORE_EXPR(bit_reader_->GetValue(1, &value));
        builder->UnsafeAppend(value);
      },
      [&]() { builder->UnsafeAppendNull(); });

  num_values_ -= values_decoded;
  return values_decoded;
}

inline int PlainBooleanDecoder::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<BooleanType>::DictAccumulator* builder) {
  ParquetException::NYI("dictionaries of BooleanType");
}

int PlainBooleanDecoder::Decode(uint8_t* buffer, int max_values) {
  max_values = std::min(max_values, num_values_);
  bool val;
  ::arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values);
  for (int i = 0; i < max_values; ++i) {
    if (!bit_reader_->GetValue(1, &val)) {
      ParquetException::EofException();
    }
    if (val) {
      bit_writer.Set();
    }
    bit_writer.Next();
  }
  bit_writer.Finish();
  num_values_ -= max_values;
  return max_values;
}

int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
  max_values = std::min(max_values, num_values_);
  if (bit_reader_->GetBatch(1, buffer, max_values) != max_values) {
    ParquetException::EofException();
  }
  num_values_ -= max_values;
  return max_values;
}

struct ArrowBinaryHelper {
  explicit ArrowBinaryHelper(typename EncodingTraits<ByteArrayType>::Accumulator* out) {
    this->out = out;
    this->builder = out->builder.get();
    this->chunk_space_remaining =
        ::arrow::kBinaryMemoryLimit - this->builder->value_data_length();
  }

  Status PushChunk() {
    std::shared_ptr<::arrow::Array> result;
    RETURN_NOT_OK(builder->Finish(&result));
    out->chunks.push_back(result);
    chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
    return Status::OK();
  }

  bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }

  void UnsafeAppend(const uint8_t* data, int32_t length) {
    chunk_space_remaining -= length;
    builder->UnsafeAppend(data, length);
  }

  void UnsafeAppendNull() { builder->UnsafeAppendNull(); }

  Status Append(const uint8_t* data, int32_t length) {
    chunk_space_remaining -= length;
    return builder->Append(data, length);
  }

  Status AppendNull() { return builder->AppendNull(); }

  typename EncodingTraits<ByteArrayType>::Accumulator* out;
  ::arrow::BinaryBuilder* builder;
  int64_t chunk_space_remaining;
};

template <>
inline int PlainDecoder<ByteArrayType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<ByteArrayType>::Accumulator* builder) {
  ParquetException::NYI();
}

template <>
inline int PlainDecoder<ByteArrayType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) {
  ParquetException::NYI();
}

template <>
inline int PlainDecoder<FLBAType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<FLBAType>::Accumulator* builder) {
  int values_decoded = num_values - null_count;
  if (ARROW_PREDICT_FALSE(len_ < descr_->type_length() * values_decoded)) {
    ParquetException::EofException();
  }

  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        builder->UnsafeAppend(data_);
        data_ += descr_->type_length();
      },
      [&]() { builder->UnsafeAppendNull(); });

  num_values_ -= values_decoded;
  len_ -= descr_->type_length() * values_decoded;
  return values_decoded;
}

template <>
inline int PlainDecoder<FLBAType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<FLBAType>::DictAccumulator* builder) {
  int values_decoded = num_values - null_count;
  if (ARROW_PREDICT_FALSE(len_ < descr_->type_length() * values_decoded)) {
    ParquetException::EofException();
  }

  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        PARQUET_THROW_NOT_OK(builder->Append(data_));
        data_ += descr_->type_length();
      },
      [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });

  num_values_ -= values_decoded;
  len_ -= descr_->type_length() * values_decoded;
  return values_decoded;
}

class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
                              virtual public ByteArrayDecoder {
 public:
  using Base = PlainDecoder<ByteArrayType>;
  using Base::DecodeSpaced;
  using Base::PlainDecoder;

  // ----------------------------------------------------------------------
  // Dictionary read paths

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  ::arrow::BinaryDictionary32Builder* builder) override {
    int result = 0;
    PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
                                     valid_bits_offset, builder, &result));
    return result;
  }

  // ----------------------------------------------------------------------
  // Optimized dense binary read paths

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
    int result = 0;
    PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
                                          valid_bits_offset, out, &result));
    return result;
  }

  int DecodeCH(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
               PaddedPODArray<UInt8>* column_chars_t_p,
               PaddedPODArray<UInt64>* column_offsets_p) {
      int result = 0;
      PARQUET_THROW_NOT_OK(DecodeCHDense(num_values, null_count, valid_bits,
                                            valid_bits_offset, column_chars_t_p,
                                            column_offsets_p, &result));
      return result;
  }


 private:

     Status DecodeCHDense(int num_values, int null_count, const uint8_t* valid_bits,
                             int64_t valid_bits_offset,
                          PaddedPODArray<UInt8>* column_chars_t_p,
                          PaddedPODArray<UInt64>* column_offsets_p,
                             int* out_values_decoded) {
         //ArrowBinaryHelper helper(out);
         int values_decoded = 0;

//         RETURN_NOT_OK(helper.builder->Reserve(num_values));
//         RETURN_NOT_OK(helper.builder->ReserveData(
//             std::min<int64_t>(len_, helper.chunk_space_remaining)));
         column_offsets_p->reserve(num_values);
         column_chars_t_p->reserve(num_values + len_);

         if (null_count == 0) {
             for (int i = 0 ; i < num_values; i++) {
                 if (ARROW_PREDICT_FALSE(len_ < 4))
                 {
                     ParquetException::EofException();
                 }
                 auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_);
                 if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4))
                 {
                     return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
                 }
                 auto increment = value_len + 4;
                 if (ARROW_PREDICT_FALSE(len_ < increment))
                 {
                     ParquetException::EofException();
                 }

                 column_chars_t_p->insert_assume_reserved(data_ + 4, data_ + 4 + value_len);
                 column_chars_t_p->emplace_back('\0');
                 column_offsets_p->emplace_back(column_chars_t_p->size());

                 data_ += increment;
                 len_ -= increment;
                 ++values_decoded;
             }
         } else {
             RETURN_NOT_OK(VisitNullBitmapInline(
                 valid_bits,
                 valid_bits_offset,
                 num_values,
                 null_count,
                 [&]()
                 {
                     if (ARROW_PREDICT_FALSE(len_ < 4))
                     {
                         ParquetException::EofException();
                     }
                     auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_);
                     if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4))
                     {
                         return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
                     }
                     auto increment = value_len + 4;
                     if (ARROW_PREDICT_FALSE(len_ < increment))
                     {
                         ParquetException::EofException();
                     }

                     column_chars_t_p->insert_assume_reserved(data_ + 4, data_ + 4 + value_len);
                     column_chars_t_p->emplace_back('\0');
                     column_offsets_p->emplace_back(column_chars_t_p->size());

                     data_ += increment;
                     len_ -= increment;
                     ++values_decoded;
                     return Status::OK();
                 },
                 [&]()
                 {
                     //helper.UnsafeAppendNull();
                     column_chars_t_p->emplace_back('\0');
                     column_offsets_p->emplace_back(column_chars_t_p->size());
                     return Status::OK();
                 }));
         }

         num_values_ -= values_decoded;
         *out_values_decoded = values_decoded;
         return Status::OK();
     }

  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                          int64_t valid_bits_offset,
                          typename EncodingTraits<ByteArrayType>::Accumulator* out,
                          int* out_values_decoded) {
    ArrowBinaryHelper helper(out);
    int values_decoded = 0;

    RETURN_NOT_OK(helper.builder->Reserve(num_values));
    RETURN_NOT_OK(helper.builder->ReserveData(
        std::min<int64_t>(len_, helper.chunk_space_remaining)));

    int i = 0;
    RETURN_NOT_OK(VisitNullBitmapInline(
        valid_bits, valid_bits_offset, num_values, null_count,
        [&]() {
          if (ARROW_PREDICT_FALSE(len_ < 4)) {
            ParquetException::EofException();
          }
          auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_);
          if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) {
            return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
          }
          auto increment = value_len + 4;
          if (ARROW_PREDICT_FALSE(len_ < increment)) {
            ParquetException::EofException();
          }
          if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
            // This element would exceed the capacity of a chunk
            RETURN_NOT_OK(helper.PushChunk());
            RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
            RETURN_NOT_OK(helper.builder->ReserveData(
                std::min<int64_t>(len_, helper.chunk_space_remaining)));
          }
          helper.UnsafeAppend(data_ + 4, value_len);
          data_ += increment;
          len_ -= increment;
          ++values_decoded;
          ++i;
          return Status::OK();
        },
        [&]() {
          helper.UnsafeAppendNull();
          ++i;
          return Status::OK();
        }));

    num_values_ -= values_decoded;
    *out_values_decoded = values_decoded;
    return Status::OK();
  }

  template <typename BuilderType>
  Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                     int64_t valid_bits_offset, BuilderType* builder,
                     int* out_values_decoded) {
    RETURN_NOT_OK(builder->Reserve(num_values));
    int values_decoded = 0;

    RETURN_NOT_OK(VisitNullBitmapInline(
        valid_bits, valid_bits_offset, num_values, null_count,
        [&]() {
          if (ARROW_PREDICT_FALSE(len_ < 4)) {
            ParquetException::EofException();
          }
          auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_);
          if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) {
            return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
          }
          auto increment = value_len + 4;
          if (ARROW_PREDICT_FALSE(len_ < increment)) {
            ParquetException::EofException();
          }
          RETURN_NOT_OK(builder->Append(data_ + 4, value_len));
          data_ += increment;
          len_ -= increment;
          ++values_decoded;
          return Status::OK();
        },
        [&]() { return builder->AppendNull(); }));

    num_values_ -= values_decoded;
    *out_values_decoded = values_decoded;
    return Status::OK();
  }
};

class PlainFLBADecoder : public PlainDecoder<FLBAType>, virtual public FLBADecoder {
 public:
  using Base = PlainDecoder<FLBAType>;
  using Base::PlainDecoder;
};

// ----------------------------------------------------------------------
// Dictionary encoding and decoding

template <typename Type>
class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
 public:
  typedef typename Type::c_type T;

  // Initializes the dictionary with values from 'dictionary'. The data in
  // dictionary is not guaranteed to persist in memory after this call so the
  // dictionary decoder needs to copy the data out if necessary.
  explicit DictDecoderImpl(const ColumnDescriptor* descr,
                           MemoryPool* pool = ::arrow::default_memory_pool())
      : DecoderImpl(descr, Encoding::RLE_DICTIONARY),
        dictionary_(AllocateBuffer(pool, 0)),
        dictionary_length_(0),
        byte_array_data_(AllocateBuffer(pool, 0)),
        byte_array_offsets_(AllocateBuffer(pool, 0)),
        indices_scratch_space_(AllocateBuffer(pool, 0)) {}

  // Perform type-specific initiatialization
  void SetDict(TypedDecoder<Type>* dictionary) override;

  void SetData(int num_values, const uint8_t* data, int len) override {
    num_values_ = num_values;
    if (len == 0) {
      // Initialize dummy decoder to avoid crashes later on
      idx_decoder_ = ::arrow::util::RleDecoder(data, len, /*bit_width=*/1);
      return;
    }
    uint8_t bit_width = *data;
    if (ARROW_PREDICT_FALSE(bit_width > 32)) {
      throw ParquetException("Invalid or corrupted bit_width " +
                             std::to_string(bit_width) + ". Maximum allowed is 32.");
    }
    idx_decoder_ = ::arrow::util::RleDecoder(++data, --len, bit_width);
  }

  int Decode(T* buffer, int num_values) override {
    num_values = std::min(num_values, num_values_);
    int decoded_values =
        idx_decoder_.GetBatchWithDict(reinterpret_cast<const T*>(dictionary_->data()),
                                      dictionary_length_, buffer, num_values);
    if (decoded_values != num_values) {
      ParquetException::EofException();
    }
    num_values_ -= num_values;
    return num_values;
  }

  int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset) override {
    num_values = std::min(num_values, num_values_);
    if (num_values != idx_decoder_.GetBatchWithDictSpaced(
                          reinterpret_cast<const T*>(dictionary_->data()),
                          dictionary_length_, buffer, num_values, null_count, valid_bits,
                          valid_bits_offset)) {
      ParquetException::EofException();
    }
    num_values_ -= num_values;
    return num_values;
  }

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<Type>::Accumulator* out) override;

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<Type>::DictAccumulator* out) override;

  void InsertDictionary(::arrow::ArrayBuilder* builder) override;

  int DecodeIndicesSpaced(int num_values, int null_count, const uint8_t* valid_bits,
                          int64_t valid_bits_offset,
                          ::arrow::ArrayBuilder* builder) override {
    if (num_values > 0) {
      // TODO(wesm): Refactor to batch reads for improved memory use. It is not
      // trivial because the null_count is relative to the entire bitmap
      PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
          num_values, /*shrink_to_fit=*/false));
    }

    auto indices_buffer =
        reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());

    if (num_values != idx_decoder_.GetBatchSpaced(num_values, null_count, valid_bits,
                                                  valid_bits_offset, indices_buffer)) {
      ParquetException::EofException();
    }

    // XXX(wesm): Cannot append "valid bits" directly to the builder
    std::vector<uint8_t> valid_bytes(num_values, 0);
    int64_t i = 0;
    VisitNullBitmapInline(
        valid_bits, valid_bits_offset, num_values, null_count,
        [&]() { valid_bytes[i++] = 1; }, [&]() { ++i; });

    auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder);
    PARQUET_THROW_NOT_OK(
        binary_builder->AppendIndices(indices_buffer, num_values, valid_bytes.data()));
    num_values_ -= num_values - null_count;
    return num_values - null_count;
  }

  int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) override {
    num_values = std::min(num_values, num_values_);
    if (num_values > 0) {
      // TODO(wesm): Refactor to batch reads for improved memory use. This is
      // relatively simple here because we don't have to do any bookkeeping of
      // nulls
      PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
          num_values, /*shrink_to_fit=*/false));
    }
    auto indices_buffer =
        reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());
    if (num_values != idx_decoder_.GetBatch(indices_buffer, num_values)) {
      ParquetException::EofException();
    }
    auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder);
    PARQUET_THROW_NOT_OK(binary_builder->AppendIndices(indices_buffer, num_values));
    num_values_ -= num_values;
    return num_values;
  }

  int DecodeIndices(int num_values, int32_t* indices) override {
    if (num_values != idx_decoder_.GetBatch(indices, num_values)) {
      ParquetException::EofException();
    }
    num_values_ -= num_values;
    return num_values;
  }

  void GetDictionary(const T** dictionary, int32_t* dictionary_length) override {
    *dictionary_length = dictionary_length_;
    *dictionary = reinterpret_cast<T*>(dictionary_->mutable_data());
  }

 protected:
  Status IndexInBounds(int32_t index) {
    if (ARROW_PREDICT_TRUE(0 <= index && index < dictionary_length_)) {
      return Status::OK();
    }
    return Status::Invalid("Index not in dictionary bounds");
  }

  inline void DecodeDict(TypedDecoder<Type>* dictionary) {
    dictionary_length_ = static_cast<int32_t>(dictionary->values_left());
    PARQUET_THROW_NOT_OK(dictionary_->Resize(dictionary_length_ * sizeof(T),
                                             /*shrink_to_fit=*/false));
    dictionary->Decode(reinterpret_cast<T*>(dictionary_->mutable_data()),
                       dictionary_length_);
  }

  // Only one is set.
  std::shared_ptr<ResizableBuffer> dictionary_;

  int32_t dictionary_length_;

  // Data that contains the byte array data (byte_array_dictionary_ just has the
  // pointers).
  std::shared_ptr<ResizableBuffer> byte_array_data_;

  // Arrow-style byte offsets for each dictionary value. We maintain two
  // representations of the dictionary, one as ByteArray* for non-Arrow
  // consumers and this one for Arrow consumers. Since dictionaries are
  // generally pretty small to begin with this doesn't mean too much extra
  // memory use in most cases
  std::shared_ptr<ResizableBuffer> byte_array_offsets_;

  // Reusable buffer for decoding dictionary indices to be appended to a
  // BinaryDictionary32Builder
  std::shared_ptr<ResizableBuffer> indices_scratch_space_;

  ::arrow::util::RleDecoder idx_decoder_;
};

template <typename Type>
void DictDecoderImpl<Type>::SetDict(TypedDecoder<Type>* dictionary) {
  DecodeDict(dictionary);
}

template <>
void DictDecoderImpl<BooleanType>::SetDict(TypedDecoder<BooleanType>* dictionary) {
  ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
}

template <>
void DictDecoderImpl<ByteArrayType>::SetDict(TypedDecoder<ByteArrayType>* dictionary) {
  DecodeDict(dictionary);

  auto dict_values = reinterpret_cast<ByteArray*>(dictionary_->mutable_data());

  int total_size = 0;
  for (int i = 0; i < dictionary_length_; ++i) {
    total_size += dict_values[i].len;
  }
  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
                                                /*shrink_to_fit=*/false));
  PARQUET_THROW_NOT_OK(
      byte_array_offsets_->Resize((dictionary_length_ + 1) * sizeof(int32_t),
                                  /*shrink_to_fit=*/false));

  int32_t offset = 0;
  uint8_t* bytes_data = byte_array_data_->mutable_data();
  int32_t* bytes_offsets =
      reinterpret_cast<int32_t*>(byte_array_offsets_->mutable_data());
  for (int i = 0; i < dictionary_length_; ++i) {
    memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len);
    bytes_offsets[i] = offset;
    dict_values[i].ptr = bytes_data + offset;
    offset += dict_values[i].len;
  }
  bytes_offsets[dictionary_length_] = offset;
}

template <>
inline void DictDecoderImpl<FLBAType>::SetDict(TypedDecoder<FLBAType>* dictionary) {
  DecodeDict(dictionary);

  auto dict_values = reinterpret_cast<FLBA*>(dictionary_->mutable_data());

  int fixed_len = descr_->type_length();
  int total_size = dictionary_length_ * fixed_len;

  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
                                                /*shrink_to_fit=*/false));
  uint8_t* bytes_data = byte_array_data_->mutable_data();
  for (int32_t i = 0, offset = 0; i < dictionary_length_; ++i, offset += fixed_len) {
    memcpy(bytes_data + offset, dict_values[i].ptr, fixed_len);
    dict_values[i].ptr = bytes_data + offset;
  }
}

template <>
inline int DictDecoderImpl<Int96Type>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<Int96Type>::Accumulator* builder) {
  ParquetException::NYI("DecodeArrow to Int96Type");
}

template <>
inline int DictDecoderImpl<Int96Type>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<Int96Type>::DictAccumulator* builder) {
  ParquetException::NYI("DecodeArrow to Int96Type");
}

template <>
inline int DictDecoderImpl<ByteArrayType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<ByteArrayType>::Accumulator* builder) {
  ParquetException::NYI("DecodeArrow implemented elsewhere");
}

template <>
inline int DictDecoderImpl<ByteArrayType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) {
  ParquetException::NYI("DecodeArrow implemented elsewhere");
}

template <typename DType>
int DictDecoderImpl<DType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<DType>::DictAccumulator* builder) {
  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

  auto dict_values = reinterpret_cast<const typename DType::c_type*>(dictionary_->data());

  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        int32_t index;
        if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
          throw ParquetException("");
        }
        PARQUET_THROW_NOT_OK(IndexInBounds(index));
        PARQUET_THROW_NOT_OK(builder->Append(dict_values[index]));
      },
      [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });

  return num_values - null_count;
}

template <>
int DictDecoderImpl<BooleanType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<BooleanType>::DictAccumulator* builder) {
  ParquetException::NYI("No dictionary encoding for BooleanType");
}

template <>
inline int DictDecoderImpl<FLBAType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<FLBAType>::Accumulator* builder) {
  if (builder->byte_width() != descr_->type_length()) {
    throw ParquetException("Byte width mismatch: builder was " +
                           std::to_string(builder->byte_width()) + " but decoder was " +
                           std::to_string(descr_->type_length()));
  }

  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

  auto dict_values = reinterpret_cast<const FLBA*>(dictionary_->data());

  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        int32_t index;
        if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
          throw ParquetException("");
        }
        PARQUET_THROW_NOT_OK(IndexInBounds(index));
        builder->UnsafeAppend(dict_values[index].ptr);
      },
      [&]() { builder->UnsafeAppendNull(); });

  return num_values - null_count;
}

template <>
int DictDecoderImpl<FLBAType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<FLBAType>::DictAccumulator* builder) {
  auto value_type =
      checked_cast<const ::arrow::DictionaryType&>(*builder->type()).value_type();
  auto byte_width =
      checked_cast<const ::arrow::FixedSizeBinaryType&>(*value_type).byte_width();
  if (byte_width != descr_->type_length()) {
    throw ParquetException("Byte width mismatch: builder was " +
                           std::to_string(byte_width) + " but decoder was " +
                           std::to_string(descr_->type_length()));
  }

  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

  auto dict_values = reinterpret_cast<const FLBA*>(dictionary_->data());

  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        int32_t index;
        if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
          throw ParquetException("");
        }
        PARQUET_THROW_NOT_OK(IndexInBounds(index));
        PARQUET_THROW_NOT_OK(builder->Append(dict_values[index].ptr));
      },
      [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });

  return num_values - null_count;
}

template <typename Type>
int DictDecoderImpl<Type>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<Type>::Accumulator* builder) {
  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

  using value_type = typename Type::c_type;
  auto dict_values = reinterpret_cast<const value_type*>(dictionary_->data());

  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        int32_t index;
        if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
          throw ParquetException("");
        }
        PARQUET_THROW_NOT_OK(IndexInBounds(index));
        builder->UnsafeAppend(dict_values[index]);
      },
      [&]() { builder->UnsafeAppendNull(); });

  return num_values - null_count;
}

template <typename Type>
void DictDecoderImpl<Type>::InsertDictionary(::arrow::ArrayBuilder* builder) {
  ParquetException::NYI("InsertDictionary only implemented for BYTE_ARRAY types");
}

template <>
void DictDecoderImpl<ByteArrayType>::InsertDictionary(::arrow::ArrayBuilder* builder) {
  auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder);

  // Make a BinaryArray referencing the internal dictionary data
  auto arr = std::make_shared<::arrow::BinaryArray>(
      dictionary_length_, byte_array_offsets_, byte_array_data_);
  PARQUET_THROW_NOT_OK(binary_builder->InsertMemoValues(*arr));
}

class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
                                 virtual public ByteArrayDecoder {
 public:
  using BASE = DictDecoderImpl<ByteArrayType>;
  using BASE::DictDecoderImpl;

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  ::arrow::BinaryDictionary32Builder* builder) override {
    int result = 0;
    if (null_count == 0) {
      PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
    } else {
      PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
                                       valid_bits_offset, builder, &result));
    }
    return result;
  }

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
    int result = 0;
    if (null_count == 0) {
      PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, out, &result));
    } else {
      PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
                                            valid_bits_offset, out, &result));
    }
    return result;
  }


  int DecodeCH(int num_values, int null_count, const uint8_t* valid_bits,
               int64_t valid_bits_offset,
               PaddedPODArray<UInt8>* column_chars_t_p,
               PaddedPODArray<UInt64>* column_offsets_p) override {
      int result = 0;
      if (null_count == 0) {
          PARQUET_THROW_NOT_OK(DecodeCHDenseNonNull(num_values, column_chars_t_p, column_offsets_p, &result));
      } else {
          PARQUET_THROW_NOT_OK(DecodeCHDense(num_values, null_count, valid_bits,
                                                valid_bits_offset, column_chars_t_p, column_offsets_p, &result));
      }
      return result;
  }

 private:
     Status DecodeCHDense(int num_values, int null_count, const uint8_t* valid_bits,
                             int64_t valid_bits_offset,
                             PaddedPODArray<UInt8>* column_chars_t_p,
                             PaddedPODArray<UInt64>* column_offsets_p,
                             int* out_num_values) {
         constexpr int32_t kBufferSize = 1024;
         int32_t indices[kBufferSize];

         column_offsets_p->reserve(num_values);
         column_chars_t_p->reserve(num_values * 20); // approx

         ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);

         auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
         int values_decoded = 0;
         int num_appended = 0;
         while (num_appended < num_values) {
             bool is_valid = bit_reader.IsSet();
             bit_reader.Next();

             if (is_valid) {
                 int32_t batch_size =
                     std::min<int32_t>(kBufferSize, num_values - num_appended - null_count);
                 int num_indices = idx_decoder_.GetBatch(indices, batch_size);

                 if (ARROW_PREDICT_FALSE(num_indices < 1)) {
                     return Status::Invalid("Invalid number of indices '", num_indices, "'");
                 }

                 int i = 0;
                 while (true) {
                     // Consume all indices
                     if (is_valid) {
                         auto idx = indices[i];
                         RETURN_NOT_OK(IndexInBounds(idx));
                         const auto& val = dict_values[idx];
                         column_chars_t_p -> insert(val.ptr, val.ptr + static_cast<int32_t>(val.len));
                         column_chars_t_p -> emplace_back('\0');
                         column_offsets_p -> emplace_back(column_chars_t_p -> size());
                         ++i;
                         ++values_decoded;
                     } else {
                         column_chars_t_p -> emplace_back('\0');
                         column_offsets_p -> emplace_back(column_chars_t_p -> size());
                         --null_count;
                     }
                     ++num_appended;
                     if (i == num_indices) {
                         // Do not advance the bit_reader if we have fulfilled the decode
                         // request
                         break;
                     }
                     is_valid = bit_reader.IsSet();
                     bit_reader.Next();
                 }
             } else {
                 column_chars_t_p -> emplace_back('\0');
                 column_offsets_p -> emplace_back(column_chars_t_p -> size());
                 --null_count;
                 ++num_appended;
             }
         }
         *out_num_values = values_decoded;
         return Status::OK();
     }

     Status DecodeCHDenseNonNull(int num_values,
                                 PaddedPODArray<UInt8>* column_chars_t_p,
                                 PaddedPODArray<UInt64>* column_offsets_p,
                                    int* out_num_values) {
         constexpr int32_t kBufferSize = 2048;
         int32_t indices[kBufferSize];
         int values_decoded = 0;

         auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

         while (values_decoded < num_values) {
             int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
             int num_indices = idx_decoder_.GetBatch(indices, batch_size);
             if (num_indices == 0) ParquetException::EofException();
             for (int i = 0; i < num_indices; ++i) {
                 auto idx = indices[i];
                 RETURN_NOT_OK(IndexInBounds(idx));
                 const auto& val = dict_values[idx];
                 column_chars_t_p -> insert(val.ptr, val.ptr + static_cast<int32_t>(val.len));
                 column_chars_t_p -> emplace_back('\0');
                 column_offsets_p -> emplace_back(column_chars_t_p -> size());
             }
             values_decoded += num_indices;
         }
         *out_num_values = values_decoded;
         return Status::OK();
     }


  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                          int64_t valid_bits_offset,
                          typename EncodingTraits<ByteArrayType>::Accumulator* out,
                          int* out_num_values) {
    constexpr int32_t kBufferSize = 1024;
    int32_t indices[kBufferSize];

    ArrowBinaryHelper helper(out);

    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
    int values_decoded = 0;
    int num_indices = 0;
    int pos_indices = 0;

    auto visit_valid = [&](int64_t position) -> Status {
      if (num_indices == pos_indices) {
        // Refill indices buffer
        const auto batch_size =
            std::min<int32_t>(kBufferSize, num_values - null_count - values_decoded);
        num_indices = idx_decoder_.GetBatch(indices, batch_size);
        if (ARROW_PREDICT_FALSE(num_indices < 1)) {
          return Status::Invalid("Invalid number of indices: ", num_indices);
        }
        pos_indices = 0;
      }
      const auto index = indices[pos_indices++];
      RETURN_NOT_OK(IndexInBounds(index));
      const auto& val = dict_values[index];
      if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
        RETURN_NOT_OK(helper.PushChunk());
      }
      RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
      ++values_decoded;
      return Status::OK();
    };

    auto visit_null = [&]() -> Status {
      RETURN_NOT_OK(helper.AppendNull());
      return Status::OK();
    };

    ::arrow::internal::BitBlockCounter bit_blocks(valid_bits, valid_bits_offset,
                                                  num_values);
    int64_t position = 0;
    while (position < num_values) {
      const auto block = bit_blocks.NextWord();
      if (block.AllSet()) {
        for (int64_t i = 0; i < block.length; ++i, ++position) {
          ARROW_RETURN_NOT_OK(visit_valid(position));
        }
      } else if (block.NoneSet()) {
        for (int64_t i = 0; i < block.length; ++i, ++position) {
          ARROW_RETURN_NOT_OK(visit_null());
        }
      } else {
        for (int64_t i = 0; i < block.length; ++i, ++position) {
          if (bit_util::GetBit(valid_bits, valid_bits_offset + position)) {
            ARROW_RETURN_NOT_OK(visit_valid(position));
          } else {
            ARROW_RETURN_NOT_OK(visit_null());
          }
        }
      }
    }

    *out_num_values = values_decoded;
    return Status::OK();
  }

  Status DecodeArrowDenseNonNull(int num_values,
                                 typename EncodingTraits<ByteArrayType>::Accumulator* out,
                                 int* out_num_values) {
    constexpr int32_t kBufferSize = 2048;
    int32_t indices[kBufferSize];
    int values_decoded = 0;

    ArrowBinaryHelper helper(out);
    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

    while (values_decoded < num_values) {
      int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
      int num_indices = idx_decoder_.GetBatch(indices, batch_size);
      if (num_indices == 0) ParquetException::EofException();
      for (int i = 0; i < num_indices; ++i) {
        auto idx = indices[i];
        RETURN_NOT_OK(IndexInBounds(idx));
        const auto& val = dict_values[idx];
        if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
          RETURN_NOT_OK(helper.PushChunk());
        }
        RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
      }
      values_decoded += num_indices;
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }

  template <typename BuilderType>
  Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                     int64_t valid_bits_offset, BuilderType* builder,
                     int* out_num_values) {
    constexpr int32_t kBufferSize = 1024;
    int32_t indices[kBufferSize];

    RETURN_NOT_OK(builder->Reserve(num_values));
    ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);

    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

    int values_decoded = 0;
    int num_appended = 0;
    while (num_appended < num_values) {
      bool is_valid = bit_reader.IsSet();
      bit_reader.Next();

      if (is_valid) {
        int32_t batch_size =
            std::min<int32_t>(kBufferSize, num_values - num_appended - null_count);
        int num_indices = idx_decoder_.GetBatch(indices, batch_size);

        int i = 0;
        while (true) {
          // Consume all indices
          if (is_valid) {
            auto idx = indices[i];
            RETURN_NOT_OK(IndexInBounds(idx));
            const auto& val = dict_values[idx];
            RETURN_NOT_OK(builder->Append(val.ptr, val.len));
            ++i;
            ++values_decoded;
          } else {
            RETURN_NOT_OK(builder->AppendNull());
            --null_count;
          }
          ++num_appended;
          if (i == num_indices) {
            // Do not advance the bit_reader if we have fulfilled the decode
            // request
            break;
          }
          is_valid = bit_reader.IsSet();
          bit_reader.Next();
        }
      } else {
        RETURN_NOT_OK(builder->AppendNull());
        --null_count;
        ++num_appended;
      }
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }

  template <typename BuilderType>
  Status DecodeArrowNonNull(int num_values, BuilderType* builder, int* out_num_values) {
    constexpr int32_t kBufferSize = 2048;
    int32_t indices[kBufferSize];

    RETURN_NOT_OK(builder->Reserve(num_values));

    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

    int values_decoded = 0;
    while (values_decoded < num_values) {
      int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
      int num_indices = idx_decoder_.GetBatch(indices, batch_size);
      if (num_indices == 0) ParquetException::EofException();
      for (int i = 0; i < num_indices; ++i) {
        auto idx = indices[i];
        RETURN_NOT_OK(IndexInBounds(idx));
        const auto& val = dict_values[idx];
        RETURN_NOT_OK(builder->Append(val.ptr, val.len));
      }
      values_decoded += num_indices;
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }
};

// ----------------------------------------------------------------------
// DeltaBitPackEncoder

/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
/// as per the parquet spec. See:
/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
///
/// Consists of a header followed by blocks of delta encoded values binary packed.
///
///  Format
///    [header] [block 1] [block 2] ... [block N]
///
///  Header
///    [block size] [number of mini blocks per block] [total value count] [first value]
///
///  Block
///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
///
/// Sets aside bytes at the start of the internal buffer where the header will be written,
/// and only writes the header when FlushValues is called before returning it.
///
/// To encode a block, we will:
///
/// 1. Compute the differences between consecutive elements. For the first element in the
/// block, use the last element in the previous block or, in the case of the first block,
/// use the first value of the whole sequence, stored in the header.
///
/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
/// this min delta from all deltas in the block. This guarantees that all values are
/// non-negative.
///
/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
/// per mini block.
///
/// Supports only INT32 and INT64.

template <typename DType>
class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
  // Maximum possible header size
  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
  static constexpr uint32_t kValuesPerBlock = 128;
  static constexpr uint32_t kMiniBlocksPerBlock = 4;

 public:
  using T = typename DType::c_type;
  using UT = std::make_unsigned_t<T>;
  using TypedEncoder<DType>::Put;

  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
                               const uint32_t values_per_block = kValuesPerBlock,
                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
        values_per_block_(values_per_block),
        mini_blocks_per_block_(mini_blocks_per_block),
        values_per_mini_block_(values_per_block / mini_blocks_per_block),
        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
        bits_buffer_(
            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * sizeof(T))),
        sink_(pool),
        bit_writer_(bits_buffer_->mutable_data(),
                    static_cast<int>(bits_buffer_->size())) {
    if (values_per_block_ % 128 != 0) {
      throw ParquetException(
          "the number of values in a block must be multiple of 128, but it's " +
          std::to_string(values_per_block_));
    }
    if (values_per_mini_block_ % 32 != 0) {
      throw ParquetException(
          "the number of values in a miniblock must be multiple of 32, but it's " +
          std::to_string(values_per_mini_block_));
    }
    if (values_per_block % mini_blocks_per_block != 0) {
      throw ParquetException(
          "the number of values per block % number of miniblocks per block must be 0, "
          "but it's " +
          std::to_string(values_per_block % mini_blocks_per_block));
    }
    // Reserve enough space at the beginning of the buffer for largest possible header.
    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
  }

  std::shared_ptr<Buffer> FlushValues() override;

  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }

  void Put(const ::arrow::Array& values) override;

  void Put(const T* buffer, int num_values) override;

  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override;

  void FlushBlock();

 private:
  const uint32_t values_per_block_;
  const uint32_t mini_blocks_per_block_;
  const uint32_t values_per_mini_block_;
  uint32_t values_current_block_{0};
  uint32_t total_value_count_{0};
  UT first_value_{0};
  UT current_value_{0};
  ArrowPoolVector<UT> deltas_;
  std::shared_ptr<ResizableBuffer> bits_buffer_;
  ::arrow::BufferBuilder sink_;
  ::arrow::bit_util::BitWriter bit_writer_;
};

template <typename DType>
void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
  if (num_values == 0) {
    return;
  }

  int idx = 0;
  if (total_value_count_ == 0) {
    current_value_ = src[0];
    first_value_ = current_value_;
    idx = 1;
  }
  total_value_count_ += num_values;

  while (idx < num_values) {
    UT value = static_cast<UT>(src[idx]);
    // Calculate deltas. The possible overflow is handled by use of unsigned integers
    // making subtraction operations well-defined and correct even in case of overflow.
    // Encoded integers will wrap back around on decoding.
    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
    deltas_[values_current_block_] = value - current_value_;
    current_value_ = value;
    idx++;
    values_current_block_++;
    if (values_current_block_ == values_per_block_) {
      FlushBlock();
    }
  }
}

template <typename DType>
void DeltaBitPackEncoder<DType>::FlushBlock() {
  if (values_current_block_ == 0) {
    return;
  }

  const UT min_delta =
      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));

  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to write
  // bit widths of miniblocks as they become known during the encoding.
  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
  DCHECK(bit_width_data != nullptr);

  const uint32_t num_miniblocks =
      static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
                                      static_cast<double>(values_per_mini_block_)));
  for (uint32_t i = 0; i < num_miniblocks; i++) {
    const uint32_t values_current_mini_block =
        std::min(values_per_mini_block_, values_current_block_);

    const uint32_t start = i * values_per_mini_block_;
    const UT max_delta = *std::max_element(
        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);

    // The minimum number of bits required to write any of values in deltas_ vector.
    // See overflow comment above.
    const auto bit_width = bit_width_data[i] =
        bit_util::NumRequiredBits(max_delta - min_delta);

    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
      // See overflow comment above.
      const UT value = deltas_[j] - min_delta;
      bit_writer_.PutValue(value, bit_width);
    }
    // If there are not enough values to fill the last mini block, we pad the mini block
    // with zeroes so that its length is the number of values in a full mini block
    // multiplied by the bit width.
    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
      bit_writer_.PutValue(0, bit_width);
    }
    values_current_block_ -= values_current_mini_block;
  }

  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
  // needed to store the values, the bytes storing the bit widths of the unneeded
  // miniblocks are still present, their value should be zero, but readers must accept
  // arbitrary values as well.
  for (uint32_t i = num_miniblocks; i < mini_blocks_per_block_; i++) {
    bit_width_data[i] = 0;
  }
  DCHECK_EQ(values_current_block_, 0);

  bit_writer_.Flush();
  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
  bit_writer_.Clear();
}

template <typename DType>
std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
  if (values_current_block_ > 0) {
    FlushBlock();
  }
  PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true));

  uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {};
  bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_));
  if (!header_writer.PutVlqInt(values_per_block_) ||
      !header_writer.PutVlqInt(mini_blocks_per_block_) ||
      !header_writer.PutVlqInt(total_value_count_) ||
      !header_writer.PutZigZagVlqInt(static_cast<T>(first_value_))) {
    throw ParquetException("header writing error");
  }
  header_writer.Flush();

  // We reserved enough space at the beginning of the buffer for largest possible header
  // and data was written immediately after. We now write the header data immediately
  // before the end of reserved space.
  const size_t offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written();
  std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_,
              header_writer.bytes_written());

  // Reset counter of cached values
  total_value_count_ = 0;
  // Reserve enough space at the beginning of the buffer for largest possible header.
  PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));

  // Excess bytes at the beginning are sliced off and ignored.
  return SliceBuffer(buffer, offset_bytes);
}

template <>
void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
  const ::arrow::ArrayData& data = *values.data();
  if (values.type_id() != ::arrow::Type::INT32) {
    throw ParquetException("Expected Int32TArray, got ", values.type()->ToString());
  }
  if (data.length > std::numeric_limits<int32_t>::max()) {
    throw ParquetException("Array cannot be longer than ",
                           std::numeric_limits<int32_t>::max());
  }

  if (values.null_count() == 0) {
    Put(data.GetValues<int32_t>(1), static_cast<int>(data.length));
  } else {
    PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
              data.GetValues<uint8_t>(0, 0), data.offset);
  }
}

template <>
void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
  const ::arrow::ArrayData& data = *values.data();
  if (values.type_id() != ::arrow::Type::INT64) {
    throw ParquetException("Expected Int64TArray, got ", values.type()->ToString());
  }
  if (data.length > std::numeric_limits<int32_t>::max()) {
    throw ParquetException("Array cannot be longer than ",
                           std::numeric_limits<int32_t>::max());
  }
  if (values.null_count() == 0) {
    Put(data.GetValues<int64_t>(1), static_cast<int>(data.length));
  } else {
    PutSpaced(data.GetValues<int64_t>(1), static_cast<int>(data.length),
              data.GetValues<uint8_t>(0, 0), data.offset);
  }
}

template <typename DType>
void DeltaBitPackEncoder<DType>::PutSpaced(const T* src, int num_values,
                                           const uint8_t* valid_bits,
                                           int64_t valid_bits_offset) {
  if (valid_bits != NULLPTR) {
    PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
                                                                 this->memory_pool()));
    T* data = reinterpret_cast<T*>(buffer->mutable_data());
    int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
        src, num_values, valid_bits, valid_bits_offset, data);
    Put(data, num_valid_values);
  } else {
    Put(src, num_values);
  }
}

// ----------------------------------------------------------------------
// DeltaBitPackDecoder

template <typename DType>
class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
 public:
  typedef typename DType::c_type T;
  using UT = std::make_unsigned_t<T>;

  explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
                               MemoryPool* pool = ::arrow::default_memory_pool())
      : DecoderImpl(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) {
    if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
      throw ParquetException("Delta bit pack encoding should only be for integer data.");
    }
  }

  void SetData(int num_values, const uint8_t* data, int len) override {
    // num_values is equal to page's num_values, including null values in this page
    this->num_values_ = num_values;
    decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len);
    InitHeader();
  }

  // Set BitReader which is already initialized by DeltaLengthByteArrayDecoder or
  // DeltaByteArrayDecoder
  void SetDecoder(int num_values, std::shared_ptr<::arrow::bit_util::BitReader> decoder) {
    this->num_values_ = num_values;
    decoder_ = std::move(decoder);
    InitHeader();
  }

  int ValidValuesCount() {
    // total_value_count_ in header ignores of null values
    return static_cast<int>(total_value_count_);
  }

  int Decode(T* buffer, int max_values) override {
    return GetInternal(buffer, max_values);
  }

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<DType>::Accumulator* out) override {
    if (null_count != 0) {
      ParquetException::NYI("Delta bit pack DecodeArrow with null slots");
    }
    std::vector<T> values(num_values);
    int decoded_count = GetInternal(values.data(), num_values);
    PARQUET_THROW_NOT_OK(out->AppendValues(values.data(), decoded_count));
    return decoded_count;
  }

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<DType>::DictAccumulator* out) override {
    if (null_count != 0) {
      ParquetException::NYI("Delta bit pack DecodeArrow with null slots");
    }
    std::vector<T> values(num_values);
    int decoded_count = GetInternal(values.data(), num_values);
    PARQUET_THROW_NOT_OK(out->Reserve(decoded_count));
    for (int i = 0; i < decoded_count; ++i) {
      PARQUET_THROW_NOT_OK(out->Append(values[i]));
    }
    return decoded_count;
  }

 private:
  static constexpr int kMaxDeltaBitWidth = static_cast<int>(sizeof(T) * 8);

  void InitHeader() {
    if (!decoder_->GetVlqInt(&values_per_block_) ||
        !decoder_->GetVlqInt(&mini_blocks_per_block_) ||
        !decoder_->GetVlqInt(&total_value_count_) ||
        !decoder_->GetZigZagVlqInt(&last_value_)) {
      ParquetException::EofException();
    }

    if (values_per_block_ == 0) {
      throw ParquetException("cannot have zero value per block");
    }
    if (values_per_block_ % 128 != 0) {
      throw ParquetException(
          "the number of values in a block must be multiple of 128, but it's " +
          std::to_string(values_per_block_));
    }
    if (mini_blocks_per_block_ == 0) {
      throw ParquetException("cannot have zero miniblock per block");
    }
    values_per_mini_block_ = values_per_block_ / mini_blocks_per_block_;
    if (values_per_mini_block_ == 0) {
      throw ParquetException("cannot have zero value per miniblock");
    }
    if (values_per_mini_block_ % 32 != 0) {
      throw ParquetException(
          "the number of values in a miniblock must be multiple of 32, but it's " +
          std::to_string(values_per_mini_block_));
    }

    delta_bit_widths_ = AllocateBuffer(pool_, mini_blocks_per_block_);
    block_initialized_ = false;
    values_current_mini_block_ = 0;
  }

  void InitBlock() {
    if (!decoder_->GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();

    // read the bitwidth of each miniblock
    uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
    for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) {
      if (!decoder_->GetAligned<uint8_t>(1, bit_width_data + i)) {
        ParquetException::EofException();
      }
      if (bit_width_data[i] > kMaxDeltaBitWidth) {
        throw ParquetException("delta bit width " + std::to_string(bit_width_data[i]) +
                               " larger than integer bit width " +
                               std::to_string(kMaxDeltaBitWidth));
      }
    }
    mini_block_idx_ = 0;
    delta_bit_width_ = bit_width_data[0];
    values_current_mini_block_ = values_per_mini_block_;
    block_initialized_ = true;
  }

  int GetInternal(T* buffer, int max_values) {
    max_values = static_cast<int>(std::min<int64_t>(max_values, total_value_count_));
    if (max_values == 0) {
      return 0;
    }

    int i = 0;
    while (i < max_values) {
      if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
        if (ARROW_PREDICT_FALSE(!block_initialized_)) {
          buffer[i++] = last_value_;
          DCHECK_EQ(i, 1);  // we're at the beginning of the page
          if (ARROW_PREDICT_FALSE(i == max_values)) {
            // When block is uninitialized and i reaches max_values we have two
            // different possibilities:
            // 1. total_value_count_ == 1, which means that the page may have only
            // one value (encoded in the header), and we should not initialize
            // any block.
            // 2. total_value_count_ != 1, which means we should initialize the
            // incoming block for subsequent reads.
            if (total_value_count_ != 1) {
              InitBlock();
            }
            break;
          }
          InitBlock();
        } else {
          ++mini_block_idx_;
          if (mini_block_idx_ < mini_blocks_per_block_) {
            delta_bit_width_ = delta_bit_widths_->data()[mini_block_idx_];
            values_current_mini_block_ = values_per_mini_block_;
          } else {
            InitBlock();
          }
        }
      }

      int values_decode =
          std::min(values_current_mini_block_, static_cast<uint32_t>(max_values - i));
      if (decoder_->GetBatch(delta_bit_width_, buffer + i, values_decode) !=
          values_decode) {
        ParquetException::EofException();
      }
      for (int j = 0; j < values_decode; ++j) {
        // Addition between min_delta, packed int and last_value should be treated as
        // unsigned addition. Overflow is as expected.
        buffer[i + j] = static_cast<UT>(min_delta_) + static_cast<UT>(buffer[i + j]) +
                        static_cast<UT>(last_value_);
        last_value_ = buffer[i + j];
      }
      values_current_mini_block_ -= values_decode;
      i += values_decode;
    }
    total_value_count_ -= max_values;
    this->num_values_ -= max_values;

    if (ARROW_PREDICT_FALSE(total_value_count_ == 0)) {
      uint32_t padding_bits = values_current_mini_block_ * delta_bit_width_;
      // skip the padding bits
      if (!decoder_->Advance(padding_bits)) {
        ParquetException::EofException();
      }
      values_current_mini_block_ = 0;
    }
    return max_values;
  }

  MemoryPool* pool_;
  std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
  uint32_t values_per_block_;
  uint32_t mini_blocks_per_block_;
  uint32_t values_per_mini_block_;
  uint32_t values_current_mini_block_;
  uint32_t total_value_count_;

  bool block_initialized_;
  T min_delta_;
  uint32_t mini_block_idx_;
  std::shared_ptr<ResizableBuffer> delta_bit_widths_;
  int delta_bit_width_;

  T last_value_;
};

// ----------------------------------------------------------------------
// DELTA_LENGTH_BYTE_ARRAY

class DeltaLengthByteArrayDecoder : public DecoderImpl,
                                    virtual public TypedDecoder<ByteArrayType> {
 public:
  explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr,
                                       MemoryPool* pool = ::arrow::default_memory_pool())
      : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
        len_decoder_(nullptr, pool),
        buffered_length_(AllocateBuffer(pool, 0)),
        buffered_data_(AllocateBuffer(pool, 0)) {}

  void SetData(int num_values, const uint8_t* data, int len) override {
    num_values_ = num_values;
    if (len == 0) return;
    decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len);
    DecodeLengths();
  }

  void SetDecoder(int num_values, std::shared_ptr<::arrow::bit_util::BitReader> decoder) {
    num_values_ = num_values;
    decoder_ = decoder;
    DecodeLengths();
  }

  int Decode(ByteArray* buffer, int max_values) override {
    // Decode up to `max_values` strings into an internal buffer
    // and reference them into `buffer`.
    max_values = std::min(max_values, num_valid_values_);
    if (max_values == 0) {
      return 0;
    }

    int32_t data_size = 0;
    const int32_t* length_ptr =
        reinterpret_cast<const int32_t*>(buffered_length_->data()) + length_idx_;
    for (int i = 0; i < max_values; ++i) {
      int32_t len = length_ptr[i];
      if (ARROW_PREDICT_FALSE(len < 0)) {
        throw ParquetException("negative string delta length");
      }
      buffer[i].len = len;
      if (AddWithOverflow(data_size, len, &data_size)) {
        throw ParquetException("excess expansion in DELTA_(LENGTH_)BYTE_ARRAY");
      }
    }
    length_idx_ += max_values;

    PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size));
    if (decoder_->GetBatch(8, buffered_data_->mutable_data(), data_size) != data_size) {
      ParquetException::EofException();
    }
    const uint8_t* data_ptr = buffered_data_->data();

    for (int i = 0; i < max_values; ++i) {
      buffer[i].ptr = data_ptr;
      data_ptr += buffer[i].len;
    }
    this->num_values_ -= max_values;
    num_valid_values_ -= max_values;
    return max_values;
  }

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
    ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder");
  }

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<ByteArrayType>::DictAccumulator* out) override {
    ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder");
  }

 private:
  // Decode all the encoded lengths. The decoder_ will be at the start of the encoded data
  // after that.
  void DecodeLengths() {
    len_decoder_.SetDecoder(num_values_, decoder_);

    // get the number of encoded lengths
    int num_length = len_decoder_.ValidValuesCount();
    PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * sizeof(int32_t)));

    // call len_decoder_.Decode to decode all the lengths.
    // all the lengths are buffered in buffered_length_.
    int ret = len_decoder_.Decode(
        reinterpret_cast<int32_t*>(buffered_length_->mutable_data()), num_length);
    DCHECK_EQ(ret, num_length);
    length_idx_ = 0;
    num_valid_values_ = num_length;
  }

  std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
  DeltaBitPackDecoder<Int32Type> len_decoder_;
  int num_valid_values_;
  uint32_t length_idx_;
  std::shared_ptr<ResizableBuffer> buffered_length_;
  std::shared_ptr<ResizableBuffer> buffered_data_;
};

// ----------------------------------------------------------------------
// RLE_BOOLEAN_DECODER

class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
 public:
  explicit RleBooleanDecoder(const ColumnDescriptor* descr)
      : DecoderImpl(descr, Encoding::RLE) {}

  void SetData(int num_values, const uint8_t* data, int len) override {
    num_values_ = num_values;
    uint32_t num_bytes = 0;

    if (len < 4) {
      throw ParquetException("Received invalid length : " + std::to_string(len) +
                             " (corrupt data page?)");
    }
    // Load the first 4 bytes in little-endian, which indicates the length
    num_bytes =
        ::arrow::bit_util::ToLittleEndian(::arrow::util::SafeLoadAs<uint32_t>(data));
    if (num_bytes < 0 || num_bytes > static_cast<uint32_t>(len - 4)) {
      throw ParquetException("Received invalid number of bytes : " +
                             std::to_string(num_bytes) + " (corrupt data page?)");
    }

    auto decoder_data = data + 4;
    decoder_ = std::make_shared<::arrow::util::RleDecoder>(decoder_data, num_bytes,
                                                           /*bit_width=*/1);
  }

  int Decode(bool* buffer, int max_values) override {
    max_values = std::min(max_values, num_values_);

    if (decoder_->GetBatch(buffer, max_values) != max_values) {
      ParquetException::EofException();
    }
    num_values_ -= max_values;
    return max_values;
  }

  int Decode(uint8_t* buffer, int max_values) override {
    ParquetException::NYI("Decode(uint8_t*, int) for RleBooleanDecoder");
  }

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<BooleanType>::Accumulator* out) override {
    ParquetException::NYI("DecodeArrow for RleBooleanDecoder");
  }

  int DecodeArrow(
      int num_values, int null_count, const uint8_t* valid_bits,
      int64_t valid_bits_offset,
      typename EncodingTraits<BooleanType>::DictAccumulator* builder) override {
    ParquetException::NYI("DecodeArrow for RleBooleanDecoder");
  }

 private:
  std::shared_ptr<::arrow::util::RleDecoder> decoder_;
};

// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY

class DeltaByteArrayDecoder : public DecoderImpl,
                              virtual public TypedDecoder<ByteArrayType> {
 public:
  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
                                 MemoryPool* pool = ::arrow::default_memory_pool())
      : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
        prefix_len_decoder_(nullptr, pool),
        suffix_decoder_(nullptr, pool),
        last_value_in_previous_page_(""),
        buffered_prefix_length_(AllocateBuffer(pool, 0)),
        buffered_data_(AllocateBuffer(pool, 0)) {}

  void SetData(int num_values, const uint8_t* data, int len) override {
    num_values_ = num_values;
    decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len);
    prefix_len_decoder_.SetDecoder(num_values, decoder_);

    // get the number of encoded prefix lengths
    int num_prefix = prefix_len_decoder_.ValidValuesCount();
    // call prefix_len_decoder_.Decode to decode all the prefix lengths.
    // all the prefix lengths are buffered in buffered_prefix_length_.
    PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * sizeof(int32_t)));
    int ret = prefix_len_decoder_.Decode(
        reinterpret_cast<int32_t*>(buffered_prefix_length_->mutable_data()), num_prefix);
    DCHECK_EQ(ret, num_prefix);
    prefix_len_offset_ = 0;
    num_valid_values_ = num_prefix;

    // at this time, the decoder_ will be at the start of the encoded suffix data.
    suffix_decoder_.SetDecoder(num_values, decoder_);

    // TODO: read corrupted files written with bug(PARQUET-246). last_value_ should be set
    // to last_value_in_previous_page_ when decoding a new page(except the first page)
    last_value_ = "";
  }

  int Decode(ByteArray* buffer, int max_values) override {
    return GetInternal(buffer, max_values);
  }

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
    int result = 0;
    PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
                                          valid_bits_offset, out, &result));
    return result;
  }

  int DecodeArrow(
      int num_values, int null_count, const uint8_t* valid_bits,
      int64_t valid_bits_offset,
      typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) override {
    ParquetException::NYI("DecodeArrow of DictAccumulator for DeltaByteArrayDecoder");
  }

 private:
  int GetInternal(ByteArray* buffer, int max_values) {
    // Decode up to `max_values` strings into an internal buffer
    // and reference them into `buffer`.
    max_values = std::min(max_values, num_valid_values_);
    if (max_values == 0) {
      return max_values;
    }

    int suffix_read = suffix_decoder_.Decode(buffer, max_values);
    if (ARROW_PREDICT_FALSE(suffix_read != max_values)) {
      ParquetException::EofException("Read " + std::to_string(suffix_read) +
                                     ", expecting " + std::to_string(max_values) +
                                     " from suffix decoder");
    }

    int64_t data_size = 0;
    const int32_t* prefix_len_ptr =
        reinterpret_cast<const int32_t*>(buffered_prefix_length_->data()) +
        prefix_len_offset_;
    for (int i = 0; i < max_values; ++i) {
      if (ARROW_PREDICT_FALSE(prefix_len_ptr[i] < 0)) {
        throw ParquetException("negative prefix length in DELTA_BYTE_ARRAY");
      }
      if (ARROW_PREDICT_FALSE(AddWithOverflow(data_size, prefix_len_ptr[i], &data_size) ||
                              AddWithOverflow(data_size, buffer[i].len, &data_size))) {
        throw ParquetException("excess expansion in DELTA_BYTE_ARRAY");
      }
    }
    PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size));

    string_view prefix{last_value_};
    uint8_t* data_ptr = buffered_data_->mutable_data();
    for (int i = 0; i < max_values; ++i) {
      if (ARROW_PREDICT_FALSE(static_cast<size_t>(prefix_len_ptr[i]) > prefix.length())) {
        throw ParquetException("prefix length too large in DELTA_BYTE_ARRAY");
      }
      memcpy(data_ptr, prefix.data(), prefix_len_ptr[i]);
      // buffer[i] currently points to the string suffix
      memcpy(data_ptr + prefix_len_ptr[i], buffer[i].ptr, buffer[i].len);
      buffer[i].ptr = data_ptr;
      buffer[i].len += prefix_len_ptr[i];
      data_ptr += buffer[i].len;
      prefix = string_view{reinterpret_cast<const char*>(buffer[i].ptr), buffer[i].len};
    }
    prefix_len_offset_ += max_values;
    this->num_values_ -= max_values;
    num_valid_values_ -= max_values;
    last_value_ = std::string{prefix};

    if (num_valid_values_ == 0) {
      last_value_in_previous_page_ = last_value_;
    }
    return max_values;
  }

  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                          int64_t valid_bits_offset,
                          typename EncodingTraits<ByteArrayType>::Accumulator* out,
                          int* out_num_values) {
    ArrowBinaryHelper helper(out);

    std::vector<ByteArray> values(num_values);
    const int num_valid_values = GetInternal(values.data(), num_values - null_count);
    DCHECK_EQ(num_values - null_count, num_valid_values);

    auto values_ptr = reinterpret_cast<const ByteArray*>(values.data());
    int value_idx = 0;

    RETURN_NOT_OK(VisitNullBitmapInline(
        valid_bits, valid_bits_offset, num_values, null_count,
        [&]() {
          const auto& val = values_ptr[value_idx];
          if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
            RETURN_NOT_OK(helper.PushChunk());
          }
          RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
          ++value_idx;
          return Status::OK();
        },
        [&]() {
          RETURN_NOT_OK(helper.AppendNull());
          --null_count;
          return Status::OK();
        }));

    DCHECK_EQ(null_count, 0);
    *out_num_values = num_valid_values;
    return Status::OK();
  }

  std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
  DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
  DeltaLengthByteArrayDecoder suffix_decoder_;
  std::string last_value_;
  // string buffer for last value in previous page
  std::string last_value_in_previous_page_;
  int num_valid_values_;
  uint32_t prefix_len_offset_;
  std::shared_ptr<ResizableBuffer> buffered_prefix_length_;
  std::shared_ptr<ResizableBuffer> buffered_data_;
};

// ----------------------------------------------------------------------
// BYTE_STREAM_SPLIT

template <typename DType>
class ByteStreamSplitDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
 public:
  using T = typename DType::c_type;
  explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr);

  int Decode(T* buffer, int max_values) override;

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<DType>::Accumulator* builder) override;

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<DType>::DictAccumulator* builder) override;

  void SetData(int num_values, const uint8_t* data, int len) override;

  T* EnsureDecodeBuffer(int64_t min_values) {
    const int64_t size = sizeof(T) * min_values;
    if (!decode_buffer_ || decode_buffer_->size() < size) {
      PARQUET_ASSIGN_OR_THROW(decode_buffer_, ::arrow::AllocateBuffer(size));
    }
    return reinterpret_cast<T*>(decode_buffer_->mutable_data());
  }

 private:
  int num_values_in_buffer_{0};
  std::shared_ptr<Buffer> decode_buffer_;

  static constexpr size_t kNumStreams = sizeof(T);
};

template <typename DType>
ByteStreamSplitDecoder<DType>::ByteStreamSplitDecoder(const ColumnDescriptor* descr)
    : DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT) {}

template <typename DType>
void ByteStreamSplitDecoder<DType>::SetData(int num_values, const uint8_t* data,
                                            int len) {
  DecoderImpl::SetData(num_values, data, len);
  if (num_values * static_cast<int64_t>(sizeof(T)) > len) {
    throw ParquetException("Data size too small for number of values (corrupted file?)");
  }
  num_values_in_buffer_ = num_values;
}

template <typename DType>
int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) {
  const int values_to_decode = std::min(num_values_, max_values);
  const int num_decoded_previously = num_values_in_buffer_ - num_values_;
  const uint8_t* data = data_ + num_decoded_previously;

  ::arrow::util::internal::ByteStreamSplitDecode<T>(data, values_to_decode,
                                                    num_values_in_buffer_, buffer);
  num_values_ -= values_to_decode;
  len_ -= sizeof(T) * values_to_decode;
  return values_to_decode;
}

template <typename DType>
int ByteStreamSplitDecoder<DType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<DType>::Accumulator* builder) {
  constexpr int value_size = static_cast<int>(kNumStreams);
  int values_decoded = num_values - null_count;
  if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
    ParquetException::EofException();
  }

  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

  const int num_decoded_previously = num_values_in_buffer_ - num_values_;
  const uint8_t* data = data_ + num_decoded_previously;
  int offset = 0;

#if defined(ARROW_HAVE_SIMD_SPLIT)
  // Use fast decoding into intermediate buffer.  This will also decode
  // some null values, but it's fast enough that we don't care.
  T* decode_out = EnsureDecodeBuffer(values_decoded);
  ::arrow::util::internal::ByteStreamSplitDecode<T>(data, values_decoded,
                                                    num_values_in_buffer_, decode_out);

  // XXX If null_count is 0, we could even append in bulk or decode directly into
  // builder
  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        builder->UnsafeAppend(decode_out[offset]);
        ++offset;
      },
      [&]() { builder->UnsafeAppendNull(); });

#else
  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        uint8_t gathered_byte_data[kNumStreams];
        for (size_t b = 0; b < kNumStreams; ++b) {
          const size_t byte_index = b * num_values_in_buffer_ + offset;
          gathered_byte_data[b] = data[byte_index];
        }
        builder->UnsafeAppend(::arrow::util::SafeLoadAs<T>(&gathered_byte_data[0]));
        ++offset;
      },
      [&]() { builder->UnsafeAppendNull(); });
#endif

  num_values_ -= values_decoded;
  len_ -= sizeof(T) * values_decoded;
  return values_decoded;
}

template <typename DType>
int ByteStreamSplitDecoder<DType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<DType>::DictAccumulator* builder) {
  ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
}

}  // namespace

// ----------------------------------------------------------------------
// Encoder and decoder factory functions

std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encoding,
                                     bool use_dictionary, const ColumnDescriptor* descr,
                                     MemoryPool* pool) {
  if (use_dictionary) {
    switch (type_num) {
      case Type::INT32:
        return std::make_unique<DictEncoderImpl<Int32Type>>(descr, pool);
      case Type::INT64:
        return std::make_unique<DictEncoderImpl<Int64Type>>(descr, pool);
      case Type::INT96:
        return std::make_unique<DictEncoderImpl<Int96Type>>(descr, pool);
      case Type::FLOAT:
        return std::make_unique<DictEncoderImpl<FloatType>>(descr, pool);
      case Type::DOUBLE:
        return std::make_unique<DictEncoderImpl<DoubleType>>(descr, pool);
      case Type::BYTE_ARRAY:
        return std::make_unique<DictEncoderImpl<ByteArrayType>>(descr, pool);
      case Type::FIXED_LEN_BYTE_ARRAY:
        return std::make_unique<DictEncoderImpl<FLBAType>>(descr, pool);
      default:
        DCHECK(false) << "Encoder not implemented";
        break;
    }
  } else if (encoding == Encoding::PLAIN) {
    switch (type_num) {
      case Type::BOOLEAN:
        return std::make_unique<PlainEncoder<BooleanType>>(descr, pool);
      case Type::INT32:
        return std::make_unique<PlainEncoder<Int32Type>>(descr, pool);
      case Type::INT64:
        return std::make_unique<PlainEncoder<Int64Type>>(descr, pool);
      case Type::INT96:
        return std::make_unique<PlainEncoder<Int96Type>>(descr, pool);
      case Type::FLOAT:
        return std::make_unique<PlainEncoder<FloatType>>(descr, pool);
      case Type::DOUBLE:
        return std::make_unique<PlainEncoder<DoubleType>>(descr, pool);
      case Type::BYTE_ARRAY:
        return std::make_unique<PlainEncoder<ByteArrayType>>(descr, pool);
      case Type::FIXED_LEN_BYTE_ARRAY:
        return std::make_unique<PlainEncoder<FLBAType>>(descr, pool);
      default:
        DCHECK(false) << "Encoder not implemented";
        break;
    }
  } else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
    switch (type_num) {
      case Type::FLOAT:
        return std::make_unique<ByteStreamSplitEncoder<FloatType>>(descr, pool);
      case Type::DOUBLE:
        return std::make_unique<ByteStreamSplitEncoder<DoubleType>>(descr, pool);
      default:
        throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
        break;
    }
  } else if (encoding == Encoding::DELTA_BINARY_PACKED) {
    switch (type_num) {
      case Type::INT32:
        return std::make_unique<DeltaBitPackEncoder<Int32Type>>(descr, pool);
      case Type::INT64:
        return std::make_unique<DeltaBitPackEncoder<Int64Type>>(descr, pool);
      default:
        throw ParquetException(
            "DELTA_BINARY_PACKED encoder only supports INT32 and INT64");
        break;
    }
  } else {
    ParquetException::NYI("Selected encoding is not supported");
  }
  DCHECK(false) << "Should not be able to reach this code";
  return nullptr;
}

std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encoding,
                                     const ColumnDescriptor* descr) {
  if (encoding == Encoding::PLAIN) {
    switch (type_num) {
      case Type::BOOLEAN:
        return std::make_unique<PlainBooleanDecoder>(descr);
      case Type::INT32:
        return std::make_unique<PlainDecoder<Int32Type>>(descr);
      case Type::INT64:
        return std::make_unique<PlainDecoder<Int64Type>>(descr);
      case Type::INT96:
        return std::make_unique<PlainDecoder<Int96Type>>(descr);
      case Type::FLOAT:
        return std::make_unique<PlainDecoder<FloatType>>(descr);
      case Type::DOUBLE:
        return std::make_unique<PlainDecoder<DoubleType>>(descr);
      case Type::BYTE_ARRAY:
        return std::make_unique<PlainByteArrayDecoder>(descr);
      case Type::FIXED_LEN_BYTE_ARRAY:
        return std::make_unique<PlainFLBADecoder>(descr);
      default:
        break;
    }
  } else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
    switch (type_num) {
      case Type::FLOAT:
        return std::make_unique<ByteStreamSplitDecoder<FloatType>>(descr);
      case Type::DOUBLE:
        return std::make_unique<ByteStreamSplitDecoder<DoubleType>>(descr);
      default:
        throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
        break;
    }
  } else if (encoding == Encoding::DELTA_BINARY_PACKED) {
    switch (type_num) {
      case Type::INT32:
        return std::make_unique<DeltaBitPackDecoder<Int32Type>>(descr);
      case Type::INT64:
        return std::make_unique<DeltaBitPackDecoder<Int64Type>>(descr);
      default:
        throw ParquetException(
            "DELTA_BINARY_PACKED decoder only supports INT32 and INT64");
        break;
    }
  } else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
    if (type_num == Type::BYTE_ARRAY) {
      return std::make_unique<DeltaByteArrayDecoder>(descr);
    }
    throw ParquetException("DELTA_BYTE_ARRAY only supports BYTE_ARRAY");
  } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) {
    if (type_num == Type::BYTE_ARRAY) {
      return std::make_unique<DeltaLengthByteArrayDecoder>(descr);
    }
    throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY");
  } else if (encoding == Encoding::RLE) {
    if (type_num == Type::BOOLEAN) {
      return std::make_unique<RleBooleanDecoder>(descr);
    }
    throw ParquetException("RLE encoding only supports BOOLEAN");
  } else {
    ParquetException::NYI("Selected encoding is not supported");
  }
  DCHECK(false) << "Should not be able to reach this code";
  return nullptr;
}

namespace detail {
std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num,
                                         const ColumnDescriptor* descr,
                                         MemoryPool* pool) {
  switch (type_num) {
    case Type::BOOLEAN:
      ParquetException::NYI("Dictionary encoding not implemented for boolean type");
    case Type::INT32:
      return std::make_unique<DictDecoderImpl<Int32Type>>(descr, pool);
    case Type::INT64:
      return std::make_unique<DictDecoderImpl<Int64Type>>(descr, pool);
    case Type::INT96:
      return std::make_unique<DictDecoderImpl<Int96Type>>(descr, pool);
    case Type::FLOAT:
      return std::make_unique<DictDecoderImpl<FloatType>>(descr, pool);
    case Type::DOUBLE:
      return std::make_unique<DictDecoderImpl<DoubleType>>(descr, pool);
    case Type::BYTE_ARRAY:
      return std::make_unique<DictByteArrayDecoderImpl>(descr, pool);
    case Type::FIXED_LEN_BYTE_ARRAY:
      return std::make_unique<DictDecoderImpl<FLBAType>>(descr, pool);
    default:
      break;
  }
  DCHECK(false) << "Should not be able to reach this code";
  return nullptr;
}

}  // namespace detail
}  // namespace parquet
