in parquet/src/arrow/schema/mod.rs [423:771]
/// Converts an Arrow [`Field`] into the equivalent Parquet schema [`Type`].
///
/// The field's name, its nullability (`OPTIONAL` when nullable, otherwise
/// `REQUIRED`) and its field id (as returned by `field_id`) are carried over to
/// the resulting Parquet node. Nested types (lists, structs, maps) are converted
/// recursively. Dictionary-encoded fields are converted as their value type,
/// because dictionary encoding is not represented at the schema level.
///
/// When `coerce_types` is `true`, the output follows the Parquet specification
/// more closely:
/// * `Date64` is written as `INT32` with the `DATE` logical type (otherwise it
///   is written as a plain `INT64` without a logical type);
/// * list child fields are renamed to `element`;
/// * map entry groups are named `key_value`, with children `key` and `value`.
///
/// # Errors
///
/// Returns an error for Arrow types that have no Parquet representation here:
/// * `Duration`, `RunEndEncoded`, `ListView` / `LargeListView` and `Union`;
/// * empty structs;
/// * maps whose child is not a struct with exactly two fields;
/// * `Time32` / `Time64` with a time unit that is invalid for that type.
///
/// Errors from the Parquet type builders, or from converting any child field,
/// are propagated.
fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
    const PARQUET_KEY_FIELD_NAME: &str = "key";
    const PARQUET_VALUE_FIELD_NAME: &str = "value";

    let name = field.name().as_str();
    let repetition = if field.is_nullable() {
        Repetition::OPTIONAL
    } else {
        Repetition::REQUIRED
    };
    let id = field_id(field);

    // create type from field
    match field.data_type() {
        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Unknown))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 32,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 64,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .with_logical_type(Some(LogicalType::Float16))
            .with_length(2)
            .build(),
        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Timestamp(TimeUnit::Second, _) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Timestamp(time_unit, tz) => {
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Timestamp {
                    // If timezone set, values are normalized to UTC timezone
                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
                    unit: match time_unit {
                        TimeUnit::Second => {
                            unreachable!("Timestamp(Second, _) is handled by the previous arm")
                        }
                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                    },
                }))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Date))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Date64 => {
            if coerce_types {
                // Spec-compliant representation: days since epoch as INT32 DATE.
                Type::primitive_type_builder(name, PhysicalType::INT32)
                    .with_logical_type(Some(LogicalType::Date))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Type::primitive_type_builder(name, PhysicalType::INT64)
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            }
        }
        DataType::Time32(TimeUnit::Second) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Time32(unit) => {
            // Second is handled above. Any unit other than Millisecond is
            // rejected with an error instead of panicking, because such a
            // `DataType` value can still be constructed by callers.
            let unit = match unit {
                TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                u => return Err(arrow_err!("Invalid unit for Time32: {:?}", u)),
            };
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_logical_type(Some(LogicalType::Time {
                    is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                    unit,
                }))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Time64(unit) => {
            // Only Microsecond and Nanosecond are accepted; anything else is
            // reported as an error rather than a panic.
            let unit = match unit {
                TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                u => return Err(arrow_err!("Invalid unit for Time64: {:?}", u)),
            };
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Time {
                    is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                    unit,
                }))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Duration(_) => Err(arrow_err!("Converting Duration to parquet not supported",)),
        DataType::Interval(_) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_converted_type(ConvertedType::INTERVAL)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(12)
                .build()
        }
        DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::FixedSizeBinary(length) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(*length)
                .with_logical_type(
                    #[cfg(feature = "arrow_canonical_extension_types")]
                    // If set, map arrow uuid extension type to parquet uuid logical type.
                    field
                        .try_extension_type::<Uuid>()
                        .ok()
                        .map(|_| LogicalType::Uuid),
                    #[cfg(not(feature = "arrow_canonical_extension_types"))]
                    None,
                )
                .build()
        }
        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
            // Decimal precision determines the Parquet physical type to use.
            // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
            // INT32 covers 1 <= precision <= 9 and INT64 covers precision <= 18.
            // Wider decimals use a fixed-length byte array sized from the precision.
            let (physical_type, length) = if (1..=9).contains(precision) {
                (PhysicalType::INT32, -1)
            } else if *precision <= 18 {
                (PhysicalType::INT64, -1)
            } else {
                (
                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
                    decimal_length_from_precision(*precision) as i32,
                )
            };
            Type::primitive_type_builder(name, physical_type)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(length)
                .with_logical_type(Some(LogicalType::Decimal {
                    scale: *scale as i32,
                    precision: *precision as i32,
                }))
                .with_precision(*precision as i32)
                .with_scale(*scale as i32)
                .build()
        }
        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_logical_type({
                    #[cfg(feature = "arrow_canonical_extension_types")]
                    {
                        // Use the Json logical type if the canonical Json
                        // extension type is set on this field.
                        field
                            .try_extension_type::<Json>()
                            .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
                    }
                    #[cfg(not(feature = "arrow_canonical_extension_types"))]
                    Some(LogicalType::String)
                })
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
                // Ensure proper naming per the Parquet specification
                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
            } else {
                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
            };
            // Three-level list layout: <name> (LIST) -> repeated group "list" -> element.
            Type::group_type_builder(name)
                .with_fields(vec![Arc::new(
                    Type::group_type_builder("list")
                        .with_fields(vec![field_ref])
                        .with_repetition(Repetition::REPEATED)
                        .build()?,
                )])
                .with_logical_type(Some(LogicalType::List))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::ListView(_) | DataType::LargeListView(_) => Err(arrow_err!(
            "Converting ListView/LargeListView to parquet not supported",
        )),
        DataType::Struct(fields) => {
            if fields.is_empty() {
                return Err(arrow_err!("Parquet does not support writing empty structs",));
            }
            // recursively convert children to types/nodes
            let fields = fields
                .iter()
                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
                .collect::<Result<_>>()?;
            Type::group_type_builder(name)
                .with_fields(fields)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Map(field, _) => {
            let struct_fields = match field.data_type() {
                DataType::Struct(struct_fields) => struct_fields,
                _ => {
                    return Err(arrow_err!(
                        "DataType::Map should contain a struct field child",
                    ))
                }
            };
            // The entries struct must hold exactly a key and a value child.
            // Indexing blindly would panic on a malformed schema.
            let (key, value) = match &struct_fields[..] {
                [key, value] => (key, value),
                _ => {
                    return Err(arrow_err!(
                        "DataType::Map struct child must have exactly 2 fields, found {}",
                        struct_fields.len()
                    ))
                }
            };
            // If coercing then set inner struct name to "key_value"
            let map_struct_name = if coerce_types {
                PARQUET_MAP_STRUCT_NAME
            } else {
                field.name()
            };
            // If coercing then ensure struct fields are named "key" and "value"
            let fix_map_field = |expected_name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
                if coerce_types && fld.name() != expected_name {
                    let f = fld.as_ref().clone().with_name(expected_name);
                    Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
                } else {
                    Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
                }
            };
            let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, key)?;
            let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, value)?;
            Type::group_type_builder(name)
                .with_fields(vec![Arc::new(
                    Type::group_type_builder(map_struct_name)
                        .with_fields(vec![key_field, val_field])
                        .with_repetition(Repetition::REPEATED)
                        .build()?,
                )])
                .with_logical_type(Some(LogicalType::Map))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Union(_, _) => Err(arrow_err!(
            "Converting Union to parquet not supported, see ARROW-8817",
        )),
        DataType::Dictionary(_, value) => {
            // Dictionary encoding not handled at the schema level
            let dict_field = field.clone().with_data_type(value.as_ref().clone());
            arrow_to_parquet_type(&dict_field, coerce_types)
        }
        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
            "Converting RunEndEncodedType to parquet not supported",
        )),
    }
}