in parquet/src/arrow/array_reader/primitive_array.rs [166:384]
/// Drains the values buffered in the record reader into one Arrow array of
/// `self.data_type`.
///
/// The buffered values are first wrapped in an array that mirrors the Parquet
/// physical type of `T`; that array is then converted to the requested Arrow
/// type. Before returning, the definition and repetition levels consumed from
/// the record reader are stored on `self` and the record reader is reset.
///
/// # Errors
///
/// Returns an error when a decimal (or dictionary-of-decimal) target is
/// requested but the physical array is neither `Int32` nor `Int64`, when the
/// requested precision/scale cannot be applied, or when `arrow_cast::cast`
/// fails.
///
/// # Panics
///
/// Panics when `T` is `INT96` and the target type is not a timestamp, or when
/// `T` is `BYTE_ARRAY` / `FIXED_LEN_BYTE_ARRAY`.
fn consume_batch(&mut self) -> Result<ArrayRef> {
    /// Widens an `Int32` or `Int64` array to `i128` values.
    ///
    /// Returns `None` for any other data type. The conversion is applied to
    /// every slot regardless of nulls because widening to `i128` is
    /// infallible; this avoids a branch in the inner loop (see docs for
    /// `PrimitiveArray::unary`).
    fn widen_to_i128(ints: &ArrayRef) -> Option<Decimal128Array> {
        match ints.data_type() {
            ArrowType::Int32 => Some(
                ints.as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .unary(|v| v as i128) as Decimal128Array,
            ),
            ArrowType::Int64 => Some(
                ints.as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap()
                    .unary(|v| v as i128) as Decimal128Array,
            ),
            _ => None,
        }
    }

    /// Widens an `Int32` or `Int64` array to `i256` values.
    ///
    /// Returns `None` for any other data type. As with the `i128` case, the
    /// conversion is infallible and is applied to every slot.
    fn widen_to_i256(ints: &ArrayRef) -> Option<Decimal256Array> {
        match ints.data_type() {
            ArrowType::Int32 => Some(
                ints.as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .unary(i256::from) as Decimal256Array,
            ),
            ArrowType::Int64 => Some(
                ints.as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap()
                    .unary(i256::from) as Decimal256Array,
            ),
            _ => None,
        }
    }

    let requested = &self.data_type;
    let physical = T::get_physical_type();

    // Arrow type used to wrap the raw buffer, derived from the Parquet
    // physical type.
    let physical_arrow_type = match physical {
        PhysicalType::BOOLEAN => ArrowType::Boolean,
        // Follow the C++ implementation and use an overflow/reinterpret cast
        // from i32 to u32, which maps `i32::MIN..0` to
        // `(i32::MAX as u32)..u32::MAX`.
        PhysicalType::INT32 if matches!(requested, ArrowType::UInt32) => ArrowType::UInt32,
        PhysicalType::INT32 => ArrowType::Int32,
        // Same reinterpretation for i64 to u64: `i64::MIN..0` maps to
        // `(i64::MAX as u64)..u64::MAX`.
        PhysicalType::INT64 if matches!(requested, ArrowType::UInt64) => ArrowType::UInt64,
        PhysicalType::INT64 => ArrowType::Int64,
        PhysicalType::FLOAT => ArrowType::Float32,
        PhysicalType::DOUBLE => ArrowType::Float64,
        PhysicalType::INT96 => match requested {
            ArrowType::Timestamp(
                TimeUnit::Second
                | TimeUnit::Millisecond
                | TimeUnit::Microsecond
                | TimeUnit::Nanosecond,
                _,
            ) => requested.clone(),
            _ => unreachable!("INT96 must be a timestamp."),
        },
        PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
            unreachable!("PrimitiveArrayReaders don't support complex physical types");
        }
    };

    // Pull the pieces out of the record reader in a fixed order: values
    // first, then the value count, then the validity bitmap.
    let values = self
        .record_reader
        .consume_record_data()
        .into_buffer(requested);
    let value_count = self.record_reader.num_values();
    let validity = self.record_reader.consume_bitmap_buffer();

    let builder = ArrayDataBuilder::new(physical_arrow_type)
        .len(value_count)
        .add_buffer(values)
        .null_bit_buffer(validity);
    // SAFETY: NOTE(review) - validation is skipped here; this presumably relies
    // on the record reader handing back a values buffer and validity bitmap
    // that are consistent with `num_values()`. Confirm against the record
    // reader implementation.
    let physical_data = unsafe { builder.build_unchecked() };

    // Wrap the untyped data in the concrete array type for the physical type.
    let physical_array: ArrayRef = match physical {
        PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(physical_data)),
        PhysicalType::INT32 => {
            if matches!(physical_data.data_type(), ArrowType::UInt32) {
                Arc::new(UInt32Array::from(physical_data)) as ArrayRef
            } else if matches!(physical_data.data_type(), ArrowType::Int32) {
                Arc::new(Int32Array::from(physical_data)) as ArrayRef
            } else {
                unreachable!()
            }
        }
        PhysicalType::INT64 => {
            if matches!(physical_data.data_type(), ArrowType::UInt64) {
                Arc::new(UInt64Array::from(physical_data)) as ArrayRef
            } else if matches!(physical_data.data_type(), ArrowType::Int64) {
                Arc::new(Int64Array::from(physical_data)) as ArrayRef
            } else {
                unreachable!()
            }
        }
        PhysicalType::FLOAT => Arc::new(Float32Array::from(physical_data)),
        PhysicalType::DOUBLE => Arc::new(Float64Array::from(physical_data)),
        PhysicalType::INT96 => match requested {
            ArrowType::Timestamp(TimeUnit::Second, _) => {
                Arc::new(TimestampSecondArray::from(physical_data))
            }
            ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
                Arc::new(TimestampMillisecondArray::from(physical_data))
            }
            ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
                Arc::new(TimestampMicrosecondArray::from(physical_data))
            }
            ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
                Arc::new(TimestampNanosecondArray::from(physical_data))
            }
            _ => unreachable!("INT96 must be a timestamp."),
        },
        PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
            unreachable!("PrimitiveArrayReaders don't support complex physical types");
        }
    };

    // Convert the physical array to the requested Arrow type.
    //
    // The casts are assumed to be infallible: a failure caused by
    // incompatible data types would point at a larger problem in how Arrow
    // schemas are converted to Parquet.
    //
    // Arrow and Parquet types do not always map 1:1, so a few targets are
    // handled explicitly:
    // - date64: int32 is cast to date32, then date32 to date64.
    // - decimal: int32/int64 are widened to the decimal's native integer.
    let converted = match requested {
        ArrowType::Date64 if matches!(physical_array.data_type(), ArrowType::Int32) => {
            // this is cheap as it internally reinterprets the data
            let as_date32 = arrow_cast::cast(&physical_array, &ArrowType::Date32)?;
            arrow_cast::cast(&as_date32, requested)?
        }
        ArrowType::Decimal128(precision, scale) => {
            let widened = match widen_to_i128(&physical_array) {
                Some(widened) => widened,
                None => {
                    return Err(arrow_err!(
                        "Cannot convert {:?} to decimal",
                        physical_array.data_type()
                    ));
                }
            };
            Arc::new(widened.with_precision_and_scale(*precision, *scale)?) as ArrayRef
        }
        ArrowType::Decimal256(precision, scale) => {
            let widened = match widen_to_i256(&physical_array) {
                Some(widened) => widened,
                None => {
                    return Err(arrow_err!(
                        "Cannot convert {:?} to decimal",
                        physical_array.data_type()
                    ));
                }
            };
            Arc::new(widened.with_precision_and_scale(*precision, *scale)?) as ArrayRef
        }
        ArrowType::Dictionary(_, value_type) => match value_type.as_ref() {
            ArrowType::Decimal128(precision, scale) => {
                let widened = match widen_to_i128(&physical_array) {
                    Some(widened) => widened,
                    None => {
                        return Err(arrow_err!(
                            "Cannot convert {:?} to decimal dictionary",
                            physical_array.data_type()
                        ));
                    }
                };
                let decimals = widened.with_precision_and_scale(*precision, *scale)?;
                arrow_cast::cast(&decimals, requested)?
            }
            ArrowType::Decimal256(precision, scale) => {
                let widened = match widen_to_i256(&physical_array) {
                    Some(widened) => widened,
                    None => {
                        return Err(arrow_err!(
                            "Cannot convert {:?} to decimal dictionary",
                            physical_array.data_type()
                        ));
                    }
                };
                let decimals = widened.with_precision_and_scale(*precision, *scale)?;
                arrow_cast::cast(&decimals, requested)?
            }
            _ => arrow_cast::cast(&physical_array, requested)?,
        },
        _ => arrow_cast::cast(&physical_array, requested)?,
    };

    // Keep the definition and repetition levels of this batch, then clear the
    // record reader for the next one.
    self.def_levels_buffer = self.record_reader.consume_def_levels();
    self.rep_levels_buffer = self.record_reader.consume_rep_levels();
    self.record_reader.reset();

    Ok(converted)
}