cpp-ch/local-engine/Storages/ch_parquet/arrow/encoding.cc (2,718 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "encoding.h" #include <algorithm> #include <cstdint> #include <cstdlib> #include <limits> #include <memory> #include <string> #include <string_view> #include <utility> #include <vector> #include "arrow/array.h" #include "arrow/array/builder_dict.h" #include "arrow/stl_allocator.h" #include "arrow/type_traits.h" #include "arrow/util/bit_block_counter.h" #include "arrow/util/bit_run_reader.h" #include "arrow/util/bit_stream_utils.h" #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_ops.h" #include "arrow/util/bitmap_writer.h" #include "arrow/util/byte_stream_split.h" #include "arrow/util/checked_cast.h" #include "arrow/util/hashing.h" #include "arrow/util/int_util_overflow.h" #include "arrow/util/logging.h" #include "arrow/util/rle_encoding.h" #include "arrow/util/ubsan.h" #include "arrow/visit_data_inline.h" #include "parquet/exception.h" #include "parquet/platform.h" #include "parquet/schema.h" #include "parquet/types.h" namespace bit_util = arrow::bit_util; using arrow::Status; using arrow::VisitNullBitmapInline; using arrow::internal::AddWithOverflow; using arrow::internal::checked_cast; using std::string_view; template <typename T> using ArrowPoolVector = 
std::vector<T, ::arrow::stl::allocator<T>>; namespace ch_parquet { using namespace parquet; namespace { constexpr int64_t kInMemoryDefaultCapacity = 1024; // The Parquet spec isn't very clear whether ByteArray lengths are signed or // unsigned, but the Java implementation uses signed ints. constexpr size_t kMaxByteArraySize = std::numeric_limits<int32_t>::max(); class EncoderImpl : virtual public Encoder { public: EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, MemoryPool* pool) : descr_(descr), encoding_(encoding), pool_(pool), type_length_(descr ? descr->type_length() : -1) {} Encoding::type encoding() const override { return encoding_; } MemoryPool* memory_pool() const override { return pool_; } protected: // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY const ColumnDescriptor* descr_; const Encoding::type encoding_; MemoryPool* pool_; /// Type length from descr int type_length_; }; // ---------------------------------------------------------------------- // Plain encoder implementation template <typename DType> class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> { public: using T = typename DType::c_type; explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool) : EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {} int64_t EstimatedDataEncodedSize() override { return sink_.length(); } std::shared_ptr<Buffer> FlushValues() override { std::shared_ptr<Buffer> buffer; PARQUET_THROW_NOT_OK(sink_.Finish(&buffer)); return buffer; } using TypedEncoder<DType>::Put; void Put(const T* buffer, int num_values) override; void Put(const ::arrow::Array& values) override; void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) override { if (valid_bits != NULLPTR) { PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool())); T* data = reinterpret_cast<T*>(buffer->mutable_data()); int num_valid_values = 
::arrow::util::internal::SpacedCompress<T>( src, num_values, valid_bits, valid_bits_offset, data); Put(data, num_valid_values); } else { Put(src, num_values); } } void UnsafePutByteArray(const void* data, uint32_t length) { DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL"; sink_.UnsafeAppend(&length, sizeof(uint32_t)); sink_.UnsafeAppend(data, static_cast<int64_t>(length)); } void Put(const ByteArray& val) { // Write the result to the output stream const int64_t increment = static_cast<int64_t>(val.len + sizeof(uint32_t)); if (ARROW_PREDICT_FALSE(sink_.length() + increment > sink_.capacity())) { PARQUET_THROW_NOT_OK(sink_.Reserve(increment)); } UnsafePutByteArray(val.ptr, val.len); } protected: template <typename ArrayType> void PutBinaryArray(const ArrayType& array) { const int64_t total_bytes = array.value_offset(array.length()) - array.value_offset(0); PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + array.length() * sizeof(uint32_t))); PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>( *array.data(), [&](::std::string_view view) { if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) { return Status::Invalid("Parquet cannot store strings with size 2GB or more"); } UnsafePutByteArray(view.data(), static_cast<uint32_t>(view.size())); return Status::OK(); }, []() { return Status::OK(); })); } ::arrow::BufferBuilder sink_; }; template <typename DType> void PlainEncoder<DType>::Put(const T* buffer, int num_values) { if (num_values > 0) { PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T))); } } template <> inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) { for (int i = 0; i < num_values; ++i) { Put(src[i]); } } template <typename ArrayType> void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) { if (values.type_id() != ArrayType::TypeClass::type_id) { std::string type_name = ArrayType::TypeClass::type_name(); throw ParquetException("direct 
put to " + type_name + " from " + values.type()->ToString() + " not supported"); } using value_type = typename ArrayType::value_type; constexpr auto value_size = sizeof(value_type); auto raw_values = checked_cast<const ArrayType&>(values).raw_values(); if (values.null_count() == 0) { // no nulls, just dump the data PARQUET_THROW_NOT_OK(sink->Append(raw_values, values.length() * value_size)); } else { PARQUET_THROW_NOT_OK( sink->Reserve((values.length() - values.null_count()) * value_size)); for (int64_t i = 0; i < values.length(); i++) { if (values.IsValid(i)) { sink->UnsafeAppend(&raw_values[i], value_size); } } } } template <> void PlainEncoder<Int32Type>::Put(const ::arrow::Array& values) { DirectPutImpl<::arrow::Int32Array>(values, &sink_); } template <> void PlainEncoder<Int64Type>::Put(const ::arrow::Array& values) { DirectPutImpl<::arrow::Int64Array>(values, &sink_); } template <> void PlainEncoder<Int96Type>::Put(const ::arrow::Array& values) { ParquetException::NYI("direct put to Int96"); } template <> void PlainEncoder<FloatType>::Put(const ::arrow::Array& values) { DirectPutImpl<::arrow::FloatArray>(values, &sink_); } template <> void PlainEncoder<DoubleType>::Put(const ::arrow::Array& values) { DirectPutImpl<::arrow::DoubleArray>(values, &sink_); } template <typename DType> void PlainEncoder<DType>::Put(const ::arrow::Array& values) { ParquetException::NYI("direct put of " + values.type()->ToString()); } void AssertBaseBinary(const ::arrow::Array& values) { if (!::arrow::is_base_binary_like(values.type_id())) { throw ParquetException("Only BaseBinaryArray and subclasses supported"); } } template <> inline void PlainEncoder<ByteArrayType>::Put(const ::arrow::Array& values) { AssertBaseBinary(values); if (::arrow::is_binary_like(values.type_id())) { PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values)); } else { DCHECK(::arrow::is_large_binary_like(values.type_id())); PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values)); } } 
void AssertFixedSizeBinary(const ::arrow::Array& values, int type_length) { if (values.type_id() != ::arrow::Type::FIXED_SIZE_BINARY && values.type_id() != ::arrow::Type::DECIMAL) { throw ParquetException("Only FixedSizeBinaryArray and subclasses supported"); } if (checked_cast<const ::arrow::FixedSizeBinaryType&>(*values.type()).byte_width() != type_length) { throw ParquetException("Size mismatch: " + values.type()->ToString() + " should have been " + std::to_string(type_length) + " wide"); } } template <> inline void PlainEncoder<FLBAType>::Put(const ::arrow::Array& values) { AssertFixedSizeBinary(values, descr_->type_length()); const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values); if (data.null_count() == 0) { // no nulls, just dump the data PARQUET_THROW_NOT_OK( sink_.Append(data.raw_values(), data.length() * data.byte_width())); } else { const int64_t total_bytes = data.length() * data.byte_width() - data.null_count() * data.byte_width(); PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes)); for (int64_t i = 0; i < data.length(); i++) { if (data.IsValid(i)) { sink_.UnsafeAppend(data.Value(i), data.byte_width()); } } } } template <> inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) { if (descr_->type_length() == 0) { return; } for (int i = 0; i < num_values; ++i) { // Write the result to the output stream DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL"; PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length())); } } template <> class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEncoder { public: explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool) : EncoderImpl(descr, Encoding::PLAIN, pool), bits_available_(kInMemoryDefaultCapacity * 8), bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)), sink_(pool), bit_writer_(bits_buffer_->mutable_data(), static_cast<int>(bits_buffer_->size())) {} int64_t EstimatedDataEncodedSize() override; 
std::shared_ptr<Buffer> FlushValues() override; void Put(const bool* src, int num_values) override; void Put(const std::vector<bool>& src, int num_values) override; void PutSpaced(const bool* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) override { if (valid_bits != NULLPTR) { PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool())); T* data = reinterpret_cast<T*>(buffer->mutable_data()); int num_valid_values = ::arrow::util::internal::SpacedCompress<T>( src, num_values, valid_bits, valid_bits_offset, data); Put(data, num_valid_values); } else { Put(src, num_values); } } void Put(const ::arrow::Array& values) override { if (values.type_id() != ::arrow::Type::BOOL) { throw ParquetException("direct put to boolean from " + values.type()->ToString() + " not supported"); } const auto& data = checked_cast<const ::arrow::BooleanArray&>(values); if (data.null_count() == 0) { PARQUET_THROW_NOT_OK(sink_.Reserve(bit_util::BytesForBits(data.length()))); // no nulls, just dump the data ::arrow::internal::CopyBitmap(data.data()->GetValues<uint8_t>(1), data.offset(), data.length(), sink_.mutable_data(), sink_.length()); } else { auto n_valid = bit_util::BytesForBits(data.length() - data.null_count()); PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid)); ::arrow::internal::FirstTimeBitmapWriter writer(sink_.mutable_data(), sink_.length(), n_valid); for (int64_t i = 0; i < data.length(); i++) { if (data.IsValid(i)) { if (data.Value(i)) { writer.Set(); } else { writer.Clear(); } writer.Next(); } } writer.Finish(); } sink_.UnsafeAdvance(data.length()); } private: int bits_available_; std::shared_ptr<ResizableBuffer> bits_buffer_; ::arrow::BufferBuilder sink_; ::arrow::bit_util::BitWriter bit_writer_; template <typename SequenceType> void PutImpl(const SequenceType& src, int num_values); }; template <typename SequenceType> void PlainEncoder<BooleanType>::PutImpl(const SequenceType& src, int num_values) { int 
bit_offset = 0; if (bits_available_ > 0) { int bits_to_write = std::min(bits_available_, num_values); for (int i = 0; i < bits_to_write; i++) { bit_writer_.PutValue(src[i], 1); } bits_available_ -= bits_to_write; bit_offset = bits_to_write; if (bits_available_ == 0) { bit_writer_.Flush(); PARQUET_THROW_NOT_OK( sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); bit_writer_.Clear(); } } int bits_remaining = num_values - bit_offset; while (bit_offset < num_values) { bits_available_ = static_cast<int>(bits_buffer_->size()) * 8; int bits_to_write = std::min(bits_available_, bits_remaining); for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { bit_writer_.PutValue(src[i], 1); } bit_offset += bits_to_write; bits_available_ -= bits_to_write; bits_remaining -= bits_to_write; if (bits_available_ == 0) { bit_writer_.Flush(); PARQUET_THROW_NOT_OK( sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); bit_writer_.Clear(); } } } int64_t PlainEncoder<BooleanType>::EstimatedDataEncodedSize() { int64_t position = sink_.length(); return position + bit_writer_.bytes_written(); } std::shared_ptr<Buffer> PlainEncoder<BooleanType>::FlushValues() { if (bits_available_ > 0) { bit_writer_.Flush(); PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); bit_writer_.Clear(); bits_available_ = static_cast<int>(bits_buffer_->size()) * 8; } std::shared_ptr<Buffer> buffer; PARQUET_THROW_NOT_OK(sink_.Finish(&buffer)); return buffer; } void PlainEncoder<BooleanType>::Put(const bool* src, int num_values) { PutImpl(src, num_values); } void PlainEncoder<BooleanType>::Put(const std::vector<bool>& src, int num_values) { PutImpl(src, num_values); } // ---------------------------------------------------------------------- // DictEncoder<T> implementations template <typename DType> struct DictEncoderTraits { using c_type = typename DType::c_type; using MemoTableType = ::arrow::internal::ScalarMemoTable<c_type>; }; template <> struct 
DictEncoderTraits<ByteArrayType> {
  // Variable-length values are memoized in a binary memo table.
  using MemoTableType = ::arrow::internal::BinaryMemoTable<::arrow::BinaryBuilder>;
};

template <>
struct DictEncoderTraits<FLBAType> {
  // Fixed-length byte arrays use the same binary memo table as BYTE_ARRAY.
  using MemoTableType = ::arrow::internal::BinaryMemoTable<::arrow::BinaryBuilder>;
};

// Initially 1024 elements
static constexpr int32_t kInitialHashTableSize = 1 << 10;

/// See the dictionary encoding section of
/// https://github.com/Parquet/parquet-format.  The encoding supports
/// streaming encoding. Values are encoded as they are added while the
/// dictionary is being constructed. At any time, the buffered values
/// can be written out with the current dictionary size. More values
/// can then be added to the encoder, including new dictionary
/// entries.
template <typename DType>
class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
  using MemoTableType = typename DictEncoderTraits<DType>::MemoTableType;

 public:
  typedef typename DType::c_type T;

  explicit DictEncoderImpl(const ColumnDescriptor* desc, MemoryPool* pool)
      : EncoderImpl(desc, Encoding::PLAIN_DICTIONARY, pool),
        buffered_indices_(::arrow::stl::allocator<int32_t>(pool)),
        dict_encoded_size_(0),
        memo_table_(pool, kInitialHashTableSize) {}

  // Debug builds assert that all buffered indices were written out first.
  ~DictEncoderImpl() override { DCHECK(buffered_indices_.empty()); }

  int dict_encoded_size() override { return dict_encoded_size_; }

  /// Writes the bit width byte followed by the RLE-encoded buffered indices
  /// into buffer, then clears the buffered indices.
  /// \return the number of bytes written, or -1 if the RLE encoder could not
  /// accept an index (in which case the indices are left buffered)
  int WriteIndices(uint8_t* buffer, int buffer_len) override {
    // Write bit width in first byte
    *buffer = static_cast<uint8_t>(bit_width());
    ++buffer;
    --buffer_len;

    ::arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());

    for (int32_t index : buffered_indices_) {
      if (!encoder.Put(index)) return -1;
    }
    encoder.Flush();

    ClearIndices();
    // +1 accounts for the leading bit width byte.
    return 1 + encoder.len();
  }

  void set_type_length(int type_length) { this->type_length_ = type_length; }

  /// Returns a conservative estimate of the number of bytes needed to encode the buffered
  /// indices. Used to size the buffer passed to WriteIndices().
  int64_t EstimatedDataEncodedSize() override {
    // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
    // reserve
    // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
    // but not reserving them would cause the encoder to fail.
    return 1 +
           ::arrow::util::RleEncoder::MaxBufferSize(
               bit_width(), static_cast<int>(buffered_indices_.size())) +
           ::arrow::util::RleEncoder::MinBufferSize(bit_width());
  }

  /// The minimum bit width required to encode the currently buffered indices.
  int bit_width() const override {
    // Empty and single-entry dictionaries are special-cased to 0 and 1 bits.
    if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0;
    if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1;
    return bit_util::Log2(num_entries());
  }

  /// Encode value. Note that this does not actually write any data, just
  /// buffers the value's index to be written later.
  inline void Put(const T& value);

  // Not implemented for other data types
  inline void PutByteArray(const void* ptr, int32_t length);

  /// Buffers the dictionary index of each of the num_values values in src.
  void Put(const T* src, int num_values) override {
    for (int32_t i = 0; i < num_values; i++) {
      Put(src[i]);
    }
  }

  /// Buffers the dictionary index of every entry of src whose validity bit is
  /// set, visiting the bitmap one run of set bits at a time.
  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    ::arrow::internal::VisitSetBitRunsVoid(valid_bits, valid_bits_offset, num_values,
                                           [&](int64_t position, int64_t length) {
                                             for (int64_t i = 0; i < length; i++) {
                                               Put(src[i + position]);
                                             }
                                           });
  }

  using TypedEncoder<DType>::Put;

  void Put(const ::arrow::Array& values) override;
  void PutDictionary(const ::arrow::Array& values) override;

  /// Appends the non-null entries of an integer index array to the buffered
  /// indices, casting each to int32_t. ArrowType selects the width the value
  /// buffer is read with.
  template <typename ArrowType, typename T = typename ArrowType::c_type>
  void PutIndicesTyped(const ::arrow::Array& data) {
    auto values = data.data()->GetValues<T>(1);
    size_t buffer_position = buffered_indices_.size();
    // Grow once by the number of valid entries, then fill in place.
    buffered_indices_.resize(buffer_position +
                             static_cast<size_t>(data.length() - data.null_count()));
    ::arrow::internal::VisitSetBitRunsVoid(
        data.null_bitmap_data(), data.offset(), data.length(),
        [&](int64_t position, int64_t length) {
          for (int64_t i = 0; i < length; ++i) {
            buffered_indices_[buffer_position++] =
                static_cast<int32_t>(values[i + position]);
          }
        });
  }

  /// Buffers already-computed dictionary indices. Signed and unsigned integer
  /// types of the same width share the unsigned reader.
  /// Throws ParquetException for non-integer arrays.
  void PutIndices(const ::arrow::Array& data) override {
    switch (data.type()->id()) {
      case ::arrow::Type::UINT8:
      case ::arrow::Type::INT8:
        return PutIndicesTyped<::arrow::UInt8Type>(data);
      case ::arrow::Type::UINT16:
      case ::arrow::Type::INT16:
        return PutIndicesTyped<::arrow::UInt16Type>(data);
      case ::arrow::Type::UINT32:
      case ::arrow::Type::INT32:
        return PutIndicesTyped<::arrow::UInt32Type>(data);
      case ::arrow::Type::UINT64:
      case ::arrow::Type::INT64:
        return PutIndicesTyped<::arrow::UInt64Type>(data);
      default:
        throw ParquetException("Passed non-integer array to PutIndices");
    }
  }

  /// Allocates a buffer of EstimatedDataEncodedSize() bytes, writes the
  /// buffered indices into it and shrinks it to the size actually written.
  std::shared_ptr<Buffer> FlushValues() override {
    std::shared_ptr<ResizableBuffer> buffer =
        AllocateBuffer(this->pool_, EstimatedDataEncodedSize());
    int result_size = WriteIndices(buffer->mutable_data(),
                                   static_cast<int>(EstimatedDataEncodedSize()));
    PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false));
    return std::move(buffer);
  }

  /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
  /// dict_encoded_size() bytes.
  void WriteDict(uint8_t* buffer) override;

  /// The number of entries in the dictionary.
  int num_entries() const override { return memo_table_.size(); }

 private:
  /// Clears all the indices (but leaves the dictionary).
  void ClearIndices() { buffered_indices_.clear(); }

  /// Indices that have not yet been written out by WriteIndices().
  ArrowPoolVector<int32_t> buffered_indices_;

  /// Buffers the dictionary index of every non-null element of a
  /// (Large)Binary-like array. Elements larger than kMaxByteArraySize are
  /// rejected.
  template <typename ArrayType>
  void PutBinaryArray(const ArrayType& array) {
    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
        *array.data(),
        [&](::std::string_view view) {
          if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
          }
          PutByteArray(view.data(), static_cast<uint32_t>(view.size()));
          return Status::OK();
        },
        []() { return Status::OK(); }));
  }

  /// Inserts every element of a null-free binary array into the memo table
  /// without buffering any indices. dict_encoded_size_ grows by the element
  /// size plus a 4-byte length for each element visited.
  template <typename ArrayType>
  void PutBinaryDictionaryArray(const ArrayType& array) {
    DCHECK_EQ(array.null_count(), 0);
    for (int64_t i = 0; i < array.length(); i++) {
      auto v = array.GetView(i);
      if (ARROW_PREDICT_FALSE(v.size() > kMaxByteArraySize)) {
        throw ParquetException("Parquet cannot store strings with size 2GB or more");
      }
      dict_encoded_size_ += static_cast<int>(v.size() + sizeof(uint32_t));
      int32_t unused_memo_index;
      PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(
          v.data(), static_cast<int32_t>(v.size()), &unused_memo_index));
    }
  }

  /// The number of bytes needed to encode the dictionary.
  int dict_encoded_size_;

  MemoTableType memo_table_;
};

