in parquet/src/arrow/array_reader/primitive_array.rs [103:277]
fn consume_batch(&mut self) -> Result<ArrayRef> {
let target_type = &self.data_type;
let arrow_data_type = match T::get_physical_type() {
PhysicalType::BOOLEAN => ArrowType::Boolean,
PhysicalType::INT32 => {
match target_type {
ArrowType::UInt32 => {
// follow C++ implementation and use overflow/reinterpret cast from i32 to u32 which will map
// `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
ArrowType::UInt32
}
_ => ArrowType::Int32,
}
}
PhysicalType::INT64 => {
match target_type {
ArrowType::UInt64 => {
// follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map
// `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
ArrowType::UInt64
}
_ => ArrowType::Int64,
}
}
PhysicalType::FLOAT => ArrowType::Float32,
PhysicalType::DOUBLE => ArrowType::Float64,
PhysicalType::INT96 => match target_type {
ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
_ => unreachable!("INT96 must be timestamp nanosecond"),
},
PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
unreachable!(
"PrimitiveArrayReaders don't support complex physical types"
);
}
};
// Convert to arrays by using the Parquet physical type.
// The physical types are then cast to Arrow types if necessary
let record_data = self.record_reader.consume_record_data();
let record_data = match T::get_physical_type() {
PhysicalType::BOOLEAN => {
let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len());
for e in record_data.as_slice() {
boolean_buffer.append(*e > 0);
}
boolean_buffer.into()
}
PhysicalType::INT96 => {
// SAFETY - record_data is an aligned buffer of Int96
let (prefix, slice, suffix) =
unsafe { record_data.as_slice().align_to::<Int96>() };
assert!(prefix.is_empty() && suffix.is_empty());
let mut builder = TimestampNanosecondBufferBuilder::new(slice.len());
for v in slice {
builder.append(v.to_nanos())
}
builder.finish()
}
_ => record_data,
};
let array_data = ArrayDataBuilder::new(arrow_data_type)
.len(self.record_reader.num_values())
.add_buffer(record_data)
.null_bit_buffer(self.record_reader.consume_bitmap_buffer());
let array_data = unsafe { array_data.build_unchecked() };
let array: ArrayRef = match T::get_physical_type() {
PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
PhysicalType::INT32 => match array_data.data_type() {
ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
_ => unreachable!(),
},
PhysicalType::INT64 => match array_data.data_type() {
ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
_ => unreachable!(),
},
PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
PhysicalType::INT96 => Arc::new(TimestampNanosecondArray::from(array_data)),
PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
unreachable!(
"PrimitiveArrayReaders don't support complex physical types"
);
}
};
// cast to Arrow type
// We make a strong assumption here that the casts should be infallible.
// If the cast fails because of incompatible datatypes, then there might
// be a bigger problem with how Arrow schemas are converted to Parquet.
//
// As there is not always a 1:1 mapping between Arrow and Parquet, there
// are datatypes which we must convert explicitly.
// These are:
// - date64: we should cast int32 to date32, then date32 to date64.
// - decimal: cast in32 to decimal, int64 to decimal
let array = match target_type {
ArrowType::Date64 => {
// this is cheap as it internally reinterprets the data
let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
arrow_cast::cast(&a, target_type)?
}
ArrowType::Decimal128(p, s) => {
let array = match array.data_type() {
ArrowType::Int32 => array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| v.map(|v| v as i128))
.collect::<Decimal128Array>(),
ArrowType::Int64 => array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| v.map(|v| v as i128))
.collect::<Decimal128Array>(),
_ => {
return Err(arrow_err!(
"Cannot convert {:?} to decimal",
array.data_type()
));
}
}
.with_precision_and_scale(*p, *s)?;
Arc::new(array) as ArrayRef
}
ArrowType::Decimal256(p, s) => {
let array = match array.data_type() {
ArrowType::Int32 => array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| v.map(|v| i256::from_i128(v as i128)))
.collect::<Decimal256Array>(),
ArrowType::Int64 => array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| v.map(|v| i256::from_i128(v as i128)))
.collect::<Decimal256Array>(),
_ => {
return Err(arrow_err!(
"Cannot convert {:?} to decimal",
array.data_type()
));
}
}
.with_precision_and_scale(*p, *s)?;
Arc::new(array) as ArrayRef
}
_ => arrow_cast::cast(&array, target_type)?,
};
// save definition and repetition buffers
self.def_levels_buffer = self.record_reader.consume_def_levels();
self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();
Ok(array)
}