in parquet/src/arrow/schema/mod.rs [291:544]
/// Converts an Arrow [`Field`] into the corresponding Parquet schema [`Type`].
///
/// The Parquet repetition is derived from the field's nullability: nullable
/// fields become `OPTIONAL`, non-nullable fields become `REQUIRED`. Nested
/// types (lists, structs, maps and dictionaries) are converted recursively.
///
/// # Errors
///
/// Returns an error when:
/// * the data type is `Float16`, `Duration`, `Union` or `RunEndEncoded`,
/// * a `Time32` / `Time64` carries a unit that has no mapping here,
/// * a `Struct` has no children,
/// * a `Map` child is not a struct, or that struct has fewer than two children,
/// * any of the underlying Parquet type builders fails.
fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
    let name = field.name().as_str();
    let repetition = if field.is_nullable() {
        Repetition::OPTIONAL
    } else {
        Repetition::REQUIRED
    };
    // create type from field
    match field.data_type() {
        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Unknown))
            .with_repetition(repetition)
            .build(),
        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
            .with_repetition(repetition)
            .build(),
        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .build(),
        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .build(),
        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_repetition(repetition)
            .build(),
        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .build(),
        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .build(),
        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .build(),
        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 32,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .build(),
        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 64,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .build(),
        DataType::Float16 => Err(arrow_err!("Float16 arrays not supported")),
        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
            .with_repetition(repetition)
            .build(),
        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
            .with_repetition(repetition)
            .build(),
        DataType::Timestamp(TimeUnit::Second, _) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_repetition(repetition)
                .build()
        }
        DataType::Timestamp(time_unit, tz) => {
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Timestamp {
                    // If timezone set, values are normalized to UTC timezone
                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
                    unit: match time_unit {
                        // Seconds are handled by the preceding match arm.
                        TimeUnit::Second => unreachable!(),
                        TimeUnit::Millisecond => {
                            ParquetTimeUnit::MILLIS(Default::default())
                        }
                        TimeUnit::Microsecond => {
                            ParquetTimeUnit::MICROS(Default::default())
                        }
                        TimeUnit::Nanosecond => {
                            ParquetTimeUnit::NANOS(Default::default())
                        }
                    },
                }))
                .with_repetition(repetition)
                .build()
        }
        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Date))
            .with_repetition(repetition)
            .build(),
        // date64 is cast to date32 (#1666)
        DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Date))
            .with_repetition(repetition)
            .build(),
        DataType::Time32(TimeUnit::Second) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_repetition(repetition)
                .build()
        }
        DataType::Time32(unit) => {
            // Seconds are handled above; any other unit than milliseconds is
            // reported as an error instead of panicking.
            let parquet_unit = match unit {
                TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                u => return Err(arrow_err!("Invalid unit for Time32: {:?}", u)),
            };
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_logical_type(Some(LogicalType::Time {
                    is_adjusted_to_u_t_c: false,
                    unit: parquet_unit,
                }))
                .with_repetition(repetition)
                .build()
        }
        DataType::Time64(unit) => {
            // Only micro- and nanoseconds are mapped; other units are reported
            // as an error instead of panicking.
            let parquet_unit = match unit {
                TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                u => return Err(arrow_err!("Invalid unit for Time64: {:?}", u)),
            };
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Time {
                    is_adjusted_to_u_t_c: false,
                    unit: parquet_unit,
                }))
                .with_repetition(repetition)
                .build()
        }
        DataType::Duration(_) => {
            Err(arrow_err!("Converting Duration to parquet not supported",))
        }
        DataType::Interval(_) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_converted_type(ConvertedType::INTERVAL)
                .with_repetition(repetition)
                .with_length(12)
                .build()
        }
        DataType::Binary | DataType::LargeBinary => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_repetition(repetition)
                .build()
        }
        DataType::FixedSizeBinary(length) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_repetition(repetition)
                .with_length(*length)
                .build()
        }
        DataType::Decimal128(precision, scale)
        | DataType::Decimal256(precision, scale) => {
            // Decimal precision determines the Parquet physical type to use.
            // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
            // NOTE(review): precision 1 is excluded from the INT32 branch and
            // therefore ends up as INT64 — confirm this is intentional.
            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
                (PhysicalType::INT32, -1)
            } else if *precision <= 18 {
                (PhysicalType::INT64, -1)
            } else {
                (
                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
                    decimal_length_from_precision(*precision) as i32,
                )
            };
            Type::primitive_type_builder(name, physical_type)
                .with_repetition(repetition)
                .with_length(length)
                .with_logical_type(Some(LogicalType::Decimal {
                    scale: *scale as i32,
                    precision: *precision as i32,
                }))
                .with_precision(*precision as i32)
                .with_scale(*scale as i32)
                .build()
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_logical_type(Some(LogicalType::String))
                .with_repetition(repetition)
                .build()
        }
        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
            // All list flavours share one layout: an outer group annotated as
            // LIST wrapping a repeated group named "list" that holds the
            // converted element field.
            Type::group_type_builder(name)
                .with_fields(&mut vec![Arc::new(
                    Type::group_type_builder("list")
                        .with_fields(&mut vec![Arc::new(arrow_to_parquet_type(f)?)])
                        .with_repetition(Repetition::REPEATED)
                        .build()?,
                )])
                .with_logical_type(Some(LogicalType::List))
                .with_repetition(repetition)
                .build()
        }
        DataType::Struct(fields) => {
            if fields.is_empty() {
                return Err(
                    arrow_err!("Parquet does not support writing empty structs",),
                );
            }
            // recursively convert children to types/nodes
            let fields: Result<Vec<TypePtr>> = fields
                .iter()
                .map(|f| arrow_to_parquet_type(f).map(Arc::new))
                .collect();
            Type::group_type_builder(name)
                .with_fields(&mut fields?)
                .with_repetition(repetition)
                .build()
        }
        DataType::Map(field, _) => {
            if let DataType::Struct(struct_fields) = field.data_type() {
                // Checked access: a struct with fewer than two children yields
                // an error rather than an out-of-bounds panic. Children beyond
                // the first two are ignored.
                let (key, value) = match (struct_fields.first(), struct_fields.get(1)) {
                    (Some(key), Some(value)) => (key, value),
                    _ => {
                        return Err(arrow_err!(
                            "DataType::Map struct child should contain a key and a value field"
                        ))
                    }
                };
                Type::group_type_builder(name)
                    .with_fields(&mut vec![Arc::new(
                        Type::group_type_builder(field.name())
                            .with_fields(&mut vec![
                                // The key is always written as non-nullable.
                                Arc::new(arrow_to_parquet_type(&Field::new(
                                    key.name(),
                                    key.data_type().clone(),
                                    false,
                                ))?),
                                // The value keeps its declared nullability.
                                Arc::new(arrow_to_parquet_type(&Field::new(
                                    value.name(),
                                    value.data_type().clone(),
                                    value.is_nullable(),
                                ))?),
                            ])
                            .with_repetition(Repetition::REPEATED)
                            .build()?,
                    )])
                    .with_logical_type(Some(LogicalType::Map))
                    .with_repetition(repetition)
                    .build()
            } else {
                Err(arrow_err!(
                    "DataType::Map should contain a struct field child",
                ))
            }
        }
        // Reported as an error rather than a panic so callers can handle it.
        DataType::Union(_, _) => Err(arrow_err!(
            "Converting Union to parquet not supported. See ARROW-8817."
        )),
        DataType::Dictionary(_, value) => {
            // Dictionary encoding not handled at the schema level
            let dict_field =
                Field::new(name, value.as_ref().clone(), field.is_nullable());
            arrow_to_parquet_type(&dict_field)
        }
        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
            "Converting RunEndEncodedType to parquet not supported",
        )),
    }
}