/// Primitive types: copies the memoized values into buffer.
template <typename DType>
void DictEncoderImpl<DType>::WriteDict(uint8_t* buffer) {
  // For primitive types, only a memcpy
  DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size());
  memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer));
}

// ByteArray and FLBA already have the dictionary encoded in their data heaps
/// BYTE_ARRAY: writes each dictionary entry as a 4-byte length followed by
/// its bytes.
template <>
void DictEncoderImpl<ByteArrayType>::WriteDict(uint8_t* buffer) {
  memo_table_.VisitValues(0, [&buffer](const ::std::string_view& v) {
    uint32_t len = static_cast<uint32_t>(v.length());
    memcpy(buffer, &len, sizeof(len));
    buffer += sizeof(len);
    memcpy(buffer, v.data(), len);
    buffer += len;
  });
}

/// FIXED_LEN_BYTE_ARRAY: writes each dictionary entry as type_length_ raw
/// bytes with no length prefix.
template <>
void DictEncoderImpl<FLBAType>::WriteDict(uint8_t* buffer) {
  memo_table_.VisitValues(0, [&](const ::std::string_view& v) {
    DCHECK_EQ(v.length(), static_cast<size_t>(type_length_));
    memcpy(buffer, v.data(), type_length_);
    buffer += type_length_;
  });
}

/// Looks the value up in (or inserts it into) the memo table and buffers its
/// index. dict_encoded_size_ grows by sizeof(T) only for new entries.
template <typename DType>
inline void DictEncoderImpl<DType>::Put(const T& v) {
  // Put() implementation for primitive types
  auto on_found = [](int32_t memo_index) {};
  auto on_not_found = [this](int32_t memo_index) {
    dict_encoded_size_ += static_cast<int>(sizeof(T));
  };

  int32_t memo_index;
  PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(v, on_found, on_not_found, &memo_index));
  buffered_indices_.push_back(memo_index);
}

/// Only meaningful for BYTE_ARRAY; every other type asserts in debug builds.
template <typename DType>
inline void DictEncoderImpl<DType>::PutByteArray(const void* ptr, int32_t length) {
  DCHECK(false);
}

