fn consume_batch()

in parquet/src/arrow/array_reader/primitive_array.rs [103:277]


    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let target_type = &self.data_type;
        let arrow_data_type = match T::get_physical_type() {
            PhysicalType::BOOLEAN => ArrowType::Boolean,
            PhysicalType::INT32 => {
                match target_type {
                    ArrowType::UInt32 => {
                        // follow C++ implementation and use overflow/reinterpret cast from  i32 to u32 which will map
                        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
                        ArrowType::UInt32
                    }
                    _ => ArrowType::Int32,
                }
            }
            PhysicalType::INT64 => {
                match target_type {
                    ArrowType::UInt64 => {
                        // follow C++ implementation and use overflow/reinterpret cast from  i64 to u64 which will map
                        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
                        ArrowType::UInt64
                    }
                    _ => ArrowType::Int64,
                }
            }
            PhysicalType::FLOAT => ArrowType::Float32,
            PhysicalType::DOUBLE => ArrowType::Float64,
            PhysicalType::INT96 => match target_type {
                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
                _ => unreachable!("INT96 must be timestamp nanosecond"),
            },
            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
                unreachable!(
                    "PrimitiveArrayReaders don't support complex physical types"
                );
            }
        };

        // Convert to arrays by using the Parquet physical type.
        // The physical types are then cast to Arrow types if necessary

        let record_data = self.record_reader.consume_record_data();
        let record_data = match T::get_physical_type() {
            PhysicalType::BOOLEAN => {
                let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len());

                for e in record_data.as_slice() {
                    boolean_buffer.append(*e > 0);
                }
                boolean_buffer.into()
            }
            PhysicalType::INT96 => {
                // SAFETY - record_data is an aligned buffer of Int96
                let (prefix, slice, suffix) =
                    unsafe { record_data.as_slice().align_to::<Int96>() };
                assert!(prefix.is_empty() && suffix.is_empty());

                let mut builder = TimestampNanosecondBufferBuilder::new(slice.len());
                for v in slice {
                    builder.append(v.to_nanos())
                }

                builder.finish()
            }
            _ => record_data,
        };

        let array_data = ArrayDataBuilder::new(arrow_data_type)
            .len(self.record_reader.num_values())
            .add_buffer(record_data)
            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());

        let array_data = unsafe { array_data.build_unchecked() };
        let array: ArrayRef = match T::get_physical_type() {
            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
            PhysicalType::INT32 => match array_data.data_type() {
                ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
                ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
                _ => unreachable!(),
            },
            PhysicalType::INT64 => match array_data.data_type() {
                ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
                ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
                _ => unreachable!(),
            },
            PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
            PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
            PhysicalType::INT96 => Arc::new(TimestampNanosecondArray::from(array_data)),
            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
                unreachable!(
                    "PrimitiveArrayReaders don't support complex physical types"
                );
            }
        };

        // cast to Arrow type
        // We make a strong assumption here that the casts should be infallible.
        // If the cast fails because of incompatible datatypes, then there might
        // be a bigger problem with how Arrow schemas are converted to Parquet.
        //
        // As there is not always a 1:1 mapping between Arrow and Parquet, there
        // are datatypes which we must convert explicitly.
        // These are:
        // - date64: we should cast int32 to date32, then date32 to date64.
        // - decimal: cast in32 to decimal, int64 to decimal
        let array = match target_type {
            ArrowType::Date64 => {
                // this is cheap as it internally reinterprets the data
                let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
                arrow_cast::cast(&a, target_type)?
            }
            ArrowType::Decimal128(p, s) => {
                let array = match array.data_type() {
                    ArrowType::Int32 => array
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .unwrap()
                        .iter()
                        .map(|v| v.map(|v| v as i128))
                        .collect::<Decimal128Array>(),

                    ArrowType::Int64 => array
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap()
                        .iter()
                        .map(|v| v.map(|v| v as i128))
                        .collect::<Decimal128Array>(),
                    _ => {
                        return Err(arrow_err!(
                            "Cannot convert {:?} to decimal",
                            array.data_type()
                        ));
                    }
                }
                .with_precision_and_scale(*p, *s)?;

                Arc::new(array) as ArrayRef
            }
            ArrowType::Decimal256(p, s) => {
                let array = match array.data_type() {
                    ArrowType::Int32 => array
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .unwrap()
                        .iter()
                        .map(|v| v.map(|v| i256::from_i128(v as i128)))
                        .collect::<Decimal256Array>(),

                    ArrowType::Int64 => array
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap()
                        .iter()
                        .map(|v| v.map(|v| i256::from_i128(v as i128)))
                        .collect::<Decimal256Array>(),
                    _ => {
                        return Err(arrow_err!(
                            "Cannot convert {:?} to decimal",
                            array.data_type()
                        ));
                    }
                }
                .with_precision_and_scale(*p, *s)?;

                Arc::new(array) as ArrayRef
            }
            _ => arrow_cast::cast(&array, target_type)?,
        };

        // save definition and repetition buffers
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();
        Ok(array)
    }