be/src/exec/parquet/parquet-column-readers.cc

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet-column-readers.h"

#include <string>

#include <gutil/strings/substitute.h>

#include "exec/parquet/hdfs-parquet-scanner.h"
#include "exec/parquet/parquet-bool-decoder.h"
#include "exec/parquet/parquet-data-converter.h"
#include "exec/parquet/parquet-level-decoder.h"
#include "exec/parquet/parquet-metadata-utils.h"
#include "exec/parquet/parquet-struct-column-reader.h"
#include "exec/scratch-tuple-batch.h"
#include "parquet-collection-column-reader.h"
#include "runtime/runtime-state.h"
#include "runtime/scoped-buffer.h"
#include "runtime/string-value.inline.h"
#include "runtime/tuple.h"
#include "util/debug-util.h"
#include "util/dict-encoding.h"
#include "util/rle-encoding.h"

#include "common/names.h"

using namespace impala::io;
using parquet::Encoding;

namespace impala {

// Definition of variable declared in header for use of the
// SHOULD_TRIGGER_COL_READER_DEBUG_ACTION macro.
AtomicInt32 parquet_column_reader_debug_count;

/// Per column type reader. InternalType is the datatype that Impala uses internally to
/// store tuple data and PARQUET_TYPE is the corresponding primitive datatype (as defined
/// in the parquet spec) that is used to store column values in parquet files.
/// If MATERIALIZED is true, the column values are materialized into the slot described
/// by slot_desc. If MATERIALIZED is false, the column values are not materialized, but
/// the position can be accessed.
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
class ScalarColumnReader : public BaseScalarColumnReader {
 public:
  ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
      const SlotDescriptor* slot_desc);
  virtual ~ScalarColumnReader() { }

  virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
    return ReadValue<true>(tuple);
  }

  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
    return ReadValue<false>(tuple);
  }

  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
      uint8_t* tuple_mem, int* num_values) override {
    return ReadValueBatch<true>(max_values, tuple_size, tuple_mem, num_values);
  }

  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
      uint8_t* tuple_mem, int* num_values) override {
    return ReadValueBatch<false>(max_values, tuple_size, tuple_mem, num_values);
  }

  virtual DictDecoderBase* GetDictionaryDecoder() override {
    return HasDictionaryDecoder() ? &dict_decoder_ : nullptr;
  }
  virtual bool NeedsConversion() override { return NeedsConversionInline(); }
  virtual bool NeedsValidation() override { return NeedsValidationInline(); }

  template <bool IN_COLLECTION>
  inline bool ReadValue(Tuple* tuple);

 protected:
  /// Implementation of the ReadValueBatch() functions specialized for this
  /// column reader type. This function drives the reading of data pages and
  /// caching of rep/def levels. Once a data page and cached levels are available,
  /// it calls into a more specialized MaterializeValueBatch() for doing the actual
  /// value materialization using the level caches.
  /// Use RESTRICT so that the compiler knows that it is safe to cache member
  /// variables in registers or on the stack (otherwise gcc's alias analysis
  /// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
  /// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
  /// -fno-strict-alias).
  template <bool IN_COLLECTION>
  bool ReadValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
      int* RESTRICT num_values) RESTRICT;

  /// Helper function for ReadValueBatch() above that performs value materialization.
  /// It assumes a data page with remaining values is available, and that the def/rep
  /// level caches have been populated. Materializes values into 'tuple_mem' with a
  /// stride of 'tuple_size' and updates 'num_buffered_values_'. Returns the number of
  /// values materialized in 'num_values'.
  /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not
  /// handled in this function.
  /// Use RESTRICT so that the compiler knows that it is safe to cache member
  /// variables in registers or on the stack (otherwise gcc's alias analysis
  /// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
  /// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
  /// -fno-strict-alias).
  template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
  bool MaterializeValueBatch(int max_values, int tuple_size,
      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT;

  /// Same as above, but dispatches to the appropriate templated implementation of
  /// MaterializeValueBatch() based on 'page_encoding_' and NeedsConversionInline().
  template <bool IN_COLLECTION>
  bool MaterializeValueBatch(int max_values, int tuple_size,
      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT;

  /// Fast path for MaterializeValueBatch() that materializes values for a run of
  /// repeated definition levels. Read up to 'max_values' values into 'tuple_mem',
  /// returning the number of values materialised in 'num_values'.
  bool MaterializeValueBatchRepeatedDefLevel(int max_values, int tuple_size,
      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT;

  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem'.
  bool ReadSlots(
      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;

  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem', when
  /// conversion is needed.
  bool ReadAndConvertSlots(
      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;

  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem', when
  /// conversion is not needed.
  bool ReadSlotsNoConversion(
      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
  /// Read 'num_to_read' position values into a batch of tuples starting at
  /// 'tuple_mem'.
  void ReadPositions(
      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
  void ReadItemPositions(
      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
  void ReadFilePositions(
      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;

  /// Reads file position into 'file_pos' based on 'rep_level'.
  /// It updates 'current_row_' when 'rep_level' is 0.
  inline ALWAYS_INLINE void ReadFilePositionBatched(
      int16_t rep_level, int64_t* file_pos);

  virtual Status CreateDictionaryDecoder(
      uint8_t* values, int size, DictDecoderBase** decoder) override {
    DCHECK(slot_desc_->type().type != TYPE_BOOLEAN)
        << "Dictionary encoding is not supported for bools. Should never have gotten "
        << "this far.";
    if (!dict_decoder_.template Reset<PARQUET_TYPE>(values, size, fixed_len_size_)) {
      return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
          slot_desc_->type().DebugString(), "could not decode dictionary");
    }
    dict_decoder_init_ = true;
    *decoder = &dict_decoder_;
    return Status::OK();
  }

  virtual bool HasDictionaryDecoder() override { return dict_decoder_init_; }

  virtual void ClearDictionaryDecoder() override { dict_decoder_init_ = false; }

  virtual Status InitDataDecoder(uint8_t* data, int size) override;

  virtual bool SkipEncodedValuesInPage(int64_t num_values) override;

 private:
  /// Writes the next value into the appropriate destination slot in 'tuple'. Returns
  /// false if execution should be aborted for some reason, e.g. parse_error_ is set,
  /// the query is cancelled, or the scan node limit was reached. Otherwise returns
  /// true.
  ///
  /// Force inlining - GCC does not always inline this into hot loops.
  template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
  inline ALWAYS_INLINE bool ReadSlot(Tuple* tuple);

  /// Decode one value from *data into 'val' and advance *data. 'data_end' is one byte
  /// past the end of the buffer. Return false and set 'parse_error_' if there is an
  /// error decoding the value.
  template <Encoding::type ENCODING>
  inline ALWAYS_INLINE bool DecodeValue(
      uint8_t** data, const uint8_t* data_end, InternalType* RESTRICT val) RESTRICT;

  /// Decode multiple values into 'out_vals' with a stride of 'stride' bytes. Return
  /// false and set 'parse_error_' if there is an error decoding any value.
  inline ALWAYS_INLINE bool DecodeValues(
      int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT;

  /// Specialisation of DecodeValues for a particular encoding, to allow overriding
  /// specific instances via template specialisation.
  template <Encoding::type ENCODING>
  inline ALWAYS_INLINE bool DecodeValues(
      int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT;

  /// Most column readers never require conversion, so we can avoid branches by
  /// returning constant false. Column readers for types that require conversion
  /// must specialize this function.
  inline bool NeedsConversionInline() const;

  /// Similar to NeedsConversion(), most column readers do not require validation,
  /// so to avoid branches, we return constant false. In general, types where not
  /// all possible bit representations of the data type are valid should be
  /// validated.
  inline bool NeedsValidationInline() const { return false; }

  /// Converts and writes 'src' into 'slot' based on desc_->type()
  bool ConvertSlot(const InternalType* src, void* slot) {
    return data_converter_.ConvertSlot(src, slot);
  }
  /// Checks if 'val' is invalid, e.g. due to being out of the valid value range. If it
  /// is invalid, logs the error and returns false. If the error should stop execution,
  /// sets 'parent_->parse_status_'.
  bool ValidateValue(InternalType* val) const {
    DCHECK(false);
    return false;
  }

  /// Pull out slow-path Status construction code
  void __attribute__((noinline)) SetDictDecodeError() {
    parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(),
        slot_desc_->type().DebugString(), col_chunk_reader_.stream()->file_offset());
  }

  void __attribute__((noinline)) SetPlainDecodeError() {
    parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(),
        slot_desc_->type().DebugString(), col_chunk_reader_.stream()->file_offset());
  }

  void __attribute__((noinline)) SetBoolDecodeError() {
    parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_BOOL_VALUE, filename(),
        PrintValue(page_encoding_), col_chunk_reader_.stream()->file_offset());
  }

  ParquetTimestampDecoder& GetTimestampDecoder() {
    return data_converter_.timestamp_decoder();
  }

  /// Dictionary decoder for decoding column values.
  DictDecoder<InternalType> dict_decoder_;

  /// True if dict_decoder_ has been initialized with a dictionary page.
  bool dict_decoder_init_ = false;

  /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
  /// the max length for VARCHAR columns. Unused otherwise.
  int fixed_len_size_;

  /// Converts values if needed.
  ParquetDataConverter<InternalType, MATERIALIZED> data_converter_;

  /// Contains extra state required to decode boolean values. Only initialised for
  /// BOOLEAN columns.
  unique_ptr<ParquetBoolDecoder> bool_decoder_;

  /// Allocated from parent_->perm_pool_ if NeedsConversion() is true and null
  /// otherwise.
  uint8_t* conversion_buffer_ = nullptr;
};

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader(
    HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc)
  : BaseScalarColumnReader(parent, node, slot_desc),
    dict_decoder_(parent->scan_node_->mem_tracker()),
    data_converter_(node.element, MATERIALIZED ? &slot_desc->type() : nullptr) {
  if (!MATERIALIZED) {
    // We're not materializing any values, just counting them. No need (or ability) to
    // initialize state used to materialize values.
    DCHECK(slot_desc_ == nullptr);
    return;
  }
  DCHECK(slot_desc_ != nullptr);
  if (slot_desc_->type().type == TYPE_DECIMAL
      && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
    fixed_len_size_ = node.element->type_length;
  } else if (slot_desc_->type().type == TYPE_VARCHAR) {
    fixed_len_size_ = slot_desc_->type().len;
  } else {
    fixed_len_size_ = -1;
  }
  if (slot_desc_->type().type == TYPE_TIMESTAMP) {
    data_converter_.SetTimestampDecoder(parent->CreateTimestampDecoder(*node.element));
    dict_decoder_.SetTimestampHelper(GetTimestampDecoder());
  }
  if (slot_desc_->type().type == TYPE_BOOLEAN) {
    bool_decoder_ = make_unique<ParquetBoolDecoder>();
  }
}

template <typename T>
struct IsDecimalValue {
  static constexpr bool value = std::is_same<T, Decimal4Value>::value
      || std::is_same<T, Decimal8Value>::value
      || std::is_same<T, Decimal16Value>::value;
};
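// A few cases where the conversion path below is taken in practice (illustrative, not
// an exhaustive list): TIMESTAMP values that must be converted from UTC to local time,
// DECIMAL values whose file precision/scale differs from the table's, and BYTE_ARRAY
// strings that must be padded out to a fixed-length CHAR(N) slot.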
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
inline bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
    ::NeedsConversionInline() const {
  // TODO: use constexpr ifs when we switch to C++17.
  if /* constexpr */ (MATERIALIZED) {
    if /* constexpr */ (IsDecimalValue<InternalType>::value) {
      return data_converter_.NeedsConversion();
    }
    if /* constexpr */ (std::is_same<InternalType, TimestampValue>::value) {
      return data_converter_.NeedsConversion();
    }
    if /* constexpr */ (std::is_same<InternalType, StringValue>::value
        && PARQUET_TYPE == parquet::Type::BYTE_ARRAY) {
      return data_converter_.NeedsConversion();
    }
  }
  DCHECK(!data_converter_.NeedsConversion());
  return false;
}

// TODO: consider performing filter selectivity checks in this function.
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataDecoder(
    uint8_t* data, int size) {
  // Data can be empty if the column contains all NULLs
  DCHECK_GE(size, 0);
  DCHECK(slot_desc_ == nullptr || slot_desc_->type().type != TYPE_BOOLEAN)
      << "Bool has specialized impl";
  if (!IsDictionaryEncoding(page_encoding_)
      && page_encoding_ != parquet::Encoding::PLAIN) {
    return GetUnsupportedDecodingError();
  }
  // PLAIN_DICTIONARY is deprecated in Parquet V2. It means the same as RLE_DICTIONARY
  // so internally PLAIN_DICTIONARY can be used to represent both encodings.
  if (page_encoding_ == parquet::Encoding::RLE_DICTIONARY) {
    page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
  }
  // If slot_desc_ is NULL, we don't need to decode any values so dict_decoder_ does
  // not need to be initialized.
  if (IsDictionaryEncoding(page_encoding_) && slot_desc_ != nullptr) {
    if (!dict_decoder_init_) {
      return Status("File corrupt. Missing dictionary page.");
    }
    RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
  }
  // Allocate a temporary buffer to hold InternalType values if we need to convert
  // before writing to the final slot.
  if (NeedsConversionInline() && conversion_buffer_ == nullptr) {
    int64_t buffer_size = sizeof(InternalType) * parent_->state_->batch_size();
    conversion_buffer_ =
        parent_->perm_pool_->TryAllocateAligned(buffer_size, alignof(InternalType));
    if (conversion_buffer_ == nullptr) {
      return parent_->perm_pool_->mem_tracker()->MemLimitExceeded(parent_->state_,
          "Failed to allocate conversion buffer in Parquet scanner", buffer_size);
    }
  }
  return Status::OK();
}

template <>
Status ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::InitDataDecoder(
    uint8_t* data, int size) {
  // Data can be empty if the column contains all NULLs
  DCHECK_GE(size, 0);
  /// Boolean decoding is delegated to 'bool_decoder_'.
  if (bool_decoder_->SetData(page_encoding_, data, size)) return Status::OK();
  return GetUnsupportedDecodingError();
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
    ::SkipEncodedValuesInPage(int64_t num_values) {
  if (bool_decoder_) {
    return bool_decoder_->SkipValues(num_values);
  }
  if (IsDictionaryEncoding(page_encoding_)) {
    return dict_decoder_.SkipValues(num_values);
  } else {
    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
    int64_t encoded_len = ParquetPlainEncoder::EncodedLen<PARQUET_TYPE>(
        data_, data_end_, fixed_len_size_, num_values);
    if (encoded_len < 0) return false;
    data_ += encoded_len;
  }
  return true;
}
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
template <bool IN_COLLECTION>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValue(
    Tuple* tuple) {
  // NextLevels() should have already been called and def and rep levels should be in
  // valid range.
  DCHECK_GE(rep_level_, 0);
  DCHECK_GE(def_level_, 0);
  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor())
      << "Caller should have called NextLevels() until we are ready to read a value";
  if (MATERIALIZED) {
    if (def_level_ >= max_def_level()) {
      bool continue_execution;
      if (IsDictionaryEncoding(page_encoding_)) {
        continue_execution = NeedsConversionInline()
            ? ReadSlot<Encoding::PLAIN_DICTIONARY, true>(tuple)
            : ReadSlot<Encoding::PLAIN_DICTIONARY, false>(tuple);
      } else {
        DCHECK_EQ(page_encoding_, Encoding::PLAIN);
        continue_execution = NeedsConversionInline()
            ? ReadSlot<Encoding::PLAIN, true>(tuple)
            : ReadSlot<Encoding::PLAIN, false>(tuple);
      }
      if (!continue_execution) return false;
    } else {
      SetNullSlot(tuple);
    }
  }
  return NextLevels<IN_COLLECTION>();
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
template <bool IN_COLLECTION>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatch(
    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
    int* RESTRICT num_values) RESTRICT {
  // Repetition level is only present if this column is nested in a collection type.
  if (IN_COLLECTION) {
    DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
  } else {
    DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
  }

  int val_count = 0;
  bool continue_execution = true;
  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
    DCHECK_GE(num_buffered_values_, 0);
    // Read next page if necessary. It will skip values if necessary, so we can start
    // materializing the values right after.
    if (num_buffered_values_ == 0) {
      if (!NextPage()) {
        continue_execution = parent_->parse_status_.ok();
        continue;
      }
    }

    // Not materializing anything - skip decoding any levels and rely on the value
    // count from page metadata to return the correct number of rows.
    if (!MATERIALIZED && !IN_COLLECTION && file_pos_slot_desc() == nullptr) {
      // We cannot filter pages in this context.
      DCHECK(!DoesPageFiltering());
      int vals_to_add = min(num_buffered_values_, max_values - val_count);
      val_count += vals_to_add;
      num_buffered_values_ -= vals_to_add;
      DCHECK_GE(num_buffered_values_, 0);
      continue;
    }

    // Fill the rep level cache if needed. We are flattening out the fields of the
    // nested collection into the top-level tuple returned by the scan, so we don't
    // care about the nesting structure unless the position slot is being populated,
    // or we filter out rows.
    if (IN_COLLECTION && (AnyPosSlotToBeFilled() || DoesPageFiltering())
        && !rep_levels_.CacheHasNext()) {
      parent_->parse_status_.MergeStatus(
          rep_levels_.CacheNextBatch(num_buffered_values_));
      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
    }

    const int remaining_val_capacity = max_values - val_count;
    uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
    if (def_levels_.NextRepeatedRunLength() > 0) {
      // Fast path to materialize a run of values with the same definition level. This
      // avoids checking for NULL/not-NULL for every value.
      int ret_val_count = 0;
      continue_execution = MaterializeValueBatchRepeatedDefLevel(
          remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
      val_count += ret_val_count;
    } else {
      // We don't have a repeated run - cache def levels and process value-by-value.
      if (!def_levels_.CacheHasNext()) {
        parent_->parse_status_.MergeStatus(
            def_levels_.CacheNextBatch(num_buffered_values_));
        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
      }
      // Read data page and cached levels to materialize values.
      int ret_val_count = 0;
      continue_execution = MaterializeValueBatch<IN_COLLECTION>(
          remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
      val_count += ret_val_count;
    }
    // Now that we have read some values, let's check whether we should skip some
    // due to page filtering.
    if (DoesPageFiltering() && ConsumedCurrentCandidateRange<IN_COLLECTION>()) {
      if (IsLastCandidateRange()) {
        *num_values = val_count;
        num_buffered_values_ = 0;
        return val_count > 0;
      }
      AdvanceCandidateRange();
      if (PageHasRemainingCandidateRows()) {
        if (!SkipRowsInPage()) return false;
      } else {
        if (!JumpToNextPage()) return false;
      }
    }
    if (SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(val_count)) {
      continue_execution &= ColReaderDebugAction(&val_count);
    }
  }
  *num_values = val_count;
  return continue_execution;
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
    int* RESTRICT num_values) RESTRICT {
  DCHECK(MATERIALIZED || IN_COLLECTION || file_pos_slot_desc() != nullptr);
  DCHECK_GT(num_buffered_values_, 0);
  DCHECK(def_levels_.CacheHasNext());
  if (IN_COLLECTION && (AnyPosSlotToBeFilled() || DoesPageFiltering())) {
    DCHECK(rep_levels_.CacheHasNext());
  }
  int cache_start_idx = def_levels_.CacheCurrIdx();
  uint8_t* curr_tuple = tuple_mem;
  int val_count = 0;
  DCHECK_LE(def_levels_.CacheRemaining(), num_buffered_values_);
  max_values = min(max_values, num_buffered_values_);
  while (def_levels_.CacheHasNext() && val_count < max_values) {
    if (DoesPageFiltering()) {
      int peek_rep_level = IN_COLLECTION ? rep_levels_.PeekLevel() : 0;
      if (RowsRemainingInCandidateRange() == 0 && peek_rep_level == 0) break;
    }
    int rep_level = IN_COLLECTION ? rep_levels_.ReadLevel() : 0;
    if (rep_level == 0) ++current_row_;

    Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
    int def_level = def_levels_.CacheGetNext();

    if (IN_COLLECTION) {
      if (def_level < def_level_of_immediate_repeated_ancestor()) {
        // A containing repeated field is empty or NULL, skip the value.
        continue;
      }
      if (pos_slot_desc()) {
        ReadItemPositionBatched(rep_level,
            tuple->GetBigIntSlot(pos_slot_desc_->tuple_offset()));
      }
    } else if (file_pos_slot_desc()) {
      ReadFilePositionBatched(rep_level,
          tuple->GetBigIntSlot(file_pos_slot_desc_->tuple_offset()));
    }

    if (MATERIALIZED) {
      if (def_level >= max_def_level()) {
        bool continue_execution = ReadSlot<ENCODING, NEEDS_CONVERSION>(tuple);
        if (UNLIKELY(!continue_execution)) return false;
      } else {
        SetNullSlot(tuple);
      }
    }
    curr_tuple += tuple_size;
    ++val_count;
  }
  num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
  DCHECK_GE(num_buffered_values_, 0);
  *num_values = val_count;
  return true;
}
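// A concrete (simplified) example of the level bookkeeping above: for a schema
//   optional group arr (LIST) { repeated int32 item; }
// max_rep_level() == 1 and max_def_level() == 2. The rows [1,2], NULL, [], [3] are
// encoded as (rep, def) pairs:
//   (0,2) (1,2)   (0,0)   (0,1)   (0,2)
// rep == 0 starts a new top-level row; def == 2 means an item value is present,
// while def < def_level_of_immediate_repeated_ancestor() (the NULL and empty arrays
// here) yields no item and is skipped by the 'continue' above.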
// Note that the structure of this function is very similar to MaterializeValueBatch()
// above, except it is unrolled to operate on multiple values at a time.
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
    ::MaterializeValueBatchRepeatedDefLevel(int max_values, int tuple_size,
        uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
  DCHECK_GT(num_buffered_values_, 0);
  if (max_rep_level_ > 0 && (AnyPosSlotToBeFilled() || DoesPageFiltering())) {
    DCHECK(rep_levels_.CacheHasNext());
  }
  int32_t def_level_repeats = def_levels_.NextRepeatedRunLength();
  DCHECK_GT(def_level_repeats, 0);
  // Peek at the def level. The number of def levels we'll consume depends on several
  // conditions below.
  uint8_t def_level = def_levels_.GetRepeatedValue(0);
  int32_t num_def_levels_to_consume = 0;
  // Find the upper limit of how many def levels we can consume.
  if (def_level < def_level_of_immediate_repeated_ancestor()) {
    DCHECK_GT(max_rep_level_, 0) << "Only possible if in a collection.";
    // A containing repeated field is empty or NULL. We don't need to return any values
    // but need to advance any rep levels.
    if (AnyPosSlotToBeFilled()) {
      num_def_levels_to_consume =
          min<uint32_t>(def_level_repeats, rep_levels_.CacheRemaining());
    } else {
      num_def_levels_to_consume = def_level_repeats;
    }
  } else {
    // Cannot consume more levels than allowed by buffered input values and output
    // space.
    num_def_levels_to_consume =
        min(min(num_buffered_values_, max_values), def_level_repeats);
    if (max_rep_level_ > 0 && AnyPosSlotToBeFilled()) {
      num_def_levels_to_consume =
          min<uint32_t>(num_def_levels_to_consume, rep_levels_.CacheRemaining());
    }
  }
  // Page filtering can also put an upper limit on 'num_def_levels_to_consume'.
  if (DoesPageFiltering()) {
    int rows_remaining = RowsRemainingInCandidateRange();
    if (max_rep_level_ == 0) {
      num_def_levels_to_consume = min(num_def_levels_to_consume, rows_remaining);
      if (file_pos_slot_desc()) {
        ReadFilePositions(num_def_levels_to_consume, tuple_size, tuple_mem);
      } else {
        current_row_ += num_def_levels_to_consume;
      }
    } else {
      // We need to calculate how many 'primitive' values there are until the end
      // of the current candidate range. In the meantime we also fill the position
      // slots because we are consuming the repetition levels.
      num_def_levels_to_consume = FillPositionsInCandidateRange(
          rows_remaining, num_def_levels_to_consume, tuple_mem, tuple_size);
    }
  }
  // Now we have 'num_def_levels_to_consume' set, let's read the slots.
  if (def_level < def_level_of_immediate_repeated_ancestor()) {
    if (AnyPosSlotToBeFilled() && !DoesPageFiltering()) {
      rep_levels_.CacheSkipLevels(num_def_levels_to_consume);
    }
    *num_values = 0;
  } else {
    if (AnyPosSlotToBeFilled() && !DoesPageFiltering()) {
      ReadPositions(num_def_levels_to_consume, tuple_size, tuple_mem);
    }
    if (MATERIALIZED) {
      if (def_level >= max_def_level()) {
        if (!ReadSlots(num_def_levels_to_consume, tuple_size, tuple_mem)) {
          return false;
        }
      } else {
        Tuple::SetNullIndicators(
            null_indicator_offset_, num_def_levels_to_consume, tuple_size, tuple_mem);
      }
    }
    *num_values = num_def_levels_to_consume;
  }
  // We now know how many we actually consumed.
  def_levels_.GetRepeatedValue(num_def_levels_to_consume);
  num_buffered_values_ -= num_def_levels_to_consume;
  DCHECK_GE(num_buffered_values_, 0);
  return true;
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
template <bool IN_COLLECTION>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
    int* RESTRICT num_values) RESTRICT {
  // Dispatch to the correct templated implementation of MaterializeValueBatch().
  if (IsDictionaryEncoding(page_encoding_)) {
    if (NeedsConversionInline()) {
      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, true>(
          max_values, tuple_size, tuple_mem, num_values);
    } else {
      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, false>(
          max_values, tuple_size, tuple_mem, num_values);
    }
  } else {
    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
    if (NeedsConversionInline()) {
      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, true>(
          max_values, tuple_size, tuple_mem, num_values);
    } else {
      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, false>(
          max_values, tuple_size, tuple_mem, num_values);
    }
  }
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlot(
    Tuple* RESTRICT tuple) RESTRICT {
  void* slot = tuple->GetSlot(tuple_offset_);
  // Use an uninitialized stack allocation for temporary value to avoid running
  // constructors doing work unnecessarily, e.g. if T == StringValue.
  alignas(InternalType) uint8_t val_buf[sizeof(InternalType)];
  InternalType* val_ptr =
      reinterpret_cast<InternalType*>(NEEDS_CONVERSION ? val_buf : slot);
  if (UNLIKELY(!DecodeValue<ENCODING>(&data_, data_end_, val_ptr))) return false;
  if (UNLIKELY(NeedsValidationInline() && !ValidateValue(val_ptr))
      || (NEEDS_CONVERSION && UNLIKELY(!ConvertSlot(val_ptr, slot)))) {
    if (UNLIKELY(!parent_->parse_status_.ok())) return false;
    // The value is invalid but execution should continue - set the null indicator and
    // skip conversion.
    SetNullSlot(tuple);
    return true;
  }
  return true;
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlots(
    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
  if (NeedsConversionInline()) {
    return ReadAndConvertSlots(num_to_read, tuple_size, tuple_mem);
  } else {
    return ReadSlotsNoConversion(num_to_read, tuple_size, tuple_mem);
  }
}
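// Design note: ReadAndConvertSlots() below decodes a whole batch into
// 'conversion_buffer_' first and then converts value-by-value into the output tuples,
// rather than interleaving decode and convert. This keeps the decode loop tight and
// lets DecodeValues() use a dense stride of sizeof(InternalType) over the temporary
// buffer.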
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadAndConvertSlots(
    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
  DCHECK(NeedsConversionInline());
  DCHECK(conversion_buffer_ != nullptr);
  InternalType* first_val = reinterpret_cast<InternalType*>(conversion_buffer_);
  // Decode into the conversion buffer before doing the conversion into the output
  // tuples.
  if (!DecodeValues(sizeof(InternalType), num_to_read, first_val)) return false;

  InternalType* curr_val = first_val;
  uint8_t* curr_tuple = tuple_mem;
  for (int64_t i = 0; i < num_to_read; ++i, ++curr_val, curr_tuple += tuple_size) {
    Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
    if ((NeedsValidationInline() && UNLIKELY(!ValidateValue(curr_val)))
        || UNLIKELY(!ConvertSlot(curr_val, tuple->GetSlot(tuple_offset_)))) {
      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
      // The value or the conversion is invalid but execution should continue - set the
      // null indicator.
      SetNullSlot(tuple);
      continue;
    }
  }
  return true;
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlotsNoConversion(
    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
  DCHECK(!NeedsConversionInline());
  // No conversion needed - decode directly into the output slots.
  InternalType* first_slot = reinterpret_cast<InternalType*>(tuple_mem + tuple_offset_);
  if (!DecodeValues(tuple_size, num_to_read, first_slot)) return false;
  if (NeedsValidationInline()) {
    // Validate the written slots.
    uint8_t* curr_tuple = tuple_mem;
    for (int64_t i = 0; i < num_to_read; ++i, curr_tuple += tuple_size) {
      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
      InternalType* val = static_cast<InternalType*>(tuple->GetSlot(tuple_offset_));
      if (UNLIKELY(!ValidateValue(val))) {
        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
        // The value is invalid but execution should continue - set the null indicator
        // and skip conversion.
        SetNullSlot(tuple);
      }
    }
  }
  return true;
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
template <Encoding::type ENCODING>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
    uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
    InternalType* RESTRICT val) RESTRICT {
  DCHECK_EQ(page_encoding_, ENCODING);
  if (IsDictionaryEncoding(ENCODING)) {
    if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
      SetDictDecodeError();
      return false;
    }
  } else {
    DCHECK_EQ(ENCODING, Encoding::PLAIN);
    int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
        *data, data_end, fixed_len_size_, val);
    if (UNLIKELY(encoded_len < 0)) {
      SetPlainDecodeError();
      return false;
    }
    *data += encoded_len;
  }
  return true;
}
// Specialise for decoding INT64 timestamps from PLAIN decoding, which need to call
// out to the timestamp decoder.
template <>
template <>
bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
    ::DecodeValue<Encoding::PLAIN>(uint8_t** RESTRICT data,
        const uint8_t* RESTRICT data_end, TimestampValue* RESTRICT val) RESTRICT {
  DCHECK_EQ(page_encoding_, Encoding::PLAIN);
  int encoded_len =
      GetTimestampDecoder().Decode<parquet::Type::INT64>(*data, data_end, val);
  if (UNLIKELY(encoded_len < 0)) {
    SetPlainDecodeError();
    return false;
  }
  *data += encoded_len;
  return true;
}

template <>
template <Encoding::type ENCODING>
bool ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::DecodeValue(
    uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
    bool* RESTRICT value) RESTRICT {
  if (UNLIKELY(!bool_decoder_->DecodeValue<ENCODING>(value))) {
    SetBoolDecodeError();
    return false;
  }
  return true;
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
    int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT {
  if (IsDictionaryEncoding(page_encoding_)) {
    return DecodeValues<Encoding::PLAIN_DICTIONARY>(stride, count, out_vals);
  } else {
    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
    return DecodeValues<Encoding::PLAIN>(stride, count, out_vals);
  }
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
template <Encoding::type ENCODING>
bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
    int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT {
  if (IsDictionaryEncoding(page_encoding_)) {
    if (UNLIKELY(!dict_decoder_.GetNextValues(out_vals, stride, count))) {
      SetDictDecodeError();
      return false;
    }
  } else {
    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
    int64_t encoded_len = ParquetPlainEncoder::DecodeBatch<InternalType, PARQUET_TYPE>(
        data_, data_end_, fixed_len_size_, count, stride, out_vals);
    if (UNLIKELY(encoded_len < 0)) {
      SetPlainDecodeError();
      return false;
    }
    data_ += encoded_len;
  }
  return true;
}
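// For reference, PLAIN encoding (as defined by the Parquet spec) stores values
// back-to-back with no per-value metadata: fixed-width types (INT32, INT64, FLOAT,
// DOUBLE) as little-endian bytes, BYTE_ARRAY as a 4-byte little-endian length prefix
// followed by the bytes, and FIXED_LEN_BYTE_ARRAY as exactly 'fixed_len_size_' bytes
// per value. ParquetPlainEncoder::DecodeBatch() relies on this layout to compute
// 'encoded_len' and advance 'data_'.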
// Specialise for decoding INT64 timestamps from PLAIN decoding, which need to call
// out to the timestamp decoder.
template <>
template <>
bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
    ::DecodeValues<Encoding::PLAIN>(
        int64_t stride, int64_t count, TimestampValue* RESTRICT out_vals) RESTRICT {
  DCHECK_EQ(page_encoding_, Encoding::PLAIN);
  int64_t encoded_len = GetTimestampDecoder().DecodeBatch<parquet::Type::INT64>(
      data_, data_end_, count, stride, out_vals);
  if (UNLIKELY(encoded_len < 0)) {
    SetPlainDecodeError();
    return false;
  }
  data_ += encoded_len;
  return true;
}

template <>
bool ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::DecodeValues(
    int64_t stride, int64_t count, bool* RESTRICT out_vals) RESTRICT {
  if (!bool_decoder_->DecodeValues(stride, count, out_vals)) {
    SetBoolDecodeError();
    return false;
  }
  return true;
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
    ::ReadFilePositionBatched(int16_t rep_level, int64_t* file_pos) {
  *file_pos = FilePositionOfCurrentRow();
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositions(
    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
  DCHECK(file_pos_slot_desc() != nullptr || pos_slot_desc() != nullptr);
  DCHECK(file_pos_slot_desc() == nullptr || pos_slot_desc() == nullptr);
  if (file_pos_slot_desc()) {
    ReadFilePositions(num_to_read, tuple_size, tuple_mem);
  } else {
    ReadItemPositions(num_to_read, tuple_size, tuple_mem);
  }
}

template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadFilePositions(
    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
  DCHECK(file_pos_slot_desc() != nullptr);
  DCHECK(pos_slot_desc() == nullptr);
  int64_t* file_pos_slot = reinterpret_cast<Tuple*>(tuple_mem)->GetBigIntSlot(
      file_pos_slot_desc()->tuple_offset());
  StrideWriter<int64_t> file_out{reinterpret_cast<int64_t*>(file_pos_slot), tuple_size};
  for (int64_t i = 0; i < num_to_read; ++i) {
    int64_t* file_pos = file_out.Advance();
    uint8_t rep_level = max_rep_level() > 0 ? rep_levels_.CacheGetNext() : 0;
    if (rep_level == 0) ++current_row_;
    ReadFilePositionBatched(rep_level, file_pos);
  }
}
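// Note on position slots: a 'pos' slot holds the 0-based index of an item within its
// immediately enclosing collection and is reset whenever the repetition level
// indicates a new collection instance, whereas a 'file_pos' slot tracks the top-level
// row's position within the file and therefore only changes when rep_level == 0.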
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadItemPositions(
    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
  DCHECK(file_pos_slot_desc() == nullptr);
  DCHECK(pos_slot_desc() != nullptr);
  int64_t* pos_slot = reinterpret_cast<Tuple*>(tuple_mem)->GetBigIntSlot(
      pos_slot_desc()->tuple_offset());
  StrideWriter<int64_t> out{reinterpret_cast<int64_t*>(pos_slot), tuple_size};
  for (int64_t i = 0; i < num_to_read; ++i) {
    int64_t* pos = out.Advance();
    uint8_t rep_level = max_rep_level() > 0 ? rep_levels_.CacheGetNext() : 0;
    if (rep_level == 0) ++current_row_;
    ReadItemPositionBatched(rep_level, pos);
  }
}

template <>
inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
    ::NeedsValidationInline() const {
  return true;
}

template <>
inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
    ::NeedsValidationInline() const {
  return true;
}

template <>
bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateValue(
    TimestampValue* val) const {
  if (UNLIKELY(!TimestampValue::IsValidDate(val->date())
      || !TimestampValue::IsValidTime(val->time()))) {
    // If both are corrupt, invalid time takes precedence over invalid date, because
    // an invalid date may come from a more or less functional encoder that does not
    // respect the 1400..9999 limit, while an invalid time is a good indicator of a
    // buggy encoder or memory garbage.
    TErrorCode::type errorCode = TimestampValue::IsValidTime(val->time())
        ? TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE
        : TErrorCode::PARQUET_TIMESTAMP_INVALID_TIME_OF_DAY;
    ErrorMsg msg(errorCode, filename(), node_.element->name);
    Status status = parent_->state_->LogOrReturnError(msg);
    if (!status.ok()) parent_->parse_status_ = status;
    return false;
  }
  return true;
}

template <>
bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::ValidateValue(
    TimestampValue* val) const {
  // The range was already checked during the int64_t->TimestampValue conversion, which
  // sets the date to invalid if it was out of range.
  if (UNLIKELY(!val->HasDate())) {
    ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE, filename(),
        node_.element->name);
    Status status = parent_->state_->LogOrReturnError(msg);
    if (!status.ok()) parent_->parse_status_ = status;
    return false;
  }
  DCHECK(TimestampValue::IsValidDate(val->date()));
  DCHECK(TimestampValue::IsValidTime(val->time()));
  return true;
}

template <>
inline bool ScalarColumnReader<DateValue, parquet::Type::INT32, true>
    ::NeedsValidationInline() const {
  return true;
}

template <>
bool ScalarColumnReader<DateValue, parquet::Type::INT32, true>::ValidateValue(
    DateValue* val) const {
  // The range was already checked during the int32_t->DateValue conversion, which
  // sets the date to invalid if it was out of range.
  if (UNLIKELY(!val->IsValid())) {
    ErrorMsg msg(TErrorCode::PARQUET_DATE_OUT_OF_RANGE, filename(),
        node_.element->name);
    Status status = parent_->state_->LogOrReturnError(msg);
    if (!status.ok()) parent_->parse_status_ = status;
    return false;
  }
  return true;
}

void BaseScalarColumnReader::CreateSubRanges(vector<ScanRange::SubRange>* sub_ranges) {
  sub_ranges->clear();
  if (!DoesPageFiltering()) return;
  int64_t data_start = metadata_->data_page_offset;
  int64_t data_start_based_on_offset_index = offset_index_.page_locations[0].offset;
  if (metadata_->__isset.dictionary_page_offset) {
    int64_t dict_start = metadata_->dictionary_page_offset;
    // This assumes that the first data page is coming right after the dictionary page
    sub_ranges->push_back({dict_start, data_start - dict_start});
  } else if (data_start < data_start_based_on_offset_index) {
    // 'dictionary_page_offset' is not set, but the offset index and
    // column chunk metadata disagree on the data start => column chunk's data start
    // is actually the location of the dictionary page. Parquet-MR (at least
    // version 1.10 and earlier versions) writes Parquet files like that.
    int64_t dict_start = data_start;
    sub_ranges->push_back({dict_start, data_start_based_on_offset_index - dict_start});
  }
  for (int candidate_page_idx : candidate_data_pages_) {
    auto page_loc = offset_index_.page_locations[candidate_page_idx];
    sub_ranges->push_back({page_loc.offset, page_loc.compressed_page_size});
  }
}

Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
    const parquet::ColumnChunk& col_chunk, int row_group_idx,
    int64_t row_group_first_row) {
  // Ensure metadata is valid before using it to initialize the reader.
  RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_,
      parent_->filename(), row_group_idx, col_idx(), schema_element(),
      parent_->state_));
  num_buffered_values_ = 0;
  data_ = nullptr;
  data_end_ = nullptr;
  metadata_ = &col_chunk.meta_data;
  num_values_read_ = 0;
  def_level_ = ParquetLevel::INVALID_LEVEL;
  // See ColumnReader constructor.
  rep_level_ = max_rep_level() == 0 ? 0 : ParquetLevel::INVALID_LEVEL;
  pos_current_value_ = ParquetLevel::INVALID_POS;
  row_group_first_row_ = row_group_first_row;
  current_row_ = -1;

  vector<ScanRange::SubRange> sub_ranges;
  CreateSubRanges(&sub_ranges);

  RETURN_IF_ERROR(col_chunk_reader_.InitColumnChunk(
      file_desc, col_chunk, row_group_idx, move(sub_ranges)));

  ClearDictionaryDecoder();
  return Status::OK();
}

void BaseScalarColumnReader::Close(RowBatch* row_batch) {
  col_chunk_reader_.Close(row_batch == nullptr ? nullptr : row_batch->tuple_data_pool());
  DictDecoderBase* dict_decoder = GetDictionaryDecoder();
  if (dict_decoder != nullptr) dict_decoder->Close();
}

Status BaseScalarColumnReader::InitDictionary() {
  // Dictionary encoding is not supported for booleans.
  const bool is_boolean = node_.element->type == parquet::Type::BOOLEAN;
  const bool skip_data = slot_desc_ == nullptr || is_boolean;

  // TODO: maybe avoid malloc on every page?
  ScopedBuffer uncompressed_buffer(parent_->dictionary_pool_->mem_tracker());
  bool eos;
  bool is_dictionary_page;
  int64_t data_size;
  int num_entries;
  RETURN_IF_ERROR(col_chunk_reader_.TryReadDictionaryPage(&is_dictionary_page, &eos,
      skip_data, &uncompressed_buffer, &data_, &data_size, &num_entries));
  if (eos) return HandleTooEarlyEos();
  if (is_dictionary_page && is_boolean) {
    return Status("Unexpected dictionary page. Dictionary page is not"
        " supported for booleans.");
  }
  if (!is_dictionary_page || skip_data) return Status::OK();

  // The size of the dictionary can be 0 if every value is null. The dictionary still
  // has to be reset in this case.
  DictDecoderBase* dict_decoder;
  if (data_size == 0) {
    data_end_ = data_;
    return CreateDictionaryDecoder(nullptr, 0, &dict_decoder);
  }
  // We cannot add data_size to data_ until we know it is not a nullptr.
  if (data_ == nullptr) {
    return Status("The dictionary values could not be read properly.");
  }
  data_end_ = data_ + data_size;
  RETURN_IF_ERROR(CreateDictionaryDecoder(data_, data_size, &dict_decoder));
  if (num_entries != dict_decoder->num_entries()) {
    return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
        slot_desc_->type().DebugString(),
        Substitute("Expected $0 entries but data contained $1 entries", num_entries,
            dict_decoder->num_entries()));
  }
  return Status::OK();
}

Status BaseScalarColumnReader::InitDictionaries(
    const vector<BaseScalarColumnReader*> readers) {
  for (BaseScalarColumnReader* reader : readers) {
    RETURN_IF_ERROR(reader->InitDictionary());
  }
  return Status::OK();
}
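// For context on the dictionary handling above: a dictionary-encoded column chunk
// starts with a single dictionary page holding the distinct values in PLAIN encoding;
// subsequent data pages then store RLE/bit-packed indexes into that dictionary. This
// is why InitDictionary() must run before any data page of the chunk is decoded, and
// why a missing dictionary page is treated as corruption in InitDataDecoder().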
Status BaseScalarColumnReader::ReadDataPage() {
  // We're about to move to the next data page. The previous data page is
  // now complete, free up any memory allocated for it. If the data page contained
  // strings we need to attach it to the returned batch.
  col_chunk_reader_.ReleaseResourcesOfLastPage(parent_->scratch_batch_->aux_mem_pool);

  DCHECK_EQ(num_buffered_values_, 0);
  if ((DoesPageFiltering() && candidate_page_idx_ == candidate_data_pages_.size() - 1)
      || num_values_read_ == metadata_->num_values) {
    // No more pages to read
    // TODO: should we check for stream_->eosr()?
    return Status::OK();
  } else if (num_values_read_ > metadata_->num_values) {
    RETURN_IF_ERROR(LogCorruptNumValuesInMetadataError());
    return Status::OK();
  }

  // Read the next header, return if not found.
  RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPageHeader(&num_buffered_values_));
  if (num_buffered_values_ == 0) return HandleTooEarlyEos();
  DCHECK_GT(num_buffered_values_, 0);

  // Read the data in the data page.
  ParquetColumnChunkReader::DataPageInfo page;
  RETURN_IF_ERROR(col_chunk_reader_.ReadDataPageData(&page));
  DCHECK(page.is_valid);
  num_values_read_ += num_buffered_values_;

  RETURN_IF_ERROR(InitDataPageDecoders(page));

  // Skip rows if needed.
  RETURN_IF_ERROR(StartPageFiltering());

  if (parent_->candidate_ranges_.empty()) COUNTER_ADD(parent_->num_pages_counter_, 1);
  return Status::OK();
}

Status BaseScalarColumnReader::ReadNextDataPageHeader() {
  // We're about to move to the next data page. The previous data page is
  // now complete, free up any memory allocated for it. If the data page contained
  // strings we need to attach it to the returned batch.
  col_chunk_reader_.ReleaseResourcesOfLastPage(parent_->scratch_batch_->aux_mem_pool);

  DCHECK_EQ(num_buffered_values_, 0);
  if ((DoesPageFiltering() && candidate_page_idx_ == candidate_data_pages_.size() - 1)
      || num_values_read_ == metadata_->num_values) {
    // No more pages to read
    // TODO: should we check for stream_->eosr()?
    return Status::OK();
  } else if (num_values_read_ > metadata_->num_values) {
    RETURN_IF_ERROR(LogCorruptNumValuesInMetadataError());
    return Status::OK();
  }

  RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPageHeader(&num_buffered_values_));
  if (num_buffered_values_ == 0) return HandleTooEarlyEos();
  DCHECK_GT(num_buffered_values_, 0);
  num_values_read_ += num_buffered_values_;

  if (parent_->candidate_ranges_.empty()) COUNTER_ADD(parent_->num_pages_counter_, 1);
  return Status::OK();
}

Status BaseScalarColumnReader::ReadCurrentDataPage() {
  ParquetColumnChunkReader::DataPageInfo page;
  RETURN_IF_ERROR(col_chunk_reader_.ReadDataPageData(&page));
  DCHECK(page.is_valid);
  RETURN_IF_ERROR(InitDataPageDecoders(page));
  // Skip rows if needed.
  RETURN_IF_ERROR(StartPageFiltering());
  return Status::OK();
}

Status BaseScalarColumnReader::InitDataPageDecoders(
    const ParquetColumnChunkReader::DataPageInfo& page) {
  // Initialize the repetition level data
  DCHECK(page.rep_level_encoding == Encoding::RLE || page.rep_level_size == 0);
  RETURN_IF_ERROR(rep_levels_.Init(filename(), parent_->perm_pool_.get(),
      parent_->state_->batch_size(), max_rep_level(), page.rep_level_ptr,
      page.rep_level_size));

  // Initialize the definition level data
  DCHECK(page.def_level_encoding == Encoding::RLE || page.def_level_size == 0);
  RETURN_IF_ERROR(def_levels_.Init(filename(), parent_->perm_pool_.get(),
      parent_->state_->batch_size(), max_def_level(), page.def_level_ptr,
      page.def_level_size));

  page_encoding_ = page.data_encoding;
  data_ = page.data_ptr;
  data_end_ = data_ + page.data_size;

  // Data can be empty if the column contains all NULLs
  RETURN_IF_ERROR(InitDataDecoder(page.data_ptr, page.data_size));
  return Status::OK();
}

template <bool ADVANCE_REP_LEVEL>
bool BaseScalarColumnReader::NextLevels() {
  if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();

  levels_readahead_ = true;
  if (UNLIKELY(num_buffered_values_ == 0)) {
    if (!NextPage()) return parent_->parse_status_.ok();
  }

  if (DoesPageFiltering() && RowsRemainingInCandidateRange() == 0) {
    if (!ADVANCE_REP_LEVEL || max_rep_level() == 0 || rep_levels_.PeekLevel() == 0) {
      if (!IsLastCandidateRange()) AdvanceCandidateRange();
      if (PageHasRemainingCandidateRows()) {
        auto current_range = parent_->candidate_ranges_[current_row_range_];
        int64_t skip_rows = current_range.first - current_row_ - 1;
        DCHECK_GE(skip_rows, 0);
        int64_t remaining = 0;
        if (!SkipTopLevelRows(skip_rows, &remaining)) return false;
        DCHECK_EQ(remaining, 0);
      } else {
        if (!JumpToNextPage()) return parent_->parse_status_.ok();
      }
    }
  }

  --num_buffered_values_;
  DCHECK_GE(num_buffered_values_, 0);

  // Definition level is not present if column and any containing structs are required.
  def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
  // The compiler can optimize these two conditions into a single branch by treating
  // def_level_ as unsigned.
  if (UNLIKELY(def_level_ < 0 || def_level_ > max_def_level())) {
    SetLevelDecodeError("def", def_level_, max_def_level());
    return false;
  }

  if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
    // Repetition level is only present if this column is nested in any collection
    // type.
    rep_level_ = rep_levels_.ReadLevel();
    if (UNLIKELY(rep_level_ < 0 || rep_level_ > max_rep_level())) {
      SetLevelDecodeError("rep", rep_level_, max_rep_level());
      return false;
    }
    // Reset position counter if we are at the start of a new parent collection.
    if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
    if (rep_level_ == 0) ++current_row_;
  } else {
    ++current_row_;
  }

  return parent_->parse_status_.ok();
}

void BaseScalarColumnReader::ResetPageFiltering() {
  offset_index_.page_locations.clear();
  candidate_data_pages_.clear();
  candidate_page_idx_ = -1;
  current_row_ = -1;
  levels_readahead_ = false;
  current_row_range_ = 0;
}
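// A sketch of how page filtering proceeds, assuming hypothetical numbers: if the
// offset index says the current candidate page starts at row 1000 and the current
// candidate range is rows [1200, 1300], StartPageFiltering() below positions
// 'current_row_' at 999 and then skips 200 rows so that the first value returned
// belongs to row 1200. Candidate ranges and candidate pages are derived by the
// scanner from the Parquet page index (per-page min/max statistics).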
Status BaseScalarColumnReader::StartPageFiltering() {
  if (!DoesPageFiltering()) return Status::OK();
  ++candidate_page_idx_;
  current_row_ = FirstRowIdxInCurrentPage() - 1;
  // Move to the next candidate range.
  auto& candidate_row_ranges = parent_->candidate_ranges_;
  while (current_row_ >= candidate_row_ranges[current_row_range_].last) {
    DCHECK_LT(current_row_range_, candidate_row_ranges.size() - 1);
    ++current_row_range_;
  }
  int64_t range_start = candidate_row_ranges[current_row_range_].first;
  if (range_start > current_row_ + 1) {
    int64_t skip_rows = range_start - current_row_ - 1;
    int64_t remaining = 0;
    if (!SkipTopLevelRows(skip_rows, &remaining)) {
      return Status(ErrorMsg(TErrorCode::PARQUET_ROWS_SKIPPING,
          schema_element().name, filename()));
    }
    DCHECK_EQ(remaining, 0);
    DCHECK_EQ(current_row_, range_start - 1);
  }
  return Status::OK();
}

template <bool MULTI_PAGE>
bool BaseScalarColumnReader::SkipTopLevelRows(int64_t num_rows, int64_t* remaining) {
  DCHECK_GT(num_rows, 0);
  DCHECK_GT(num_buffered_values_, 0);
  if (!MULTI_PAGE) {
    DCHECK_GE(num_buffered_values_, num_rows);
  }
  // Fastest path: field is required and not nested.
  // So row count equals value count, and every value is stored in the page data.
  if (max_def_level() == 0 && max_rep_level() == 0) {
    int rows_skipped;
    if (MULTI_PAGE) {
      rows_skipped = std::min((int64_t)num_buffered_values_, num_rows);
    } else {
      rows_skipped = num_rows;
    }
    current_row_ += rows_skipped;
    num_buffered_values_ -= rows_skipped;
    *remaining = num_rows - rows_skipped;
    return SkipEncodedValuesInPage(rows_skipped);
  }
  int64_t num_values_to_skip = 0;
  if (max_rep_level() == 0) {
    // No nesting, but field is not required.
    // Skip as many values in the page data as there are non-NULL values encountered.
    int i = 0;
    while (i < num_rows && num_buffered_values_ > 0) {
      int repeated_run_length = def_levels_.NextRepeatedRunLength();
      if (repeated_run_length > 0) {
        int read_count = min<int64_t>(num_rows - i, repeated_run_length);
        // IMPALA-11134: there can be a mismatch between page_header.num_values and
        // encoded def levels in old Parquet files.
        read_count = min(num_buffered_values_, read_count);
        int16_t def_level = def_levels_.GetRepeatedValue(read_count);
        if (def_level >= max_def_level_) num_values_to_skip += read_count;
        i += read_count;
        num_buffered_values_ -= read_count;
      } else if (def_levels_.CacheHasNext()) {
        int read_count = min<int64_t>(num_rows - i, def_levels_.CacheRemaining());
        // IMPALA-11134: there can be a mismatch between page_header.num_values and
        // encoded def levels in old Parquet files.
        read_count = min(num_buffered_values_, read_count);
        for (int j = 0; j < read_count; ++j) {
          if (def_levels_.CacheGetNext() >= max_def_level_) ++num_values_to_skip;
        }
        i += read_count;
        num_buffered_values_ -= read_count;
      } else {
        if (!def_levels_.CacheNextBatch(num_buffered_values_).ok()) return false;
      }
    }
    DCHECK_LE(i, num_rows);
    current_row_ += i;
    *remaining = num_rows - i;
  } else {
    // 'rep_level_' being zero denotes the start of a new top-level row.
    // From the 'def_level_' we can determine the number of non-NULL values.
    while (num_buffered_values_ > 0) {
      if (!def_levels_.CacheNextBatchIfEmpty(num_buffered_values_).ok()) return false;
      if (!rep_levels_.CacheNextBatchIfEmpty(num_buffered_values_).ok()) return false;
      if (num_rows == 0 && rep_levels_.PeekLevel() == 0) {
        // No more rows to skip, and the next value belongs to a new top-level row.
        break;
      }
      def_level_ = def_levels_.CacheGetNext();
      rep_level_ = rep_levels_.CacheGetNext();
      --num_buffered_values_;
      DCHECK_GE(num_buffered_values_, 0);
      if (def_level_ >= max_def_level()) ++num_values_to_skip;
      if (rep_level_ == 0) {
        ++current_row_;
        --num_rows;
      }
    }
    *remaining = num_rows;
  }
  return SkipEncodedValuesInPage(num_values_to_skip);
}

bool BaseScalarColumnReader::SetRowGroupAtEnd() {
  if (RowGroupAtEnd()) {
    return true;
  }
  if (num_buffered_values_ == 0) {
    NextPage();
  }
  if (DoesPageFiltering() && RowsRemainingInCandidateRange() == 0) {
    if (max_rep_level() == 0 || rep_levels_.PeekLevel() == 0) {
      if (!IsLastCandidateRange()) AdvanceCandidateRange();
      if (!PageHasRemainingCandidateRows()) {
        JumpToNextPage();
      }
    }
  }
  bool status = RowGroupAtEnd();
  if (!status) {
    return false;
  }
  return parent_->parse_status_.ok();
}

int BaseScalarColumnReader::FillPositionsInCandidateRange(int rows_remaining,
    int max_values, uint8_t* RESTRICT tuple_mem, int tuple_size) {
  DCHECK_GT(max_rep_level_, 0);
  DCHECK_EQ(rows_remaining, RowsRemainingInCandidateRange());
  int row_count = 0;
  int val_count = 0;
  int64_t* pos_slot = nullptr;
  if (pos_slot_desc_ != nullptr) {
    const int pos_slot_offset = pos_slot_desc()->tuple_offset();
    pos_slot = reinterpret_cast<Tuple*>(tuple_mem)->GetBigIntSlot(pos_slot_offset);
  }
  StrideWriter<int64_t> pos_writer{pos_slot, tuple_size};

  int64_t* file_pos_slot = nullptr;
  if (file_pos_slot_desc_ != nullptr) {
    const int file_pos_slot_offset = file_pos_slot_desc()->tuple_offset();
    file_pos_slot = reinterpret_cast<Tuple*>(tuple_mem)->GetBigIntSlot(
        file_pos_slot_offset);
  }
  StrideWriter<int64_t> file_pos_writer{file_pos_slot, tuple_size};

  while (rep_levels_.CacheRemaining() && row_count <= rows_remaining
      && val_count < max_values) {
    if (row_count == rows_remaining && rep_levels_.CachePeekNext() == 0) break;
    int rep_level = rep_levels_.CacheGetNext();
    if (rep_level == 0) {
      ++row_count;
      ++current_row_;
    }
    ++val_count;
    if (file_pos_writer.IsValid()) {
      *file_pos_writer.Advance() = FilePositionOfCurrentRow();
    } else if (pos_writer.IsValid()) {
      if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
      *pos_writer.Advance() = pos_current_value_++;
    }
  }
  return val_count;
}

void BaseScalarColumnReader::AdvanceCandidateRange() {
  DCHECK(DoesPageFiltering());
  auto& candidate_ranges = parent_->candidate_ranges_;
  DCHECK_LT(current_row_range_, candidate_ranges.size());
  DCHECK_EQ(current_row_, candidate_ranges[current_row_range_].last);
  ++current_row_range_;
  DCHECK_LE(current_row_, candidate_ranges[current_row_range_].last);
}
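// Illustration of the relationship checked below (hypothetical numbers): with
// candidate pages starting at rows 0, 1000 and 2000, and a current candidate range of
// [950, 1100], the current page (first row 0) still has candidate rows as long as the
// next page's first_row_index (1000) is greater than the range start (950).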
bool BaseScalarColumnReader::PageHasRemainingCandidateRows() const {
  DCHECK(DoesPageFiltering());
  DCHECK_LT(current_row_range_, parent_->candidate_ranges_.size());
  auto current_range = parent_->candidate_ranges_[current_row_range_];
  if (candidate_page_idx_ != candidate_data_pages_.size() - 1) {
    auto& next_page_loc =
        offset_index_.page_locations[candidate_data_pages_[candidate_page_idx_ + 1]];
    // If the next page contains rows with index higher than the start of the
    // current candidate range, it means we still have interesting rows in the
    // current page.
    return next_page_loc.first_row_index > current_range.first;
  }
  if (candidate_page_idx_ == candidate_data_pages_.size() - 1) {
    // We are in the last page, we need to skip rows if the current top level row
    // precedes the next candidate range.
    return current_row_ < current_range.first;
  }
  return false;
}

bool BaseScalarColumnReader::SkipRowsInPage() {
  auto current_range = parent_->candidate_ranges_[current_row_range_];
  DCHECK_LT(current_row_, current_range.first);
  int64_t skip_rows = current_range.first - current_row_ - 1;
  DCHECK_GE(skip_rows, 0);
  int64_t remaining = 0;
  return SkipTopLevelRows(skip_rows, &remaining);
}

bool BaseScalarColumnReader::JumpToNextPage() {
  DCHECK(DoesPageFiltering());
  num_buffered_values_ = 0;
  return NextPage();
}

Status BaseScalarColumnReader::GetUnsupportedDecodingError() {
  return Status(Substitute(
      "File '$0' is corrupt: unexpected encoding: $1 for data page of column '$2'.",
      filename(), PrintValue(page_encoding_), schema_element().name));
}

Status BaseScalarColumnReader::LogCorruptNumValuesInMetadataError() {
  ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID, metadata_->num_values,
      num_values_read_, node_.element->name, filename());
  return parent_->state_->LogOrReturnError(msg);
}

Status BaseScalarColumnReader::HandleTooEarlyEos() {
  // The data pages contain fewer values than stated in the column metadata.
  DCHECK(col_chunk_reader_.stream()->eosr());
  DCHECK_LT(num_values_read_, metadata_->num_values);
  return LogCorruptNumValuesInMetadataError();
}

bool BaseScalarColumnReader::NextPage() {
  parent_->assemble_rows_timer_.Stop();
  parent_->parse_status_ = ReadDataPage();
  if (UNLIKELY(!parent_->parse_status_.ok())) return false;
  if (num_buffered_values_ == 0) {
    rep_level_ = ParquetLevel::ROW_GROUP_END;
    def_level_ = ParquetLevel::ROW_GROUP_END;
    pos_current_value_ = ParquetLevel::INVALID_POS;
    return false;
  }
  parent_->assemble_rows_timer_.Start();
  return true;
}

bool BaseScalarColumnReader::AdvanceNextPageHeader() {
  num_buffered_values_ = 0;
  parent_->assemble_rows_timer_.Stop();
  parent_->parse_status_ = ReadNextDataPageHeader();
  if (UNLIKELY(!parent_->parse_status_.ok())) return false;
  if (num_buffered_values_ == 0) {
    rep_level_ = ParquetLevel::ROW_GROUP_END;
    def_level_ = ParquetLevel::ROW_GROUP_END;
    pos_current_value_ = ParquetLevel::INVALID_POS;
    return false;
  }
  parent_->assemble_rows_timer_.Start();
  return true;
}

void BaseScalarColumnReader::SetLevelDecodeError(
    const char* level_name, int decoded_level, int max_level) {
  if (decoded_level < 0) {
    DCHECK_EQ(decoded_level, ParquetLevel::INVALID_LEVEL);
    parent_->parse_status_.MergeStatus(
        Status(Substitute("Corrupt Parquet file '$0': "
            "could not read all $1 levels for column '$2'",
            filename(), level_name, schema_element().name)));
  } else {
    parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file '$0': "
        "invalid $1 level $2 > max $1 level $3 for column '$4'",
        filename(), level_name, decoded_level, max_level, schema_element().name)));
  }
}

/// Wrapper around 'SkipTopLevelRows' to skip across multiple pages.
/// Function handles 3 scenarios:
/// 1. Page Filtering: When this is enabled this function can be used
///    to skip to a particular 'skip_row_id'.
/// 2. Collection: When this scalar reader is reading elements of a collection.
/// 3. Rest of the cases.
/// For page filtering, we keep track of first and last page indexes and keep
/// traversing to the next page until we find a page that contains 'skip_row_id'.
/// At that point, we can just skip to the required row id.
/// If the page of 'skip_row_id' is not a candidate page, we will stop at the
/// next candidate page and 'skip_row_id' is skipped along the way.
/// The difference between scenarios 2 and 3 is that in scenario 2 we end up
/// decompressing all the pages being skipped, whereas in scenario 3 we only
/// decompress the pages required, avoiding unnecessary decompression. This is
/// possible because in scenario 3 'data_page_header.num_values' corresponds to the
/// number of rows stored in the page. This is not true in scenario 2, because
/// multiple consecutive values can belong to the same row.
template <bool IN_COLLECTION>
bool BaseScalarColumnReader::SkipRowsInternal(int64_t num_rows, int64_t skip_row_id) {
  if (DoesPageFiltering() && skip_row_id > 0) {
    // At the beginning of the row group no page has been read yet, so advance to
    // the first page.
    if (candidate_page_idx_ < 0) {
      if (UNLIKELY(!NextPage())) {
        return false;
      }
    }
    // Keep advancing until we reach the page containing 'skip_row_id', or jump over
    // it if that page is not a candidate page.
    while (skip_row_id > LastRowIdxInCurrentPage()) {
      COUNTER_ADD(parent_->num_pages_skipped_by_late_materialization_counter_, 1);
      if (UNLIKELY(!JumpToNextPage())) {
        return false;
      }
    }
    int64_t last_row = LastProcessedRow();
    int64_t remaining = 0;
    if (skip_row_id >= FirstRowIdxInCurrentPage()) {
      // Skip to the required row id within the page. This is only needed when the
      // row id is located in the current page.
      if (last_row < skip_row_id) {
        if (UNLIKELY(!SkipTopLevelRows(skip_row_id - last_row, &remaining))) {
          return false;
        }
      }
    }
    // Also adjust 'candidate_row_ranges' since we have skipped to a new row id.
    auto& candidate_row_ranges = parent_->candidate_ranges_;
    while (current_row_ > candidate_row_ranges[current_row_range_].last) {
      DCHECK_LT(current_row_range_, candidate_row_ranges.size());
      ++current_row_range_;
    }
    return true;
  } else if (IN_COLLECTION) {
    DCHECK_GT(num_rows, 0);
    // If all the values of the current page are consumed, move to the next page.
    if (num_buffered_values_ == 0) {
      if (!NextPage()) {
        return false;
      }
    }
    DCHECK_GT(num_buffered_values_, 0);
    int64_t remaining = 0;
    // Try to skip 'num_rows' and see if something remains.
    if (!SkipTopLevelRows<true>(num_rows, &remaining)) {
      return false;
    }
    // Invoke the same method again on the remaining rows.
    if (remaining > 0) {
      return SkipRowsInternal<IN_COLLECTION>(remaining, skip_row_id);
    }
    return true;
  } else {
    // If everything in the current page is consumed, skip data pages (multiple
    // skips if needed) to reach the required page.
    if (num_buffered_values_ == 0) {
      if (!AdvanceNextPageHeader()) {
        return false;
      }
      DCHECK_GT(num_buffered_values_, 0);
      // Keep advancing to the next page header while the rows to be skipped exceed
      // the number of values in the page. Note that we only read headers here and
      // skip pages without decompressing them as we advance.
      while (num_rows > num_buffered_values_) {
        COUNTER_ADD(parent_->num_pages_skipped_by_late_materialization_counter_, 1);
        num_rows -= num_buffered_values_;
        current_row_ += num_buffered_values_;
        if (!col_chunk_reader_.SkipPageData().ok() || !AdvanceNextPageHeader()) {
          return false;
        }
        DCHECK_GT(num_buffered_values_, 0);
      }
      // Read the data page (decompressing it if required).
      Status page_read = ReadCurrentDataPage();
      if (!page_read.ok()) {
        return false;
      }
    }
    // Skip the remaining rows in the page.
    DCHECK_GT(num_buffered_values_, 0);
    int64_t remaining = 0;
    if (!SkipTopLevelRows<true>(num_rows, &remaining)) {
      return false;
    }
    if (remaining > 0) {
      return SkipRowsInternal<IN_COLLECTION>(remaining, skip_row_id);
    }
    return true;
  }
}

/// Returns a column reader for decimal types based on the slot's byte size and the
/// column's parquet type.
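/// Illustrative mapping (not in the upstream comment): in Impala a decimal slot
/// with precision <= 9 occupies 4 bytes (Decimal4Value), precision <= 18 occupies
/// 8 bytes (Decimal8Value), and larger precisions occupy 16 bytes (Decimal16Value);
/// the switch below pairs each slot size with the file's physical type.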
static ParquetColumnReader* CreateDecimalColumnReader(
    const SchemaNode& node, const SlotDescriptor* slot_desc,
    HdfsParquetScanner* parent) {
  switch (node.element->type) {
    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
      switch (slot_desc->type().GetByteSize()) {
        case 4:
          return new ScalarColumnReader<Decimal4Value,
              parquet::Type::FIXED_LEN_BYTE_ARRAY, true>(parent, node, slot_desc);
        case 8:
          return new ScalarColumnReader<Decimal8Value,
              parquet::Type::FIXED_LEN_BYTE_ARRAY, true>(parent, node, slot_desc);
        case 16:
          return new ScalarColumnReader<Decimal16Value,
              parquet::Type::FIXED_LEN_BYTE_ARRAY, true>(parent, node, slot_desc);
      }
      break;
    case parquet::Type::BYTE_ARRAY:
      switch (slot_desc->type().GetByteSize()) {
        case 4:
          return new ScalarColumnReader<Decimal4Value, parquet::Type::BYTE_ARRAY, true>(
              parent, node, slot_desc);
        case 8:
          return new ScalarColumnReader<Decimal8Value, parquet::Type::BYTE_ARRAY, true>(
              parent, node, slot_desc);
        case 16:
          return new ScalarColumnReader<Decimal16Value, parquet::Type::BYTE_ARRAY, true>(
              parent, node, slot_desc);
      }
      break;
    case parquet::Type::INT32:
      return new ScalarColumnReader<Decimal4Value, parquet::Type::INT32, true>(
          parent, node, slot_desc);
    case parquet::Type::INT64:
      return new ScalarColumnReader<Decimal8Value, parquet::Type::INT64, true>(
          parent, node, slot_desc);
    default:
      DCHECK(false) << "Invalid decimal primitive type";
  }
  DCHECK(false) << "Invalid decimal type";
  return nullptr;
}

ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
    bool is_collection_field, const SlotDescriptor* slot_desc,
    HdfsParquetScanner* parent) {
  if (is_collection_field) {
    // Create collection reader (note this handles both NULL and non-NULL 'slot_desc').
    return new CollectionColumnReader(parent, node, slot_desc);
  } else if (slot_desc != nullptr) {
    // Create the appropriate ScalarColumnReader type to read values into 'slot_desc'.
    switch (slot_desc->type().type) {
      case TYPE_BOOLEAN:
        return new ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>(
            parent, node, slot_desc);
      case TYPE_TINYINT:
        return new ScalarColumnReader<int8_t, parquet::Type::INT32, true>(
            parent, node, slot_desc);
      case TYPE_SMALLINT:
        return new ScalarColumnReader<int16_t, parquet::Type::INT32, true>(
            parent, node, slot_desc);
      case TYPE_INT:
        return new ScalarColumnReader<int32_t, parquet::Type::INT32, true>(
            parent, node, slot_desc);
      case TYPE_BIGINT:
        switch (node.element->type) {
          case parquet::Type::INT32:
            return new ScalarColumnReader<int64_t, parquet::Type::INT32, true>(
                parent, node, slot_desc);
          default:
            return new ScalarColumnReader<int64_t, parquet::Type::INT64, true>(
                parent, node, slot_desc);
        }
      case TYPE_FLOAT:
        return new ScalarColumnReader<float, parquet::Type::FLOAT, true>(
            parent, node, slot_desc);
      case TYPE_DOUBLE:
        switch (node.element->type) {
          case parquet::Type::INT32:
            return new ScalarColumnReader<double, parquet::Type::INT32, true>(
                parent, node, slot_desc);
          case parquet::Type::FLOAT:
            return new ScalarColumnReader<double, parquet::Type::FLOAT, true>(
                parent, node, slot_desc);
          default:
            return new ScalarColumnReader<double, parquet::Type::DOUBLE, true>(
                parent, node, slot_desc);
        }
      case TYPE_TIMESTAMP:
        return CreateTimestampColumnReader(node, slot_desc, parent);
      case TYPE_DATE:
        return new ScalarColumnReader<DateValue, parquet::Type::INT32, true>(
            parent, node, slot_desc);
      case TYPE_STRING:
      case TYPE_VARCHAR:
      case TYPE_CHAR:
        return new ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>(
            parent, node, slot_desc);
      case TYPE_DECIMAL:
        return CreateDecimalColumnReader(node, slot_desc, parent);
      case TYPE_STRUCT:
        return new StructColumnReader(parent, node, slot_desc);
      default:
        DCHECK(false) << slot_desc->type().DebugString();
        return nullptr;
    }
  } else {
    // Special case for counting scalar values (e.g. count(*)): no columns are
    // materialized from the file, only a position slot. We won't actually read any
    // values, only the rep and def levels, so it doesn't matter what kind of reader
    // we make.
    return new ScalarColumnReader<int8_t, parquet::Type::INT32, false>(
        parent, node, slot_desc);
  }
}

ParquetColumnReader* ParquetColumnReader::CreateTimestampColumnReader(
    const SchemaNode& node, const SlotDescriptor* slot_desc,
    HdfsParquetScanner* parent) {
  if (node.element->type == parquet::Type::INT96) {
    return new ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>(
        parent, node, slot_desc);
  } else if (node.element->type == parquet::Type::INT64) {
    return new ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>(
        parent, node, slot_desc);
  }
  DCHECK(false) << slot_desc->type().DebugString();
  return nullptr;
}

}
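// Usage sketch (illustrative only, not part of the upstream file): the scanner
// typically creates one reader per materialized slot via the factory above, e.g.:
//
//   ParquetColumnReader* reader = ParquetColumnReader::Create(
//       node, /*is_collection_field=*/false, slot_desc, scanner);
//
// where 'scanner' is the owning HdfsParquetScanner. Collection columns go through
// the same entry point with 'is_collection_field' set to true.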