in parquet/src/arrow/arrow_writer/mod.rs [1006:1208]
/// Writes the values of one leaf column to its typed parquet column writer.
///
/// The arrow array, the indices of its non-null entries and the
/// definition/repetition levels all come from `levels`. The variant of `writer`
/// decides how the arrow array is converted before being handed to
/// `write_primitive` or `write_batch`; the result of that call is returned as-is.
///
/// # Errors
///
/// Propagates errors from `arrow_cast::cast` and from the underlying writer, and
/// returns `ParquetError::NYI` when the fixed-length byte array writer is given an
/// arrow type (or interval unit) that has no conversion here.
///
/// # Panics
///
/// Hits `unreachable!` for the INT96 and BYTE_ARRAY writer variants, and unwraps
/// the `downcast_ref` results in the fixed-length byte array branch.
fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
    let leaf = levels.array().as_ref();
    let non_null = levels.non_null_indices();

    match writer {
        ColumnWriter::BoolColumnWriter(bool_writer) => {
            let bools = get_bool_array_slice(leaf.as_boolean(), non_null);
            bool_writer.write_batch(bools.as_slice(), levels.def_levels(), levels.rep_levels())
        }
        ColumnWriter::Int32ColumnWriter(int32_writer) => match leaf.data_type() {
            ArrowDataType::Date64 => {
                // Date64 is first narrowed to Date32, which is then reinterpreted as Int32.
                let as_date32 = arrow_cast::cast(leaf, &ArrowDataType::Date32)?;
                let as_int32 = arrow_cast::cast(&as_date32, &ArrowDataType::Int32)?;
                write_primitive(
                    int32_writer,
                    as_int32.as_primitive::<Int32Type>().values(),
                    levels,
                )
            }
            ArrowDataType::UInt32 => {
                // Same approach as the C++ implementation: reinterpret the u32 buffer as
                // i32, so `(i32::MAX as u32)..u32::MAX` lands on `i32::MIN..0`.
                let unsigned = leaf.as_primitive::<UInt32Type>().values();
                write_primitive(int32_writer, unsigned.inner().typed_data::<i32>(), levels)
            }
            ArrowDataType::Decimal128(..) => {
                // Low-precision decimals are represented as int32.
                let narrowed = leaf
                    .as_primitive::<Decimal128Type>()
                    .unary::<_, Int32Type>(|value| value as i32);
                write_primitive(int32_writer, narrowed.values(), levels)
            }
            ArrowDataType::Decimal256(..) => {
                // Low-precision decimals are represented as int32.
                let narrowed = leaf
                    .as_primitive::<Decimal256Type>()
                    .unary::<_, Int32Type>(|value| value.as_i128() as i32);
                write_primitive(int32_writer, narrowed.values(), levels)
            }
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Decimal128(..) => {
                    // Unpack the dictionary to its value type, then narrow as above.
                    let unpacked = arrow_cast::cast(leaf, value_type)?;
                    let narrowed = unpacked
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|value| value as i32);
                    write_primitive(int32_writer, narrowed.values(), levels)
                }
                ArrowDataType::Decimal256(..) => {
                    let unpacked = arrow_cast::cast(leaf, value_type)?;
                    let narrowed = unpacked
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|value| value.as_i128() as i32);
                    write_primitive(int32_writer, narrowed.values(), levels)
                }
                _ => {
                    let as_int32 = arrow_cast::cast(leaf, &ArrowDataType::Int32)?;
                    write_primitive(
                        int32_writer,
                        as_int32.as_primitive::<Int32Type>().values(),
                        levels,
                    )
                }
            },
            _ => {
                // Everything else goes through a plain cast to Int32.
                let as_int32 = arrow_cast::cast(leaf, &ArrowDataType::Int32)?;
                write_primitive(
                    int32_writer,
                    as_int32.as_primitive::<Int32Type>().values(),
                    levels,
                )
            }
        },
        ColumnWriter::Int64ColumnWriter(int64_writer) => match leaf.data_type() {
            ArrowDataType::Date64 => {
                let as_int64 = arrow_cast::cast(leaf, &ArrowDataType::Int64)?;
                write_primitive(
                    int64_writer,
                    as_int64.as_primitive::<Int64Type>().values(),
                    levels,
                )
            }
            ArrowDataType::Int64 => {
                // Already the physical type: write the values without casting.
                write_primitive(
                    int64_writer,
                    leaf.as_primitive::<Int64Type>().values(),
                    levels,
                )
            }
            ArrowDataType::UInt64 => {
                // Same approach as the C++ implementation: reinterpret the u64 buffer as
                // i64, so `(i64::MAX as u64)..u64::MAX` lands on `i64::MIN..0`.
                let unsigned = leaf.as_primitive::<UInt64Type>().values();
                write_primitive(int64_writer, unsigned.inner().typed_data::<i64>(), levels)
            }
            ArrowDataType::Decimal128(..) => {
                // Low-precision decimals are represented as int64.
                let narrowed = leaf
                    .as_primitive::<Decimal128Type>()
                    .unary::<_, Int64Type>(|value| value as i64);
                write_primitive(int64_writer, narrowed.values(), levels)
            }
            ArrowDataType::Decimal256(..) => {
                // Low-precision decimals are represented as int64.
                let narrowed = leaf
                    .as_primitive::<Decimal256Type>()
                    .unary::<_, Int64Type>(|value| value.as_i128() as i64);
                write_primitive(int64_writer, narrowed.values(), levels)
            }
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Decimal128(..) => {
                    // Unpack the dictionary to its value type, then narrow as above.
                    let unpacked = arrow_cast::cast(leaf, value_type)?;
                    let narrowed = unpacked
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|value| value as i64);
                    write_primitive(int64_writer, narrowed.values(), levels)
                }
                ArrowDataType::Decimal256(..) => {
                    let unpacked = arrow_cast::cast(leaf, value_type)?;
                    let narrowed = unpacked
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|value| value.as_i128() as i64);
                    write_primitive(int64_writer, narrowed.values(), levels)
                }
                _ => {
                    let as_int64 = arrow_cast::cast(leaf, &ArrowDataType::Int64)?;
                    write_primitive(
                        int64_writer,
                        as_int64.as_primitive::<Int64Type>().values(),
                        levels,
                    )
                }
            },
            _ => {
                // Everything else goes through a plain cast to Int64.
                let as_int64 = arrow_cast::cast(leaf, &ArrowDataType::Int64)?;
                write_primitive(
                    int64_writer,
                    as_int64.as_primitive::<Int64Type>().values(),
                    levels,
                )
            }
        },
        ColumnWriter::FloatColumnWriter(float_writer) => write_primitive(
            float_writer,
            leaf.as_primitive::<Float32Type>().values(),
            levels,
        ),
        ColumnWriter::DoubleColumnWriter(double_writer) => write_primitive(
            double_writer,
            leaf.as_primitive::<Float64Type>().values(),
            levels,
        ),
        ColumnWriter::FixedLenByteArrayColumnWriter(fixed_writer) => {
            // Each supported arrow type has its own helper that builds the values to
            // write from the non-null entries.
            let bytes = match leaf.data_type() {
                ArrowDataType::Interval(IntervalUnit::YearMonth) => {
                    let intervals = leaf
                        .as_any()
                        .downcast_ref::<arrow_array::IntervalYearMonthArray>()
                        .unwrap();
                    get_interval_ym_array_slice(intervals, non_null)
                }
                ArrowDataType::Interval(IntervalUnit::DayTime) => {
                    let intervals = leaf
                        .as_any()
                        .downcast_ref::<arrow_array::IntervalDayTimeArray>()
                        .unwrap();
                    get_interval_dt_array_slice(intervals, non_null)
                }
                ArrowDataType::Interval(interval_unit) => {
                    return Err(ParquetError::NYI(format!(
                        "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
                    )));
                }
                ArrowDataType::FixedSizeBinary(_) => {
                    let binary = leaf
                        .as_any()
                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
                        .unwrap();
                    get_fsb_array_slice(binary, non_null)
                }
                ArrowDataType::Decimal128(..) => {
                    get_decimal_128_array_slice(leaf.as_primitive::<Decimal128Type>(), non_null)
                }
                ArrowDataType::Decimal256(..) => {
                    let decimals = leaf
                        .as_any()
                        .downcast_ref::<arrow_array::Decimal256Array>()
                        .unwrap();
                    get_decimal_256_array_slice(decimals, non_null)
                }
                ArrowDataType::Float16 => {
                    get_float_16_array_slice(leaf.as_primitive::<Float16Type>(), non_null)
                }
                _ => {
                    return Err(ParquetError::NYI(
                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
                    ));
                }
            };
            fixed_writer.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
        }
        ColumnWriter::Int96ColumnWriter(_) => {
            unreachable!("Currently unreachable because data type not supported")
        }
        ColumnWriter::ByteArrayColumnWriter(_) => {
            unreachable!("should use ByteArrayWriter")
        }
    }
}