fn consume_batch()

in parquet/src/arrow/array_reader/primitive_array.rs [166:384]


    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let target_type = &self.data_type;
        let arrow_data_type = match T::get_physical_type() {
            PhysicalType::BOOLEAN => ArrowType::Boolean,
            PhysicalType::INT32 => {
                match target_type {
                    ArrowType::UInt32 => {
                        // follow C++ implementation and use overflow/reinterpret cast from  i32 to u32 which will map
                        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
                        ArrowType::UInt32
                    }
                    _ => ArrowType::Int32,
                }
            }
            PhysicalType::INT64 => {
                match target_type {
                    ArrowType::UInt64 => {
                        // follow C++ implementation and use overflow/reinterpret cast from  i64 to u64 which will map
                        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
                        ArrowType::UInt64
                    }
                    _ => ArrowType::Int64,
                }
            }
            PhysicalType::FLOAT => ArrowType::Float32,
            PhysicalType::DOUBLE => ArrowType::Float64,
            PhysicalType::INT96 => match target_type {
                ArrowType::Timestamp(TimeUnit::Second, _) => target_type.clone(),
                ArrowType::Timestamp(TimeUnit::Millisecond, _) => target_type.clone(),
                ArrowType::Timestamp(TimeUnit::Microsecond, _) => target_type.clone(),
                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
                _ => unreachable!("INT96 must be a timestamp."),
            },
            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
                unreachable!("PrimitiveArrayReaders don't support complex physical types");
            }
        };

        // Convert to arrays by using the Parquet physical type.
        // The physical types are then cast to Arrow types if necessary

        let record_data = self
            .record_reader
            .consume_record_data()
            .into_buffer(target_type);

        let array_data = ArrayDataBuilder::new(arrow_data_type)
            .len(self.record_reader.num_values())
            .add_buffer(record_data)
            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());

        let array_data = unsafe { array_data.build_unchecked() };
        let array: ArrayRef = match T::get_physical_type() {
            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
            PhysicalType::INT32 => match array_data.data_type() {
                ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
                ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
                _ => unreachable!(),
            },
            PhysicalType::INT64 => match array_data.data_type() {
                ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
                ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
                _ => unreachable!(),
            },
            PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
            PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
            PhysicalType::INT96 => match target_type {
                ArrowType::Timestamp(TimeUnit::Second, _) => {
                    Arc::new(TimestampSecondArray::from(array_data))
                }
                ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
                    Arc::new(TimestampMillisecondArray::from(array_data))
                }
                ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
                    Arc::new(TimestampMicrosecondArray::from(array_data))
                }
                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
                    Arc::new(TimestampNanosecondArray::from(array_data))
                }
                _ => unreachable!("INT96 must be a timestamp."),
            },

            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
                unreachable!("PrimitiveArrayReaders don't support complex physical types");
            }
        };

        // cast to Arrow type
        // We make a strong assumption here that the casts should be infallible.
        // If the cast fails because of incompatible datatypes, then there might
        // be a bigger problem with how Arrow schemas are converted to Parquet.
        //
        // As there is not always a 1:1 mapping between Arrow and Parquet, there
        // are datatypes which we must convert explicitly.
        // These are:
        // - date64: cast int32 to date32, then date32 to date64.
        // - decimal: cast int32 to decimal, int64 to decimal
        let array = match target_type {
            ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
                // this is cheap as it internally reinterprets the data
                let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
                arrow_cast::cast(&a, target_type)?
            }
            ArrowType::Decimal128(p, s) => {
                // Apply conversion to all elements regardless of null slots as the conversion
                // to `i128` is infallible. This improves performance by avoiding a branch in
                // the inner loop (see docs for `PrimitiveArray::unary`).
                let array = match array.data_type() {
                    ArrowType::Int32 => array
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .unwrap()
                        .unary(|i| i as i128)
                        as Decimal128Array,
                    ArrowType::Int64 => array
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap()
                        .unary(|i| i as i128)
                        as Decimal128Array,
                    _ => {
                        return Err(arrow_err!(
                            "Cannot convert {:?} to decimal",
                            array.data_type()
                        ));
                    }
                }
                .with_precision_and_scale(*p, *s)?;

                Arc::new(array) as ArrayRef
            }
            ArrowType::Decimal256(p, s) => {
                // See above comment. Conversion to `i256` is likewise infallible.
                let array = match array.data_type() {
                    ArrowType::Int32 => array
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .unwrap()
                        .unary(|i| i256::from_i128(i as i128))
                        as Decimal256Array,
                    ArrowType::Int64 => array
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap()
                        .unary(|i| i256::from_i128(i as i128))
                        as Decimal256Array,
                    _ => {
                        return Err(arrow_err!(
                            "Cannot convert {:?} to decimal",
                            array.data_type()
                        ));
                    }
                }
                .with_precision_and_scale(*p, *s)?;

                Arc::new(array) as ArrayRef
            }
            ArrowType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowType::Decimal128(p, s) => {
                    let array = match array.data_type() {
                        ArrowType::Int32 => array
                            .as_any()
                            .downcast_ref::<Int32Array>()
                            .unwrap()
                            .unary(|i| i as i128)
                            as Decimal128Array,
                        ArrowType::Int64 => array
                            .as_any()
                            .downcast_ref::<Int64Array>()
                            .unwrap()
                            .unary(|i| i as i128)
                            as Decimal128Array,
                        _ => {
                            return Err(arrow_err!(
                                "Cannot convert {:?} to decimal dictionary",
                                array.data_type()
                            ));
                        }
                    }
                    .with_precision_and_scale(*p, *s)?;

                    arrow_cast::cast(&array, target_type)?
                }
                ArrowType::Decimal256(p, s) => {
                    let array = match array.data_type() {
                        ArrowType::Int32 => array
                            .as_any()
                            .downcast_ref::<Int32Array>()
                            .unwrap()
                            .unary(i256::from)
                            as Decimal256Array,
                        ArrowType::Int64 => array
                            .as_any()
                            .downcast_ref::<Int64Array>()
                            .unwrap()
                            .unary(i256::from)
                            as Decimal256Array,
                        _ => {
                            return Err(arrow_err!(
                                "Cannot convert {:?} to decimal dictionary",
                                array.data_type()
                            ));
                        }
                    }
                    .with_precision_and_scale(*p, *s)?;

                    arrow_cast::cast(&array, target_type)?
                }
                _ => arrow_cast::cast(&array, target_type)?,
            },
            _ => arrow_cast::cast(&array, target_type)?,
        };

        // save definition and repetition buffers
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();
        Ok(array)
    }