/// BYTE_ARRAY: memoizes the byte range and buffers its index. New entries add
/// their length plus a 4-byte prefix to dict_encoded_size_.
template <>
inline void DictEncoderImpl<ByteArrayType>::PutByteArray(const void* ptr,
                                                         int32_t length) {
  // Stand-in address used when a zero-length value arrives with a null pointer.
  static const uint8_t empty[] = {0};

  auto on_found = [](int32_t memo_index) {};
  auto on_not_found = [&](int32_t memo_index) {
    dict_encoded_size_ += static_cast<int>(length + sizeof(uint32_t));
  };

  DCHECK(ptr != nullptr || length == 0);
  ptr = (ptr != nullptr) ?
ptr : empty; int32_t memo_index; PARQUET_THROW_NOT_OK( memo_table_.GetOrInsert(ptr, length, on_found, on_not_found, &memo_index)); buffered_indices_.push_back(memo_index); } template <> inline void DictEncoderImpl<ByteArrayType>::Put(const ByteArray& val) { return PutByteArray(val.ptr, static_cast<int32_t>(val.len)); } template <> inline void DictEncoderImpl<FLBAType>::Put(const FixedLenByteArray& v) { static const uint8_t empty[] = {0}; auto on_found = [](int32_t memo_index) {}; auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; }; DCHECK(v.ptr != nullptr || type_length_ == 0); const void* ptr = (v.ptr != nullptr) ? v.ptr : empty; int32_t memo_index; PARQUET_THROW_NOT_OK( memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found, &memo_index)); buffered_indices_.push_back(memo_index); } template <> void DictEncoderImpl<Int96Type>::Put(const ::arrow::Array& values) { ParquetException::NYI("Direct put to Int96"); } template <> void DictEncoderImpl<Int96Type>::PutDictionary(const ::arrow::Array& values) { ParquetException::NYI("Direct put to Int96"); } template <typename DType> void DictEncoderImpl<DType>::Put(const ::arrow::Array& values) { using ArrayType = typename ::arrow::CTypeTraits<typename DType::c_type>::ArrayType; const auto& data = checked_cast<const ArrayType&>(values); if (data.null_count() == 0) { // no nulls, just dump the data for (int64_t i = 0; i < data.length(); i++) { Put(data.Value(i)); } } else { for (int64_t i = 0; i < data.length(); i++) { if (data.IsValid(i)) { Put(data.Value(i)); } } } } template <> void DictEncoderImpl<FLBAType>::Put(const ::arrow::Array& values) { AssertFixedSizeBinary(values, type_length_); const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values); if (data.null_count() == 0) { // no nulls, just dump the data for (int64_t i = 0; i < data.length(); i++) { Put(FixedLenByteArray(data.Value(i))); } } else { std::vector<uint8_t> empty(type_length_, 0); for (int64_t 
i = 0; i < data.length(); i++) { if (data.IsValid(i)) { Put(FixedLenByteArray(data.Value(i))); } } } } template <> void DictEncoderImpl<ByteArrayType>::Put(const ::arrow::Array& values) { AssertBaseBinary(values); if (::arrow::is_binary_like(values.type_id())) { PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values)); } else { DCHECK(::arrow::is_large_binary_like(values.type_id())); PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values)); } } template <typename DType> void AssertCanPutDictionary(DictEncoderImpl<DType>* encoder, const ::arrow::Array& dict) { if (dict.null_count() > 0) { throw ParquetException("Inserted dictionary cannot cannot contain nulls"); } if (encoder->num_entries() > 0) { throw ParquetException("Can only call PutDictionary on an empty DictEncoder"); } } template <typename DType> void DictEncoderImpl<DType>::PutDictionary(const ::arrow::Array& values) { AssertCanPutDictionary(this, values); using ArrayType = typename ::arrow::CTypeTraits<typename DType::c_type>::ArrayType; const auto& data = checked_cast<const ArrayType&>(values); dict_encoded_size_ += static_cast<int>(sizeof(typename DType::c_type) * data.length()); for (int64_t i = 0; i < data.length(); i++) { int32_t unused_memo_index; PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(data.Value(i), &unused_memo_index)); } } template <> void DictEncoderImpl<FLBAType>::PutDictionary(const ::arrow::Array& values) { AssertFixedSizeBinary(values, type_length_); AssertCanPutDictionary(this, values); const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values); dict_encoded_size_ += static_cast<int>(type_length_ * data.length()); for (int64_t i = 0; i < data.length(); i++) { int32_t unused_memo_index; PARQUET_THROW_NOT_OK( memo_table_.GetOrInsert(data.Value(i), type_length_, &unused_memo_index)); } } template <> void DictEncoderImpl<ByteArrayType>::PutDictionary(const ::arrow::Array& values) { AssertBaseBinary(values); AssertCanPutDictionary(this, values); if 
(::arrow::is_binary_like(values.type_id())) { PutBinaryDictionaryArray(checked_cast<const ::arrow::BinaryArray&>(values)); } else { DCHECK(::arrow::is_large_binary_like(values.type_id())); PutBinaryDictionaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values)); } } // ---------------------------------------------------------------------- // ByteStreamSplitEncoder<T> implementations template <typename DType> class ByteStreamSplitEncoder : public EncoderImpl, virtual public TypedEncoder<DType> { public: using T = typename DType::c_type; using TypedEncoder<DType>::Put; explicit ByteStreamSplitEncoder( const ColumnDescriptor* descr, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); int64_t EstimatedDataEncodedSize() override; std::shared_ptr<Buffer> FlushValues() override; void Put(const T* buffer, int num_values) override; void Put(const ::arrow::Array& values) override; void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) override; protected: template <typename ArrowType> void PutImpl(const ::arrow::Array& values) { if (values.type_id() != ArrowType::type_id) { throw ParquetException(std::string() + "direct put to " + ArrowType::type_name() + " from " + values.type()->ToString() + " not supported"); } const auto& data = *values.data(); PutSpaced(data.GetValues<typename ArrowType::c_type>(1), static_cast<int>(data.length), data.GetValues<uint8_t>(0, 0), data.offset); } ::arrow::BufferBuilder sink_; int64_t num_values_in_buffer_; }; template <typename DType> ByteStreamSplitEncoder<DType>::ByteStreamSplitEncoder(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool), sink_{pool}, num_values_in_buffer_{0} {} template <typename DType> int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() { return sink_.length(); } template <typename DType> std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() { 
std::shared_ptr<ResizableBuffer> output_buffer = AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize()); uint8_t* output_buffer_raw = output_buffer->mutable_data(); const uint8_t* raw_values = sink_.data(); ::arrow::util::internal::ByteStreamSplitEncode<T>(raw_values, num_values_in_buffer_, output_buffer_raw); sink_.Reset(); num_values_in_buffer_ = 0; return std::move(output_buffer); } template <typename DType> void ByteStreamSplitEncoder<DType>::Put(const T* buffer, int num_values) { if (num_values > 0) { PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T))); num_values_in_buffer_ += num_values; } } template <> void ByteStreamSplitEncoder<FloatType>::Put(const ::arrow::Array& values) { PutImpl<::arrow::FloatType>(values); } template <> void ByteStreamSplitEncoder<DoubleType>::Put(const ::arrow::Array& values) { PutImpl<::arrow::DoubleType>(values); } template <typename DType> void ByteStreamSplitEncoder<DType>::PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) { if (valid_bits != NULLPTR) { PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool())); T* data = reinterpret_cast<T*>(buffer->mutable_data()); int num_valid_values = ::arrow::util::internal::SpacedCompress<T>( src, num_values, valid_bits, valid_bits_offset, data); Put(data, num_valid_values); } else { Put(src, num_values); } } class DecoderImpl : virtual public Decoder { public: void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; data_ = data; len_ = len; } int values_left() const override { return num_values_; } Encoding::type encoding() const override { return encoding_; } protected: explicit DecoderImpl(const ColumnDescriptor* descr, Encoding::type encoding) : descr_(descr), encoding_(encoding), num_values_(0), data_(NULLPTR), len_(0) {} // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY const ColumnDescriptor* descr_; const 
Encoding::type encoding_; int num_values_; const uint8_t* data_; int len_; int type_length_; }; template <typename DType> class PlainDecoder : public DecoderImpl, virtual public TypedDecoder<DType> { public: using T = typename DType::c_type; explicit PlainDecoder(const ColumnDescriptor* descr); int Decode(T* buffer, int max_values) override; int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<DType>::Accumulator* builder) override; int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<DType>::DictAccumulator* builder) override; }; template <> inline int PlainDecoder<Int96Type>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<Int96Type>::Accumulator* builder) { ParquetException::NYI("DecodeArrow not supported for Int96"); } template <> inline int PlainDecoder<Int96Type>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<Int96Type>::DictAccumulator* builder) { ParquetException::NYI("DecodeArrow not supported for Int96"); } template <> inline int PlainDecoder<BooleanType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<BooleanType>::DictAccumulator* builder) { ParquetException::NYI("dictionaries of BooleanType"); } template <typename DType> int PlainDecoder<DType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<DType>::Accumulator* builder) { using value_type = typename DType::c_type; constexpr int value_size = static_cast<int>(sizeof(value_type)); int values_decoded = num_values - null_count; if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) { ParquetException::EofException(); } 
PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { builder->UnsafeAppend(::arrow::util::SafeLoadAs<value_type>(data_)); data_ += sizeof(value_type); }, [&]() { builder->UnsafeAppendNull(); }); num_values_ -= values_decoded; len_ -= sizeof(value_type) * values_decoded; return values_decoded; } template <typename DType> int PlainDecoder<DType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<DType>::DictAccumulator* builder) { using value_type = typename DType::c_type; constexpr int value_size = static_cast<int>(sizeof(value_type)); int values_decoded = num_values - null_count; if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) { ParquetException::EofException(); } PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { PARQUET_THROW_NOT_OK( builder->Append(::arrow::util::SafeLoadAs<value_type>(data_))); data_ += sizeof(value_type); }, [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); }); num_values_ -= values_decoded; len_ -= sizeof(value_type) * values_decoded; return values_decoded; } // Decode routine templated on C++ type rather than type enum template <typename T> inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values, int type_length, T* out) { int64_t bytes_to_decode = num_values * static_cast<int64_t>(sizeof(T)); if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) { ParquetException::EofException(); } // If bytes_to_decode == 0, data could be null if (bytes_to_decode > 0) { memcpy(out, data, bytes_to_decode); } return static_cast<int>(bytes_to_decode); } template <typename DType> PlainDecoder<DType>::PlainDecoder(const ColumnDescriptor* descr) : DecoderImpl(descr, Encoding::PLAIN) { if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) { type_length_ = 
descr_->type_length(); } else { type_length_ = -1; } } // Template specialization for BYTE_ARRAY. The written values do not own their // own data. static inline int64_t ReadByteArray(const uint8_t* data, int64_t data_size, ByteArray* out) { if (ARROW_PREDICT_FALSE(data_size < 4)) { ParquetException::EofException(); } const int32_t len = ::arrow::util::SafeLoadAs<int32_t>(data); if (len < 0) { throw ParquetException("Invalid BYTE_ARRAY value"); } const int64_t consumed_length = static_cast<int64_t>(len) + 4; if (ARROW_PREDICT_FALSE(data_size < consumed_length)) { ParquetException::EofException(); } *out = ByteArray{static_cast<uint32_t>(len), data + 4}; return consumed_length; } template <> inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values, int type_length, ByteArray* out) { int bytes_decoded = 0; for (int i = 0; i < num_values; ++i) { const auto increment = ReadByteArray(data, data_size, out + i); if (ARROW_PREDICT_FALSE(increment > INT_MAX - bytes_decoded)) { throw ParquetException("BYTE_ARRAY chunk too large"); } data += increment; data_size -= increment; bytes_decoded += static_cast<int>(increment); } return bytes_decoded; } // Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not // own their own data. 
template <> inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size, int num_values, int type_length, FixedLenByteArray* out) { int64_t bytes_to_decode = static_cast<int64_t>(type_length) * num_values; if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) { ParquetException::EofException(); } for (int i = 0; i < num_values; ++i) { out[i].ptr = data; data += type_length; data_size -= type_length; } return static_cast<int>(bytes_to_decode); } template <typename DType> int PlainDecoder<DType>::Decode(T* buffer, int max_values) { max_values = std::min(max_values, num_values_); int bytes_consumed = DecodePlain<T>(data_, len_, max_values, type_length_, buffer); data_ += bytes_consumed; len_ -= bytes_consumed; num_values_ -= max_values; return max_values; } class PlainBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { public: explicit PlainBooleanDecoder(const ColumnDescriptor* descr); void SetData(int num_values, const uint8_t* data, int len) override; // Two flavors of bool decoding int Decode(uint8_t* buffer, int max_values) override; int Decode(bool* buffer, int max_values) override; int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<BooleanType>::Accumulator* out) override; int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<BooleanType>::DictAccumulator* out) override; private: std::unique_ptr<::arrow::bit_util::BitReader> bit_reader_; }; PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr) : DecoderImpl(descr, Encoding::PLAIN) {} void PlainBooleanDecoder::SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; bit_reader_ = std::make_unique<bit_util::BitReader>(data, len); } int PlainBooleanDecoder::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename 
EncodingTraits<BooleanType>::Accumulator* builder) { int values_decoded = num_values - null_count; if (ARROW_PREDICT_FALSE(num_values_ < values_decoded)) { ParquetException::EofException(); } PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { bool value; ARROW_IGNORE_EXPR(bit_reader_->GetValue(1, &value)); builder->UnsafeAppend(value); }, [&]() { builder->UnsafeAppendNull(); }); num_values_ -= values_decoded; return values_decoded; } inline int PlainBooleanDecoder::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<BooleanType>::DictAccumulator* builder) { ParquetException::NYI("dictionaries of BooleanType"); } int PlainBooleanDecoder::Decode(uint8_t* buffer, int max_values) { max_values = std::min(max_values, num_values_); bool val; ::arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values); for (int i = 0; i < max_values; ++i) { if (!bit_reader_->GetValue(1, &val)) { ParquetException::EofException(); } if (val) { bit_writer.Set(); } bit_writer.Next(); } bit_writer.Finish(); num_values_ -= max_values; return max_values; } int PlainBooleanDecoder::Decode(bool* buffer, int max_values) { max_values = std::min(max_values, num_values_); if (bit_reader_->GetBatch(1, buffer, max_values) != max_values) { ParquetException::EofException(); } num_values_ -= max_values; return max_values; } struct ArrowBinaryHelper { explicit ArrowBinaryHelper(typename EncodingTraits<ByteArrayType>::Accumulator* out) { this->out = out; this->builder = out->builder.get(); this->chunk_space_remaining = ::arrow::kBinaryMemoryLimit - this->builder->value_data_length(); } Status PushChunk() { std::shared_ptr<::arrow::Array> result; RETURN_NOT_OK(builder->Finish(&result)); out->chunks.push_back(result); chunk_space_remaining = ::arrow::kBinaryMemoryLimit; return Status::OK(); } bool CanFit(int64_t length) const { return length <= 
chunk_space_remaining; } void UnsafeAppend(const uint8_t* data, int32_t length) { chunk_space_remaining -= length; builder->UnsafeAppend(data, length); } void UnsafeAppendNull() { builder->UnsafeAppendNull(); } Status Append(const uint8_t* data, int32_t length) { chunk_space_remaining -= length; return builder->Append(data, length); } Status AppendNull() { return builder->AppendNull(); } typename EncodingTraits<ByteArrayType>::Accumulator* out; ::arrow::BinaryBuilder* builder; int64_t chunk_space_remaining; }; template <> inline int PlainDecoder<ByteArrayType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::Accumulator* builder) { ParquetException::NYI(); } template <> inline int PlainDecoder<ByteArrayType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) { ParquetException::NYI(); } template <> inline int PlainDecoder<FLBAType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<FLBAType>::Accumulator* builder) { int values_decoded = num_values - null_count; if (ARROW_PREDICT_FALSE(len_ < descr_->type_length() * values_decoded)) { ParquetException::EofException(); } PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { builder->UnsafeAppend(data_); data_ += descr_->type_length(); }, [&]() { builder->UnsafeAppendNull(); }); num_values_ -= values_decoded; len_ -= descr_->type_length() * values_decoded; return values_decoded; } template <> inline int PlainDecoder<FLBAType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<FLBAType>::DictAccumulator* builder) { int values_decoded = num_values - null_count; if (ARROW_PREDICT_FALSE(len_ < 
descr_->type_length() * values_decoded)) { ParquetException::EofException(); } PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { PARQUET_THROW_NOT_OK(builder->Append(data_)); data_ += descr_->type_length(); }, [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); }); num_values_ -= values_decoded; len_ -= descr_->type_length() * values_decoded; return values_decoded; } class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>, virtual public ByteArrayDecoder { public: using Base = PlainDecoder<ByteArrayType>; using Base::DecodeSpaced; using Base::PlainDecoder; // ---------------------------------------------------------------------- // Dictionary read paths int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, ::arrow::BinaryDictionary32Builder* builder) override { int result = 0; PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, builder, &result)); return result; } // ---------------------------------------------------------------------- // Optimized dense binary read paths int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::Accumulator* out) override { int result = 0; PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); return result; } int DecodeCH(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, PaddedPODArray<UInt8>* column_chars_t_p, PaddedPODArray<UInt64>* column_offsets_p) { int result = 0; PARQUET_THROW_NOT_OK(DecodeCHDense(num_values, null_count, valid_bits, valid_bits_offset, column_chars_t_p, column_offsets_p, &result)); return result; } private: Status DecodeCHDense(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, PaddedPODArray<UInt8>* column_chars_t_p, 
PaddedPODArray<UInt64>* column_offsets_p, int* out_values_decoded) { //ArrowBinaryHelper helper(out); int values_decoded = 0; // RETURN_NOT_OK(helper.builder->Reserve(num_values)); // RETURN_NOT_OK(helper.builder->ReserveData( // std::min<int64_t>(len_, helper.chunk_space_remaining))); column_offsets_p->reserve(num_values); column_chars_t_p->reserve(num_values + len_); if (null_count == 0) { for (int i = 0 ; i < num_values; i++) { if (ARROW_PREDICT_FALSE(len_ < 4)) { ParquetException::EofException(); } auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_); if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) { return Status::Invalid("Invalid or corrupted value_len '", value_len, "'"); } auto increment = value_len + 4; if (ARROW_PREDICT_FALSE(len_ < increment)) { ParquetException::EofException(); } column_chars_t_p->insert_assume_reserved(data_ + 4, data_ + 4 + value_len); column_chars_t_p->emplace_back('\0'); column_offsets_p->emplace_back(column_chars_t_p->size()); data_ += increment; len_ -= increment; ++values_decoded; } } else { RETURN_NOT_OK(VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { if (ARROW_PREDICT_FALSE(len_ < 4)) { ParquetException::EofException(); } auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_); if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) { return Status::Invalid("Invalid or corrupted value_len '", value_len, "'"); } auto increment = value_len + 4; if (ARROW_PREDICT_FALSE(len_ < increment)) { ParquetException::EofException(); } column_chars_t_p->insert_assume_reserved(data_ + 4, data_ + 4 + value_len); column_chars_t_p->emplace_back('\0'); column_offsets_p->emplace_back(column_chars_t_p->size()); data_ += increment; len_ -= increment; ++values_decoded; return Status::OK(); }, [&]() { //helper.UnsafeAppendNull(); column_chars_t_p->emplace_back('\0'); column_offsets_p->emplace_back(column_chars_t_p->size()); return Status::OK(); })); } num_values_ -= 
values_decoded; *out_values_decoded = values_decoded; return Status::OK(); } Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::Accumulator* out, int* out_values_decoded) { ArrowBinaryHelper helper(out); int values_decoded = 0; RETURN_NOT_OK(helper.builder->Reserve(num_values)); RETURN_NOT_OK(helper.builder->ReserveData( std::min<int64_t>(len_, helper.chunk_space_remaining))); int i = 0; RETURN_NOT_OK(VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { if (ARROW_PREDICT_FALSE(len_ < 4)) { ParquetException::EofException(); } auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_); if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) { return Status::Invalid("Invalid or corrupted value_len '", value_len, "'"); } auto increment = value_len + 4; if (ARROW_PREDICT_FALSE(len_ < increment)) { ParquetException::EofException(); } if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) { // This element would exceed the capacity of a chunk RETURN_NOT_OK(helper.PushChunk()); RETURN_NOT_OK(helper.builder->Reserve(num_values - i)); RETURN_NOT_OK(helper.builder->ReserveData( std::min<int64_t>(len_, helper.chunk_space_remaining))); } helper.UnsafeAppend(data_ + 4, value_len); data_ += increment; len_ -= increment; ++values_decoded; ++i; return Status::OK(); }, [&]() { helper.UnsafeAppendNull(); ++i; return Status::OK(); })); num_values_ -= values_decoded; *out_values_decoded = values_decoded; return Status::OK(); } template <typename BuilderType> Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, BuilderType* builder, int* out_values_decoded) { RETURN_NOT_OK(builder->Reserve(num_values)); int values_decoded = 0; RETURN_NOT_OK(VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { if (ARROW_PREDICT_FALSE(len_ < 4)) { ParquetException::EofException(); } 
auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_); if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) { return Status::Invalid("Invalid or corrupted value_len '", value_len, "'"); } auto increment = value_len + 4; if (ARROW_PREDICT_FALSE(len_ < increment)) { ParquetException::EofException(); } RETURN_NOT_OK(builder->Append(data_ + 4, value_len)); data_ += increment; len_ -= increment; ++values_decoded; return Status::OK(); }, [&]() { return builder->AppendNull(); })); num_values_ -= values_decoded; *out_values_decoded = values_decoded; return Status::OK(); } }; class PlainFLBADecoder : public PlainDecoder<FLBAType>, virtual public FLBADecoder { public: using Base = PlainDecoder<FLBAType>; using Base::PlainDecoder; }; // ---------------------------------------------------------------------- // Dictionary encoding and decoding template <typename Type> class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> { public: typedef typename Type::c_type T; // Initializes the dictionary with values from 'dictionary'. The data in // dictionary is not guaranteed to persist in memory after this call so the // dictionary decoder needs to copy the data out if necessary. 
explicit DictDecoderImpl(const ColumnDescriptor* descr, MemoryPool* pool = ::arrow::default_memory_pool()) : DecoderImpl(descr, Encoding::RLE_DICTIONARY), dictionary_(AllocateBuffer(pool, 0)), dictionary_length_(0), byte_array_data_(AllocateBuffer(pool, 0)), byte_array_offsets_(AllocateBuffer(pool, 0)), indices_scratch_space_(AllocateBuffer(pool, 0)) {} // Perform type-specific initiatialization void SetDict(TypedDecoder<Type>* dictionary) override; void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; if (len == 0) { // Initialize dummy decoder to avoid crashes later on idx_decoder_ = ::arrow::util::RleDecoder(data, len, /*bit_width=*/1); return; } uint8_t bit_width = *data; if (ARROW_PREDICT_FALSE(bit_width > 32)) { throw ParquetException("Invalid or corrupted bit_width " + std::to_string(bit_width) + ". Maximum allowed is 32."); } idx_decoder_ = ::arrow::util::RleDecoder(++data, --len, bit_width); } int Decode(T* buffer, int num_values) override { num_values = std::min(num_values, num_values_); int decoded_values = idx_decoder_.GetBatchWithDict(reinterpret_cast<const T*>(dictionary_->data()), dictionary_length_, buffer, num_values); if (decoded_values != num_values) { ParquetException::EofException(); } num_values_ -= num_values; return num_values; } int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset) override { num_values = std::min(num_values, num_values_); if (num_values != idx_decoder_.GetBatchWithDictSpaced( reinterpret_cast<const T*>(dictionary_->data()), dictionary_length_, buffer, num_values, null_count, valid_bits, valid_bits_offset)) { ParquetException::EofException(); } num_values_ -= num_values; return num_values; } int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<Type>::Accumulator* out) override; int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, 
int64_t valid_bits_offset, typename EncodingTraits<Type>::DictAccumulator* out) override; void InsertDictionary(::arrow::ArrayBuilder* builder) override; int DecodeIndicesSpaced(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, ::arrow::ArrayBuilder* builder) override { if (num_values > 0) { // TODO(wesm): Refactor to batch reads for improved memory use. It is not // trivial because the null_count is relative to the entire bitmap PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>( num_values, /*shrink_to_fit=*/false)); } auto indices_buffer = reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data()); if (num_values != idx_decoder_.GetBatchSpaced(num_values, null_count, valid_bits, valid_bits_offset, indices_buffer)) { ParquetException::EofException(); } // XXX(wesm): Cannot append "valid bits" directly to the builder std::vector<uint8_t> valid_bytes(num_values, 0); int64_t i = 0; VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { valid_bytes[i++] = 1; }, [&]() { ++i; }); auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder); PARQUET_THROW_NOT_OK( binary_builder->AppendIndices(indices_buffer, num_values, valid_bytes.data())); num_values_ -= num_values - null_count; return num_values - null_count; } int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) override { num_values = std::min(num_values, num_values_); if (num_values > 0) { // TODO(wesm): Refactor to batch reads for improved memory use. 
This is // relatively simple here because we don't have to do any bookkeeping of // nulls PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>( num_values, /*shrink_to_fit=*/false)); } auto indices_buffer = reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data()); if (num_values != idx_decoder_.GetBatch(indices_buffer, num_values)) { ParquetException::EofException(); } auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder); PARQUET_THROW_NOT_OK(binary_builder->AppendIndices(indices_buffer, num_values)); num_values_ -= num_values; return num_values; } int DecodeIndices(int num_values, int32_t* indices) override { if (num_values != idx_decoder_.GetBatch(indices, num_values)) { ParquetException::EofException(); } num_values_ -= num_values; return num_values; } void GetDictionary(const T** dictionary, int32_t* dictionary_length) override { *dictionary_length = dictionary_length_; *dictionary = reinterpret_cast<T*>(dictionary_->mutable_data()); } protected: Status IndexInBounds(int32_t index) { if (ARROW_PREDICT_TRUE(0 <= index && index < dictionary_length_)) { return Status::OK(); } return Status::Invalid("Index not in dictionary bounds"); } inline void DecodeDict(TypedDecoder<Type>* dictionary) { dictionary_length_ = static_cast<int32_t>(dictionary->values_left()); PARQUET_THROW_NOT_OK(dictionary_->Resize(dictionary_length_ * sizeof(T), /*shrink_to_fit=*/false)); dictionary->Decode(reinterpret_cast<T*>(dictionary_->mutable_data()), dictionary_length_); } // Only one is set. std::shared_ptr<ResizableBuffer> dictionary_; int32_t dictionary_length_; // Data that contains the byte array data (byte_array_dictionary_ just has the // pointers). std::shared_ptr<ResizableBuffer> byte_array_data_; // Arrow-style byte offsets for each dictionary value. We maintain two // representations of the dictionary, one as ByteArray* for non-Arrow // consumers and this one for Arrow consumers. 
Since dictionaries are // generally pretty small to begin with this doesn't mean too much extra // memory use in most cases std::shared_ptr<ResizableBuffer> byte_array_offsets_; // Reusable buffer for decoding dictionary indices to be appended to a // BinaryDictionary32Builder std::shared_ptr<ResizableBuffer> indices_scratch_space_; ::arrow::util::RleDecoder idx_decoder_; }; template <typename Type> void DictDecoderImpl<Type>::SetDict(TypedDecoder<Type>* dictionary) { DecodeDict(dictionary); } template <> void DictDecoderImpl<BooleanType>::SetDict(TypedDecoder<BooleanType>* dictionary) { ParquetException::NYI("Dictionary encoding is not implemented for boolean values"); } template <> void DictDecoderImpl<ByteArrayType>::SetDict(TypedDecoder<ByteArrayType>* dictionary) { DecodeDict(dictionary); auto dict_values = reinterpret_cast<ByteArray*>(dictionary_->mutable_data()); int total_size = 0; for (int i = 0; i < dictionary_length_; ++i) { total_size += dict_values[i].len; } PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, /*shrink_to_fit=*/false)); PARQUET_THROW_NOT_OK( byte_array_offsets_->Resize((dictionary_length_ + 1) * sizeof(int32_t), /*shrink_to_fit=*/false)); int32_t offset = 0; uint8_t* bytes_data = byte_array_data_->mutable_data(); int32_t* bytes_offsets = reinterpret_cast<int32_t*>(byte_array_offsets_->mutable_data()); for (int i = 0; i < dictionary_length_; ++i) { memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len); bytes_offsets[i] = offset; dict_values[i].ptr = bytes_data + offset; offset += dict_values[i].len; } bytes_offsets[dictionary_length_] = offset; } template <> inline void DictDecoderImpl<FLBAType>::SetDict(TypedDecoder<FLBAType>* dictionary) { DecodeDict(dictionary); auto dict_values = reinterpret_cast<FLBA*>(dictionary_->mutable_data()); int fixed_len = descr_->type_length(); int total_size = dictionary_length_ * fixed_len; PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, /*shrink_to_fit=*/false)); uint8_t* 
bytes_data = byte_array_data_->mutable_data(); for (int32_t i = 0, offset = 0; i < dictionary_length_; ++i, offset += fixed_len) { memcpy(bytes_data + offset, dict_values[i].ptr, fixed_len); dict_values[i].ptr = bytes_data + offset; } } template <> inline int DictDecoderImpl<Int96Type>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<Int96Type>::Accumulator* builder) { ParquetException::NYI("DecodeArrow to Int96Type"); } template <> inline int DictDecoderImpl<Int96Type>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<Int96Type>::DictAccumulator* builder) { ParquetException::NYI("DecodeArrow to Int96Type"); } template <> inline int DictDecoderImpl<ByteArrayType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::Accumulator* builder) { ParquetException::NYI("DecodeArrow implemented elsewhere"); } template <> inline int DictDecoderImpl<ByteArrayType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) { ParquetException::NYI("DecodeArrow implemented elsewhere"); } template <typename DType> int DictDecoderImpl<DType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<DType>::DictAccumulator* builder) { PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); auto dict_values = reinterpret_cast<const typename DType::c_type*>(dictionary_->data()); VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { int32_t index; if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) { throw ParquetException(""); } PARQUET_THROW_NOT_OK(IndexInBounds(index)); PARQUET_THROW_NOT_OK(builder->Append(dict_values[index])); }, [&]() { 
PARQUET_THROW_NOT_OK(builder->AppendNull()); }); return num_values - null_count; } template <> int DictDecoderImpl<BooleanType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<BooleanType>::DictAccumulator* builder) { ParquetException::NYI("No dictionary encoding for BooleanType"); } template <> inline int DictDecoderImpl<FLBAType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<FLBAType>::Accumulator* builder) { if (builder->byte_width() != descr_->type_length()) { throw ParquetException("Byte width mismatch: builder was " + std::to_string(builder->byte_width()) + " but decoder was " + std::to_string(descr_->type_length())); } PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); auto dict_values = reinterpret_cast<const FLBA*>(dictionary_->data()); VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { int32_t index; if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) { throw ParquetException(""); } PARQUET_THROW_NOT_OK(IndexInBounds(index)); builder->UnsafeAppend(dict_values[index].ptr); }, [&]() { builder->UnsafeAppendNull(); }); return num_values - null_count; } template <> int DictDecoderImpl<FLBAType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<FLBAType>::DictAccumulator* builder) { auto value_type = checked_cast<const ::arrow::DictionaryType&>(*builder->type()).value_type(); auto byte_width = checked_cast<const ::arrow::FixedSizeBinaryType&>(*value_type).byte_width(); if (byte_width != descr_->type_length()) { throw ParquetException("Byte width mismatch: builder was " + std::to_string(byte_width) + " but decoder was " + std::to_string(descr_->type_length())); } PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); auto dict_values = reinterpret_cast<const FLBA*>(dictionary_->data()); 
VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { int32_t index; if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) { throw ParquetException(""); } PARQUET_THROW_NOT_OK(IndexInBounds(index)); PARQUET_THROW_NOT_OK(builder->Append(dict_values[index].ptr)); }, [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); }); return num_values - null_count; } template <typename Type> int DictDecoderImpl<Type>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<Type>::Accumulator* builder) { PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); using value_type = typename Type::c_type; auto dict_values = reinterpret_cast<const value_type*>(dictionary_->data()); VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { int32_t index; if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) { throw ParquetException(""); } PARQUET_THROW_NOT_OK(IndexInBounds(index)); builder->UnsafeAppend(dict_values[index]); }, [&]() { builder->UnsafeAppendNull(); }); return num_values - null_count; } template <typename Type> void DictDecoderImpl<Type>::InsertDictionary(::arrow::ArrayBuilder* builder) { ParquetException::NYI("InsertDictionary only implemented for BYTE_ARRAY types"); } template <> void DictDecoderImpl<ByteArrayType>::InsertDictionary(::arrow::ArrayBuilder* builder) { auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder); // Make a BinaryArray referencing the internal dictionary data auto arr = std::make_shared<::arrow::BinaryArray>( dictionary_length_, byte_array_offsets_, byte_array_data_); PARQUET_THROW_NOT_OK(binary_builder->InsertMemoValues(*arr)); } class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>, virtual public ByteArrayDecoder { public: using BASE = DictDecoderImpl<ByteArrayType>; using BASE::DictDecoderImpl; int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t 
valid_bits_offset, ::arrow::BinaryDictionary32Builder* builder) override { int result = 0; if (null_count == 0) { PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result)); } else { PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, builder, &result)); } return result; } int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::Accumulator* out) override { int result = 0; if (null_count == 0) { PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, out, &result)); } else { PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); } return result; } int DecodeCH(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, PaddedPODArray<UInt8>* column_chars_t_p, PaddedPODArray<UInt64>* column_offsets_p) override { int result = 0; if (null_count == 0) { PARQUET_THROW_NOT_OK(DecodeCHDenseNonNull(num_values, column_chars_t_p, column_offsets_p, &result)); } else { PARQUET_THROW_NOT_OK(DecodeCHDense(num_values, null_count, valid_bits, valid_bits_offset, column_chars_t_p, column_offsets_p, &result)); } return result; } private: Status DecodeCHDense(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, PaddedPODArray<UInt8>* column_chars_t_p, PaddedPODArray<UInt64>* column_offsets_p, int* out_num_values) { constexpr int32_t kBufferSize = 1024; int32_t indices[kBufferSize]; column_offsets_p->reserve(num_values); column_chars_t_p->reserve(num_values * 20); // approx ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data()); int values_decoded = 0; int num_appended = 0; while (num_appended < num_values) { bool is_valid = bit_reader.IsSet(); bit_reader.Next(); if (is_valid) { int32_t batch_size = std::min<int32_t>(kBufferSize, 
num_values - num_appended - null_count); int num_indices = idx_decoder_.GetBatch(indices, batch_size); if (ARROW_PREDICT_FALSE(num_indices < 1)) { return Status::Invalid("Invalid number of indices '", num_indices, "'"); } int i = 0; while (true) { // Consume all indices if (is_valid) { auto idx = indices[i]; RETURN_NOT_OK(IndexInBounds(idx)); const auto& val = dict_values[idx]; column_chars_t_p -> insert(val.ptr, val.ptr + static_cast<int32_t>(val.len)); column_chars_t_p -> emplace_back('\0'); column_offsets_p -> emplace_back(column_chars_t_p -> size()); ++i; ++values_decoded; } else { column_chars_t_p -> emplace_back('\0'); column_offsets_p -> emplace_back(column_chars_t_p -> size()); --null_count; } ++num_appended; if (i == num_indices) { // Do not advance the bit_reader if we have fulfilled the decode // request break; } is_valid = bit_reader.IsSet(); bit_reader.Next(); } } else { column_chars_t_p -> emplace_back('\0'); column_offsets_p -> emplace_back(column_chars_t_p -> size()); --null_count; ++num_appended; } } *out_num_values = values_decoded; return Status::OK(); } Status DecodeCHDenseNonNull(int num_values, PaddedPODArray<UInt8>* column_chars_t_p, PaddedPODArray<UInt64>* column_offsets_p, int* out_num_values) { constexpr int32_t kBufferSize = 2048; int32_t indices[kBufferSize]; int values_decoded = 0; auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data()); while (values_decoded < num_values) { int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded); int num_indices = idx_decoder_.GetBatch(indices, batch_size); if (num_indices == 0) ParquetException::EofException(); for (int i = 0; i < num_indices; ++i) { auto idx = indices[i]; RETURN_NOT_OK(IndexInBounds(idx)); const auto& val = dict_values[idx]; column_chars_t_p -> insert(val.ptr, val.ptr + static_cast<int32_t>(val.len)); column_chars_t_p -> emplace_back('\0'); column_offsets_p -> emplace_back(column_chars_t_p -> size()); } values_decoded += num_indices; } 
*out_num_values = values_decoded; return Status::OK(); } Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::Accumulator* out, int* out_num_values) { constexpr int32_t kBufferSize = 1024; int32_t indices[kBufferSize]; ArrowBinaryHelper helper(out); auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data()); int values_decoded = 0; int num_indices = 0; int pos_indices = 0; auto visit_valid = [&](int64_t position) -> Status { if (num_indices == pos_indices) { // Refill indices buffer const auto batch_size = std::min<int32_t>(kBufferSize, num_values - null_count - values_decoded); num_indices = idx_decoder_.GetBatch(indices, batch_size); if (ARROW_PREDICT_FALSE(num_indices < 1)) { return Status::Invalid("Invalid number of indices: ", num_indices); } pos_indices = 0; } const auto index = indices[pos_indices++]; RETURN_NOT_OK(IndexInBounds(index)); const auto& val = dict_values[index]; if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { RETURN_NOT_OK(helper.PushChunk()); } RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len))); ++values_decoded; return Status::OK(); }; auto visit_null = [&]() -> Status { RETURN_NOT_OK(helper.AppendNull()); return Status::OK(); }; ::arrow::internal::BitBlockCounter bit_blocks(valid_bits, valid_bits_offset, num_values); int64_t position = 0; while (position < num_values) { const auto block = bit_blocks.NextWord(); if (block.AllSet()) { for (int64_t i = 0; i < block.length; ++i, ++position) { ARROW_RETURN_NOT_OK(visit_valid(position)); } } else if (block.NoneSet()) { for (int64_t i = 0; i < block.length; ++i, ++position) { ARROW_RETURN_NOT_OK(visit_null()); } } else { for (int64_t i = 0; i < block.length; ++i, ++position) { if (bit_util::GetBit(valid_bits, valid_bits_offset + position)) { ARROW_RETURN_NOT_OK(visit_valid(position)); } else { ARROW_RETURN_NOT_OK(visit_null()); } } } } *out_num_values = 
values_decoded; return Status::OK(); } Status DecodeArrowDenseNonNull(int num_values, typename EncodingTraits<ByteArrayType>::Accumulator* out, int* out_num_values) { constexpr int32_t kBufferSize = 2048; int32_t indices[kBufferSize]; int values_decoded = 0; ArrowBinaryHelper helper(out); auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data()); while (values_decoded < num_values) { int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded); int num_indices = idx_decoder_.GetBatch(indices, batch_size); if (num_indices == 0) ParquetException::EofException(); for (int i = 0; i < num_indices; ++i) { auto idx = indices[i]; RETURN_NOT_OK(IndexInBounds(idx)); const auto& val = dict_values[idx]; if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { RETURN_NOT_OK(helper.PushChunk()); } RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len))); } values_decoded += num_indices; } *out_num_values = values_decoded; return Status::OK(); } template <typename BuilderType> Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, BuilderType* builder, int* out_num_values) { constexpr int32_t kBufferSize = 1024; int32_t indices[kBufferSize]; RETURN_NOT_OK(builder->Reserve(num_values)); ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data()); int values_decoded = 0; int num_appended = 0; while (num_appended < num_values) { bool is_valid = bit_reader.IsSet(); bit_reader.Next(); if (is_valid) { int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - num_appended - null_count); int num_indices = idx_decoder_.GetBatch(indices, batch_size); int i = 0; while (true) { // Consume all indices if (is_valid) { auto idx = indices[i]; RETURN_NOT_OK(IndexInBounds(idx)); const auto& val = dict_values[idx]; RETURN_NOT_OK(builder->Append(val.ptr, val.len)); ++i; ++values_decoded; } else { 
RETURN_NOT_OK(builder->AppendNull()); --null_count; } ++num_appended; if (i == num_indices) { // Do not advance the bit_reader if we have fulfilled the decode // request break; } is_valid = bit_reader.IsSet(); bit_reader.Next(); } } else { RETURN_NOT_OK(builder->AppendNull()); --null_count; ++num_appended; } } *out_num_values = values_decoded; return Status::OK(); } template <typename BuilderType> Status DecodeArrowNonNull(int num_values, BuilderType* builder, int* out_num_values) { constexpr int32_t kBufferSize = 2048; int32_t indices[kBufferSize]; RETURN_NOT_OK(builder->Reserve(num_values)); auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data()); int values_decoded = 0; while (values_decoded < num_values) { int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded); int num_indices = idx_decoder_.GetBatch(indices, batch_size); if (num_indices == 0) ParquetException::EofException(); for (int i = 0; i < num_indices; ++i) { auto idx = indices[i]; RETURN_NOT_OK(IndexInBounds(idx)); const auto& val = dict_values[idx]; RETURN_NOT_OK(builder->Append(val.ptr, val.len)); } values_decoded += num_indices; } *out_num_values = values_decoded; return Status::OK(); } }; // ---------------------------------------------------------------------- // DeltaBitPackEncoder /// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format /// as per the parquet spec. See: /// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5 /// /// Consists of a header followed by blocks of delta encoded values binary packed. /// /// Format /// [header] [block 1] [block 2] ... 
[block N] /// /// Header /// [block size] [number of mini blocks per block] [total value count] [first value] /// /// Block /// [min delta] [list of bitwidths of the mini blocks] [miniblocks] /// /// Sets aside bytes at the start of the internal buffer where the header will be written, /// and only writes the header when FlushValues is called before returning it. /// /// To encode a block, we will: /// /// 1. Compute the differences between consecutive elements. For the first element in the /// block, use the last element in the previous block or, in the case of the first block, /// use the first value of the whole sequence, stored in the header. /// /// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract /// this min delta from all deltas in the block. This guarantees that all values are /// non-negative. /// /// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the /// bit widths of the mini blocks and the delta values (minus the min delta) bit packed /// per mini block. /// /// Supports only INT32 and INT64. 
template <typename DType> class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> { // Maximum possible header size static constexpr uint32_t kMaxPageHeaderWriterSize = 32; static constexpr uint32_t kValuesPerBlock = 128; static constexpr uint32_t kMiniBlocksPerBlock = 4; public: using T = typename DType::c_type; using UT = std::make_unsigned_t<T>; using TypedEncoder<DType>::Put; explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool, const uint32_t values_per_block = kValuesPerBlock, const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock) : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool), values_per_block_(values_per_block), mini_blocks_per_block_(mini_blocks_per_block), values_per_mini_block_(values_per_block / mini_blocks_per_block), deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)), bits_buffer_( AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * sizeof(T))), sink_(pool), bit_writer_(bits_buffer_->mutable_data(), static_cast<int>(bits_buffer_->size())) { if (values_per_block_ % 128 != 0) { throw ParquetException( "the number of values in a block must be multiple of 128, but it's " + std::to_string(values_per_block_)); } if (values_per_mini_block_ % 32 != 0) { throw ParquetException( "the number of values in a miniblock must be multiple of 32, but it's " + std::to_string(values_per_mini_block_)); } if (values_per_block % mini_blocks_per_block != 0) { throw ParquetException( "the number of values per block % number of miniblocks per block must be 0, " "but it's " + std::to_string(values_per_block % mini_blocks_per_block)); } // Reserve enough space at the beginning of the buffer for largest possible header. 
PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize)); } std::shared_ptr<Buffer> FlushValues() override; int64_t EstimatedDataEncodedSize() override { return sink_.length(); } void Put(const ::arrow::Array& values) override; void Put(const T* buffer, int num_values) override; void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) override; void FlushBlock(); private: const uint32_t values_per_block_; const uint32_t mini_blocks_per_block_; const uint32_t values_per_mini_block_; uint32_t values_current_block_{0}; uint32_t total_value_count_{0}; UT first_value_{0}; UT current_value_{0}; ArrowPoolVector<UT> deltas_; std::shared_ptr<ResizableBuffer> bits_buffer_; ::arrow::BufferBuilder sink_; ::arrow::bit_util::BitWriter bit_writer_; }; template <typename DType> void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) { if (num_values == 0) { return; } int idx = 0; if (total_value_count_ == 0) { current_value_ = src[0]; first_value_ = current_value_; idx = 1; } total_value_count_ += num_values; while (idx < num_values) { UT value = static_cast<UT>(src[idx]); // Calculate deltas. The possible overflow is handled by use of unsigned integers // making subtraction operations well-defined and correct even in case of overflow. // Encoded integers will wrap back around on decoding. 
// See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n deltas_[values_current_block_] = value - current_value_; current_value_ = value; idx++; values_current_block_++; if (values_current_block_ == values_per_block_) { FlushBlock(); } } } template <typename DType> void DeltaBitPackEncoder<DType>::FlushBlock() { if (values_current_block_ == 0) { return; } const UT min_delta = *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_); bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta)); // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to write // bit widths of miniblocks as they become known during the encoding. uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); DCHECK(bit_width_data != nullptr); const uint32_t num_miniblocks = static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) / static_cast<double>(values_per_mini_block_))); for (uint32_t i = 0; i < num_miniblocks; i++) { const uint32_t values_current_mini_block = std::min(values_per_mini_block_, values_current_block_); const uint32_t start = i * values_per_mini_block_; const UT max_delta = *std::max_element( deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block); // The minimum number of bits required to write any of values in deltas_ vector. // See overflow comment above. const auto bit_width = bit_width_data[i] = bit_util::NumRequiredBits(max_delta - min_delta); for (uint32_t j = start; j < start + values_current_mini_block; j++) { // See overflow comment above. const UT value = deltas_[j] - min_delta; bit_writer_.PutValue(value, bit_width); } // If there are not enough values to fill the last mini block, we pad the mini block // with zeroes so that its length is the number of values in a full mini block // multiplied by the bit width. 
for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) { bit_writer_.PutValue(0, bit_width); } values_current_block_ -= values_current_mini_block; } // If, in the last block, less than <number of miniblocks in a block> miniblocks are // needed to store the values, the bytes storing the bit widths of the unneeded // miniblocks are still present, their value should be zero, but readers must accept // arbitrary values as well. for (uint32_t i = num_miniblocks; i < mini_blocks_per_block_; i++) { bit_width_data[i] = 0; } DCHECK_EQ(values_current_block_, 0); bit_writer_.Flush(); PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); bit_writer_.Clear(); } template <typename DType> std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() { if (values_current_block_ > 0) { FlushBlock(); } PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true)); uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {}; bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); if (!header_writer.PutVlqInt(values_per_block_) || !header_writer.PutVlqInt(mini_blocks_per_block_) || !header_writer.PutVlqInt(total_value_count_) || !header_writer.PutZigZagVlqInt(static_cast<T>(first_value_))) { throw ParquetException("header writing error"); } header_writer.Flush(); // We reserved enough space at the beginning of the buffer for largest possible header // and data was written immediately after. We now write the header data immediately // before the end of reserved space. const size_t offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written(); std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_, header_writer.bytes_written()); // Reset counter of cached values total_value_count_ = 0; // Reserve enough space at the beginning of the buffer for largest possible header. 
PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize)); // Excess bytes at the beginning are sliced off and ignored. return SliceBuffer(buffer, offset_bytes); } template <> void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) { const ::arrow::ArrayData& data = *values.data(); if (values.type_id() != ::arrow::Type::INT32) { throw ParquetException("Expected Int32TArray, got ", values.type()->ToString()); } if (data.length > std::numeric_limits<int32_t>::max()) { throw ParquetException("Array cannot be longer than ", std::numeric_limits<int32_t>::max()); } if (values.null_count() == 0) { Put(data.GetValues<int32_t>(1), static_cast<int>(data.length)); } else { PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length), data.GetValues<uint8_t>(0, 0), data.offset); } } template <> void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) { const ::arrow::ArrayData& data = *values.data(); if (values.type_id() != ::arrow::Type::INT64) { throw ParquetException("Expected Int64TArray, got ", values.type()->ToString()); } if (data.length > std::numeric_limits<int32_t>::max()) { throw ParquetException("Array cannot be longer than ", std::numeric_limits<int32_t>::max()); } if (values.null_count() == 0) { Put(data.GetValues<int64_t>(1), static_cast<int>(data.length)); } else { PutSpaced(data.GetValues<int64_t>(1), static_cast<int>(data.length), data.GetValues<uint8_t>(0, 0), data.offset); } } template <typename DType> void DeltaBitPackEncoder<DType>::PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) { if (valid_bits != NULLPTR) { PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool())); T* data = reinterpret_cast<T*>(buffer->mutable_data()); int num_valid_values = ::arrow::util::internal::SpacedCompress<T>( src, num_values, valid_bits, valid_bits_offset, data); Put(data, num_valid_values); } else { Put(src, num_values); } } // 
---------------------------------------------------------------------- // DeltaBitPackDecoder template <typename DType> class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> { public: typedef typename DType::c_type T; using UT = std::make_unsigned_t<T>; explicit DeltaBitPackDecoder(const ColumnDescriptor* descr, MemoryPool* pool = ::arrow::default_memory_pool()) : DecoderImpl(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) { if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) { throw ParquetException("Delta bit pack encoding should only be for integer data."); } } void SetData(int num_values, const uint8_t* data, int len) override { // num_values is equal to page's num_values, including null values in this page this->num_values_ = num_values; decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len); InitHeader(); } // Set BitReader which is already initialized by DeltaLengthByteArrayDecoder or // DeltaByteArrayDecoder void SetDecoder(int num_values, std::shared_ptr<::arrow::bit_util::BitReader> decoder) { this->num_values_ = num_values; decoder_ = std::move(decoder); InitHeader(); } int ValidValuesCount() { // total_value_count_ in header ignores of null values return static_cast<int>(total_value_count_); } int Decode(T* buffer, int max_values) override { return GetInternal(buffer, max_values); } int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<DType>::Accumulator* out) override { if (null_count != 0) { ParquetException::NYI("Delta bit pack DecodeArrow with null slots"); } std::vector<T> values(num_values); int decoded_count = GetInternal(values.data(), num_values); PARQUET_THROW_NOT_OK(out->AppendValues(values.data(), decoded_count)); return decoded_count; } int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<DType>::DictAccumulator* out) override { if 
(null_count != 0) { ParquetException::NYI("Delta bit pack DecodeArrow with null slots"); } std::vector<T> values(num_values); int decoded_count = GetInternal(values.data(), num_values); PARQUET_THROW_NOT_OK(out->Reserve(decoded_count)); for (int i = 0; i < decoded_count; ++i) { PARQUET_THROW_NOT_OK(out->Append(values[i])); } return decoded_count; } private: static constexpr int kMaxDeltaBitWidth = static_cast<int>(sizeof(T) * 8); void InitHeader() { if (!decoder_->GetVlqInt(&values_per_block_) || !decoder_->GetVlqInt(&mini_blocks_per_block_) || !decoder_->GetVlqInt(&total_value_count_) || !decoder_->GetZigZagVlqInt(&last_value_)) { ParquetException::EofException(); } if (values_per_block_ == 0) { throw ParquetException("cannot have zero value per block"); } if (values_per_block_ % 128 != 0) { throw ParquetException( "the number of values in a block must be multiple of 128, but it's " + std::to_string(values_per_block_)); } if (mini_blocks_per_block_ == 0) { throw ParquetException("cannot have zero miniblock per block"); } values_per_mini_block_ = values_per_block_ / mini_blocks_per_block_; if (values_per_mini_block_ == 0) { throw ParquetException("cannot have zero value per miniblock"); } if (values_per_mini_block_ % 32 != 0) { throw ParquetException( "the number of values in a miniblock must be multiple of 32, but it's " + std::to_string(values_per_mini_block_)); } delta_bit_widths_ = AllocateBuffer(pool_, mini_blocks_per_block_); block_initialized_ = false; values_current_mini_block_ = 0; } void InitBlock() { if (!decoder_->GetZigZagVlqInt(&min_delta_)) ParquetException::EofException(); // read the bitwidth of each miniblock uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) { if (!decoder_->GetAligned<uint8_t>(1, bit_width_data + i)) { ParquetException::EofException(); } if (bit_width_data[i] > kMaxDeltaBitWidth) { throw ParquetException("delta bit width " + std::to_string(bit_width_data[i]) + " 
larger than integer bit width " + std::to_string(kMaxDeltaBitWidth)); } } mini_block_idx_ = 0; delta_bit_width_ = bit_width_data[0]; values_current_mini_block_ = values_per_mini_block_; block_initialized_ = true; } int GetInternal(T* buffer, int max_values) { max_values = static_cast<int>(std::min<int64_t>(max_values, total_value_count_)); if (max_values == 0) { return 0; } int i = 0; while (i < max_values) { if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) { if (ARROW_PREDICT_FALSE(!block_initialized_)) { buffer[i++] = last_value_; DCHECK_EQ(i, 1); // we're at the beginning of the page if (ARROW_PREDICT_FALSE(i == max_values)) { // When block is uninitialized and i reaches max_values we have two // different possibilities: // 1. total_value_count_ == 1, which means that the page may have only // one value (encoded in the header), and we should not initialize // any block. // 2. total_value_count_ != 1, which means we should initialize the // incoming block for subsequent reads. if (total_value_count_ != 1) { InitBlock(); } break; } InitBlock(); } else { ++mini_block_idx_; if (mini_block_idx_ < mini_blocks_per_block_) { delta_bit_width_ = delta_bit_widths_->data()[mini_block_idx_]; values_current_mini_block_ = values_per_mini_block_; } else { InitBlock(); } } } int values_decode = std::min(values_current_mini_block_, static_cast<uint32_t>(max_values - i)); if (decoder_->GetBatch(delta_bit_width_, buffer + i, values_decode) != values_decode) { ParquetException::EofException(); } for (int j = 0; j < values_decode; ++j) { // Addition between min_delta, packed int and last_value should be treated as // unsigned addition. Overflow is as expected. 
buffer[i + j] = static_cast<UT>(min_delta_) + static_cast<UT>(buffer[i + j]) + static_cast<UT>(last_value_); last_value_ = buffer[i + j]; } values_current_mini_block_ -= values_decode; i += values_decode; } total_value_count_ -= max_values; this->num_values_ -= max_values; if (ARROW_PREDICT_FALSE(total_value_count_ == 0)) { uint32_t padding_bits = values_current_mini_block_ * delta_bit_width_; // skip the padding bits if (!decoder_->Advance(padding_bits)) { ParquetException::EofException(); } values_current_mini_block_ = 0; } return max_values; } MemoryPool* pool_; std::shared_ptr<::arrow::bit_util::BitReader> decoder_; uint32_t values_per_block_; uint32_t mini_blocks_per_block_; uint32_t values_per_mini_block_; uint32_t values_current_mini_block_; uint32_t total_value_count_; bool block_initialized_; T min_delta_; uint32_t mini_block_idx_; std::shared_ptr<ResizableBuffer> delta_bit_widths_; int delta_bit_width_; T last_value_; }; // ---------------------------------------------------------------------- // DELTA_LENGTH_BYTE_ARRAY class DeltaLengthByteArrayDecoder : public DecoderImpl, virtual public TypedDecoder<ByteArrayType> { public: explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr, MemoryPool* pool = ::arrow::default_memory_pool()) : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY), len_decoder_(nullptr, pool), buffered_length_(AllocateBuffer(pool, 0)), buffered_data_(AllocateBuffer(pool, 0)) {} void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; if (len == 0) return; decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len); DecodeLengths(); } void SetDecoder(int num_values, std::shared_ptr<::arrow::bit_util::BitReader> decoder) { num_values_ = num_values; decoder_ = decoder; DecodeLengths(); } int Decode(ByteArray* buffer, int max_values) override { // Decode up to `max_values` strings into an internal buffer // and reference them into `buffer`. 
max_values = std::min(max_values, num_valid_values_); if (max_values == 0) { return 0; } int32_t data_size = 0; const int32_t* length_ptr = reinterpret_cast<const int32_t*>(buffered_length_->data()) + length_idx_; for (int i = 0; i < max_values; ++i) { int32_t len = length_ptr[i]; if (ARROW_PREDICT_FALSE(len < 0)) { throw ParquetException("negative string delta length"); } buffer[i].len = len; if (AddWithOverflow(data_size, len, &data_size)) { throw ParquetException("excess expansion in DELTA_(LENGTH_)BYTE_ARRAY"); } } length_idx_ += max_values; PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size)); if (decoder_->GetBatch(8, buffered_data_->mutable_data(), data_size) != data_size) { ParquetException::EofException(); } const uint8_t* data_ptr = buffered_data_->data(); for (int i = 0; i < max_values; ++i) { buffer[i].ptr = data_ptr; data_ptr += buffer[i].len; } this->num_values_ -= max_values; num_valid_values_ -= max_values; return max_values; } int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::Accumulator* out) override { ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder"); } int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::DictAccumulator* out) override { ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder"); } private: // Decode all the encoded lengths. The decoder_ will be at the start of the encoded data // after that. void DecodeLengths() { len_decoder_.SetDecoder(num_values_, decoder_); // get the number of encoded lengths int num_length = len_decoder_.ValidValuesCount(); PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * sizeof(int32_t))); // call len_decoder_.Decode to decode all the lengths. // all the lengths are buffered in buffered_length_. 
int ret = len_decoder_.Decode( reinterpret_cast<int32_t*>(buffered_length_->mutable_data()), num_length); DCHECK_EQ(ret, num_length); length_idx_ = 0; num_valid_values_ = num_length; } std::shared_ptr<::arrow::bit_util::BitReader> decoder_; DeltaBitPackDecoder<Int32Type> len_decoder_; int num_valid_values_; uint32_t length_idx_; std::shared_ptr<ResizableBuffer> buffered_length_; std::shared_ptr<ResizableBuffer> buffered_data_; }; // ---------------------------------------------------------------------- // RLE_BOOLEAN_DECODER class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { public: explicit RleBooleanDecoder(const ColumnDescriptor* descr) : DecoderImpl(descr, Encoding::RLE) {} void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; uint32_t num_bytes = 0; if (len < 4) { throw ParquetException("Received invalid length : " + std::to_string(len) + " (corrupt data page?)"); } // Load the first 4 bytes in little-endian, which indicates the length num_bytes = ::arrow::bit_util::ToLittleEndian(::arrow::util::SafeLoadAs<uint32_t>(data)); if (num_bytes < 0 || num_bytes > static_cast<uint32_t>(len - 4)) { throw ParquetException("Received invalid number of bytes : " + std::to_string(num_bytes) + " (corrupt data page?)"); } auto decoder_data = data + 4; decoder_ = std::make_shared<::arrow::util::RleDecoder>(decoder_data, num_bytes, /*bit_width=*/1); } int Decode(bool* buffer, int max_values) override { max_values = std::min(max_values, num_values_); if (decoder_->GetBatch(buffer, max_values) != max_values) { ParquetException::EofException(); } num_values_ -= max_values; return max_values; } int Decode(uint8_t* buffer, int max_values) override { ParquetException::NYI("Decode(uint8_t*, int) for RleBooleanDecoder"); } int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<BooleanType>::Accumulator* out) override { 
ParquetException::NYI("DecodeArrow for RleBooleanDecoder"); } int DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<BooleanType>::DictAccumulator* builder) override { ParquetException::NYI("DecodeArrow for RleBooleanDecoder"); } private: std::shared_ptr<::arrow::util::RleDecoder> decoder_; }; // ---------------------------------------------------------------------- // DELTA_BYTE_ARRAY class DeltaByteArrayDecoder : public DecoderImpl, virtual public TypedDecoder<ByteArrayType> { public: explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr, MemoryPool* pool = ::arrow::default_memory_pool()) : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY), prefix_len_decoder_(nullptr, pool), suffix_decoder_(nullptr, pool), last_value_in_previous_page_(""), buffered_prefix_length_(AllocateBuffer(pool, 0)), buffered_data_(AllocateBuffer(pool, 0)) {} void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len); prefix_len_decoder_.SetDecoder(num_values, decoder_); // get the number of encoded prefix lengths int num_prefix = prefix_len_decoder_.ValidValuesCount(); // call prefix_len_decoder_.Decode to decode all the prefix lengths. // all the prefix lengths are buffered in buffered_prefix_length_. PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * sizeof(int32_t))); int ret = prefix_len_decoder_.Decode( reinterpret_cast<int32_t*>(buffered_prefix_length_->mutable_data()), num_prefix); DCHECK_EQ(ret, num_prefix); prefix_len_offset_ = 0; num_valid_values_ = num_prefix; // at this time, the decoder_ will be at the start of the encoded suffix data. suffix_decoder_.SetDecoder(num_values, decoder_); // TODO: read corrupted files written with bug(PARQUET-246). 
last_value_ should be set // to last_value_in_previous_page_ when decoding a new page(except the first page) last_value_ = ""; } int Decode(ByteArray* buffer, int max_values) override { return GetInternal(buffer, max_values); } int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::Accumulator* out) override { int result = 0; PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); return result; } int DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) override { ParquetException::NYI("DecodeArrow of DictAccumulator for DeltaByteArrayDecoder"); } private: int GetInternal(ByteArray* buffer, int max_values) { // Decode up to `max_values` strings into an internal buffer // and reference them into `buffer`. max_values = std::min(max_values, num_valid_values_); if (max_values == 0) { return max_values; } int suffix_read = suffix_decoder_.Decode(buffer, max_values); if (ARROW_PREDICT_FALSE(suffix_read != max_values)) { ParquetException::EofException("Read " + std::to_string(suffix_read) + ", expecting " + std::to_string(max_values) + " from suffix decoder"); } int64_t data_size = 0; const int32_t* prefix_len_ptr = reinterpret_cast<const int32_t*>(buffered_prefix_length_->data()) + prefix_len_offset_; for (int i = 0; i < max_values; ++i) { if (ARROW_PREDICT_FALSE(prefix_len_ptr[i] < 0)) { throw ParquetException("negative prefix length in DELTA_BYTE_ARRAY"); } if (ARROW_PREDICT_FALSE(AddWithOverflow(data_size, prefix_len_ptr[i], &data_size) || AddWithOverflow(data_size, buffer[i].len, &data_size))) { throw ParquetException("excess expansion in DELTA_BYTE_ARRAY"); } } PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size)); string_view prefix{last_value_}; uint8_t* data_ptr = buffered_data_->mutable_data(); for (int i = 0; i 
< max_values; ++i) { if (ARROW_PREDICT_FALSE(static_cast<size_t>(prefix_len_ptr[i]) > prefix.length())) { throw ParquetException("prefix length too large in DELTA_BYTE_ARRAY"); } memcpy(data_ptr, prefix.data(), prefix_len_ptr[i]); // buffer[i] currently points to the string suffix memcpy(data_ptr + prefix_len_ptr[i], buffer[i].ptr, buffer[i].len); buffer[i].ptr = data_ptr; buffer[i].len += prefix_len_ptr[i]; data_ptr += buffer[i].len; prefix = string_view{reinterpret_cast<const char*>(buffer[i].ptr), buffer[i].len}; } prefix_len_offset_ += max_values; this->num_values_ -= max_values; num_valid_values_ -= max_values; last_value_ = std::string{prefix}; if (num_valid_values_ == 0) { last_value_in_previous_page_ = last_value_; } return max_values; } Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::Accumulator* out, int* out_num_values) { ArrowBinaryHelper helper(out); std::vector<ByteArray> values(num_values); const int num_valid_values = GetInternal(values.data(), num_values - null_count); DCHECK_EQ(num_values - null_count, num_valid_values); auto values_ptr = reinterpret_cast<const ByteArray*>(values.data()); int value_idx = 0; RETURN_NOT_OK(VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { const auto& val = values_ptr[value_idx]; if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { RETURN_NOT_OK(helper.PushChunk()); } RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len))); ++value_idx; return Status::OK(); }, [&]() { RETURN_NOT_OK(helper.AppendNull()); --null_count; return Status::OK(); })); DCHECK_EQ(null_count, 0); *out_num_values = num_valid_values; return Status::OK(); } std::shared_ptr<::arrow::bit_util::BitReader> decoder_; DeltaBitPackDecoder<Int32Type> prefix_len_decoder_; DeltaLengthByteArrayDecoder suffix_decoder_; std::string last_value_; // string buffer for last value in previous page std::string 
last_value_in_previous_page_; int num_valid_values_; uint32_t prefix_len_offset_; std::shared_ptr<ResizableBuffer> buffered_prefix_length_; std::shared_ptr<ResizableBuffer> buffered_data_; }; // ---------------------------------------------------------------------- // BYTE_STREAM_SPLIT template <typename DType> class ByteStreamSplitDecoder : public DecoderImpl, virtual public TypedDecoder<DType> { public: using T = typename DType::c_type; explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr); int Decode(T* buffer, int max_values) override; int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<DType>::Accumulator* builder) override; int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<DType>::DictAccumulator* builder) override; void SetData(int num_values, const uint8_t* data, int len) override; T* EnsureDecodeBuffer(int64_t min_values) { const int64_t size = sizeof(T) * min_values; if (!decode_buffer_ || decode_buffer_->size() < size) { PARQUET_ASSIGN_OR_THROW(decode_buffer_, ::arrow::AllocateBuffer(size)); } return reinterpret_cast<T*>(decode_buffer_->mutable_data()); } private: int num_values_in_buffer_{0}; std::shared_ptr<Buffer> decode_buffer_; static constexpr size_t kNumStreams = sizeof(T); }; template <typename DType> ByteStreamSplitDecoder<DType>::ByteStreamSplitDecoder(const ColumnDescriptor* descr) : DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT) {} template <typename DType> void ByteStreamSplitDecoder<DType>::SetData(int num_values, const uint8_t* data, int len) { DecoderImpl::SetData(num_values, data, len); if (num_values * static_cast<int64_t>(sizeof(T)) > len) { throw ParquetException("Data size too small for number of values (corrupted file?)"); } num_values_in_buffer_ = num_values; } template <typename DType> int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) { const int 
values_to_decode = std::min(num_values_, max_values); const int num_decoded_previously = num_values_in_buffer_ - num_values_; const uint8_t* data = data_ + num_decoded_previously; ::arrow::util::internal::ByteStreamSplitDecode<T>(data, values_to_decode, num_values_in_buffer_, buffer); num_values_ -= values_to_decode; len_ -= sizeof(T) * values_to_decode; return values_to_decode; } template <typename DType> int ByteStreamSplitDecoder<DType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<DType>::Accumulator* builder) { constexpr int value_size = static_cast<int>(kNumStreams); int values_decoded = num_values - null_count; if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) { ParquetException::EofException(); } PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); const int num_decoded_previously = num_values_in_buffer_ - num_values_; const uint8_t* data = data_ + num_decoded_previously; int offset = 0; #if defined(ARROW_HAVE_SIMD_SPLIT) // Use fast decoding into intermediate buffer. This will also decode // some null values, but it's fast enough that we don't care. 
T* decode_out = EnsureDecodeBuffer(values_decoded); ::arrow::util::internal::ByteStreamSplitDecode<T>(data, values_decoded, num_values_in_buffer_, decode_out); // XXX If null_count is 0, we could even append in bulk or decode directly into // builder VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { builder->UnsafeAppend(decode_out[offset]); ++offset; }, [&]() { builder->UnsafeAppendNull(); }); #else VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { uint8_t gathered_byte_data[kNumStreams]; for (size_t b = 0; b < kNumStreams; ++b) { const size_t byte_index = b * num_values_in_buffer_ + offset; gathered_byte_data[b] = data[byte_index]; } builder->UnsafeAppend(::arrow::util::SafeLoadAs<T>(&gathered_byte_data[0])); ++offset; }, [&]() { builder->UnsafeAppendNull(); }); #endif num_values_ -= values_decoded; len_ -= sizeof(T) * values_decoded; return values_decoded; } template <typename DType> int ByteStreamSplitDecoder<DType>::DecodeArrow( int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<DType>::DictAccumulator* builder) { ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder"); } } // namespace // ---------------------------------------------------------------------- // Encoder and decoder factory functions std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encoding, bool use_dictionary, const ColumnDescriptor* descr, MemoryPool* pool) { if (use_dictionary) { switch (type_num) { case Type::INT32: return std::make_unique<DictEncoderImpl<Int32Type>>(descr, pool); case Type::INT64: return std::make_unique<DictEncoderImpl<Int64Type>>(descr, pool); case Type::INT96: return std::make_unique<DictEncoderImpl<Int96Type>>(descr, pool); case Type::FLOAT: return std::make_unique<DictEncoderImpl<FloatType>>(descr, pool); case Type::DOUBLE: return std::make_unique<DictEncoderImpl<DoubleType>>(descr, pool); case 
Type::BYTE_ARRAY: return std::make_unique<DictEncoderImpl<ByteArrayType>>(descr, pool); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_unique<DictEncoderImpl<FLBAType>>(descr, pool); default: DCHECK(false) << "Encoder not implemented"; break; } } else if (encoding == Encoding::PLAIN) { switch (type_num) { case Type::BOOLEAN: return std::make_unique<PlainEncoder<BooleanType>>(descr, pool); case Type::INT32: return std::make_unique<PlainEncoder<Int32Type>>(descr, pool); case Type::INT64: return std::make_unique<PlainEncoder<Int64Type>>(descr, pool); case Type::INT96: return std::make_unique<PlainEncoder<Int96Type>>(descr, pool); case Type::FLOAT: return std::make_unique<PlainEncoder<FloatType>>(descr, pool); case Type::DOUBLE: return std::make_unique<PlainEncoder<DoubleType>>(descr, pool); case Type::BYTE_ARRAY: return std::make_unique<PlainEncoder<ByteArrayType>>(descr, pool); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_unique<PlainEncoder<FLBAType>>(descr, pool); default: DCHECK(false) << "Encoder not implemented"; break; } } else if (encoding == Encoding::BYTE_STREAM_SPLIT) { switch (type_num) { case Type::FLOAT: return std::make_unique<ByteStreamSplitEncoder<FloatType>>(descr, pool); case Type::DOUBLE: return std::make_unique<ByteStreamSplitEncoder<DoubleType>>(descr, pool); default: throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE"); break; } } else if (encoding == Encoding::DELTA_BINARY_PACKED) { switch (type_num) { case Type::INT32: return std::make_unique<DeltaBitPackEncoder<Int32Type>>(descr, pool); case Type::INT64: return std::make_unique<DeltaBitPackEncoder<Int64Type>>(descr, pool); default: throw ParquetException( "DELTA_BINARY_PACKED encoder only supports INT32 and INT64"); break; } } else { ParquetException::NYI("Selected encoding is not supported"); } DCHECK(false) << "Should not be able to reach this code"; return nullptr; } std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encoding, const 
ColumnDescriptor* descr) { if (encoding == Encoding::PLAIN) { switch (type_num) { case Type::BOOLEAN: return std::make_unique<PlainBooleanDecoder>(descr); case Type::INT32: return std::make_unique<PlainDecoder<Int32Type>>(descr); case Type::INT64: return std::make_unique<PlainDecoder<Int64Type>>(descr); case Type::INT96: return std::make_unique<PlainDecoder<Int96Type>>(descr); case Type::FLOAT: return std::make_unique<PlainDecoder<FloatType>>(descr); case Type::DOUBLE: return std::make_unique<PlainDecoder<DoubleType>>(descr); case Type::BYTE_ARRAY: return std::make_unique<PlainByteArrayDecoder>(descr); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_unique<PlainFLBADecoder>(descr); default: break; } } else if (encoding == Encoding::BYTE_STREAM_SPLIT) { switch (type_num) { case Type::FLOAT: return std::make_unique<ByteStreamSplitDecoder<FloatType>>(descr); case Type::DOUBLE: return std::make_unique<ByteStreamSplitDecoder<DoubleType>>(descr); default: throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE"); break; } } else if (encoding == Encoding::DELTA_BINARY_PACKED) { switch (type_num) { case Type::INT32: return std::make_unique<DeltaBitPackDecoder<Int32Type>>(descr); case Type::INT64: return std::make_unique<DeltaBitPackDecoder<Int64Type>>(descr); default: throw ParquetException( "DELTA_BINARY_PACKED decoder only supports INT32 and INT64"); break; } } else if (encoding == Encoding::DELTA_BYTE_ARRAY) { if (type_num == Type::BYTE_ARRAY) { return std::make_unique<DeltaByteArrayDecoder>(descr); } throw ParquetException("DELTA_BYTE_ARRAY only supports BYTE_ARRAY"); } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { if (type_num == Type::BYTE_ARRAY) { return std::make_unique<DeltaLengthByteArrayDecoder>(descr); } throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); } else if (encoding == Encoding::RLE) { if (type_num == Type::BOOLEAN) { return std::make_unique<RleBooleanDecoder>(descr); } throw ParquetException("RLE 
encoding only supports BOOLEAN"); } else { ParquetException::NYI("Selected encoding is not supported"); } DCHECK(false) << "Should not be able to reach this code"; return nullptr; } namespace detail { std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num, const ColumnDescriptor* descr, MemoryPool* pool) { switch (type_num) { case Type::BOOLEAN: ParquetException::NYI("Dictionary encoding not implemented for boolean type"); case Type::INT32: return std::make_unique<DictDecoderImpl<Int32Type>>(descr, pool); case Type::INT64: return std::make_unique<DictDecoderImpl<Int64Type>>(descr, pool); case Type::INT96: return std::make_unique<DictDecoderImpl<Int96Type>>(descr, pool); case Type::FLOAT: return std::make_unique<DictDecoderImpl<FloatType>>(descr, pool); case Type::DOUBLE: return std::make_unique<DictDecoderImpl<DoubleType>>(descr, pool); case Type::BYTE_ARRAY: return std::make_unique<DictByteArrayDecoderImpl>(descr, pool); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_unique<DictDecoderImpl<FLBAType>>(descr, pool); default: break; } DCHECK(false) << "Should not be able to reach this code"; return nullptr; } } // namespace detail } // namespace parquet