fn arrow_to_parquet_type()

in parquet/src/arrow/schema/mod.rs [423:771]


fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
    const PARQUET_KEY_FIELD_NAME: &str = "key";
    const PARQUET_VALUE_FIELD_NAME: &str = "value";

    let name = field.name().as_str();
    let repetition = if field.is_nullable() {
        Repetition::OPTIONAL
    } else {
        Repetition::REQUIRED
    };
    let id = field_id(field);
    // create type from field
    match field.data_type() {
        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Unknown))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 32,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 64,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .with_logical_type(Some(LogicalType::Float16))
            .with_length(2)
            .build(),
        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Timestamp(TimeUnit::Second, _) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Timestamp(time_unit, tz) => {
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Timestamp {
                    // If timezone set, values are normalized to UTC timezone
                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
                    unit: match time_unit {
                        TimeUnit::Second => unreachable!(),
                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                    },
                }))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Date))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Date64 => {
            if coerce_types {
                Type::primitive_type_builder(name, PhysicalType::INT32)
                    .with_logical_type(Some(LogicalType::Date))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Type::primitive_type_builder(name, PhysicalType::INT64)
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            }
        }
        DataType::Time32(TimeUnit::Second) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Time {
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                    u => unreachable!("Invalid unit for Time32: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Time {
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                    u => unreachable!("Invalid unit for Time64: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Duration(_) => Err(arrow_err!("Converting Duration to parquet not supported",)),
        DataType::Interval(_) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_converted_type(ConvertedType::INTERVAL)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(12)
                .build()
        }
        DataType::Binary | DataType::LargeBinary => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::FixedSizeBinary(length) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(*length)
                .with_logical_type(
                    #[cfg(feature = "arrow_canonical_extension_types")]
                    // If set, map arrow uuid extension type to parquet uuid logical type.
                    field
                        .try_extension_type::<Uuid>()
                        .ok()
                        .map(|_| LogicalType::Uuid),
                    #[cfg(not(feature = "arrow_canonical_extension_types"))]
                    None,
                )
                .build()
        }
        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
            // Decimal precision determines the Parquet physical type to use.
            // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
                (PhysicalType::INT32, -1)
            } else if *precision <= 18 {
                (PhysicalType::INT64, -1)
            } else {
                (
                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
                    decimal_length_from_precision(*precision) as i32,
                )
            };
            Type::primitive_type_builder(name, physical_type)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(length)
                .with_logical_type(Some(LogicalType::Decimal {
                    scale: *scale as i32,
                    precision: *precision as i32,
                }))
                .with_precision(*precision as i32)
                .with_scale(*scale as i32)
                .build()
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_logical_type({
                    #[cfg(feature = "arrow_canonical_extension_types")]
                    {
                        // Use the Json logical type if the canonical Json
                        // extension type is set on this field.
                        field
                            .try_extension_type::<Json>()
                            .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
                    }
                    #[cfg(not(feature = "arrow_canonical_extension_types"))]
                    Some(LogicalType::String)
                })
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_logical_type({
                #[cfg(feature = "arrow_canonical_extension_types")]
                {
                    // Use the Json logical type if the canonical Json
                    // extension type is set on this field.
                    field
                        .try_extension_type::<Json>()
                        .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
                }
                #[cfg(not(feature = "arrow_canonical_extension_types"))]
                Some(LogicalType::String)
            })
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
                // Ensure proper naming per the Parquet specification
                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
            } else {
                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
            };

            Type::group_type_builder(name)
                .with_fields(vec![Arc::new(
                    Type::group_type_builder("list")
                        .with_fields(vec![field_ref])
                        .with_repetition(Repetition::REPEATED)
                        .build()?,
                )])
                .with_logical_type(Some(LogicalType::List))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::ListView(_) | DataType::LargeListView(_) => {
            unimplemented!("ListView/LargeListView not implemented")
        }
        DataType::Struct(fields) => {
            if fields.is_empty() {
                return Err(arrow_err!("Parquet does not support writing empty structs",));
            }
            // recursively convert children to types/nodes
            let fields = fields
                .iter()
                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
                .collect::<Result<_>>()?;
            Type::group_type_builder(name)
                .with_fields(fields)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Map(field, _) => {
            if let DataType::Struct(struct_fields) = field.data_type() {
                // If coercing then set inner struct name to "key_value"
                let map_struct_name = if coerce_types {
                    PARQUET_MAP_STRUCT_NAME
                } else {
                    field.name()
                };

                // If coercing then ensure struct fields are named "key" and "value"
                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
                    if coerce_types && fld.name() != name {
                        let f = fld.as_ref().clone().with_name(name);
                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
                    } else {
                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
                    }
                };
                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;

                Type::group_type_builder(name)
                    .with_fields(vec![Arc::new(
                        Type::group_type_builder(map_struct_name)
                            .with_fields(vec![key_field, val_field])
                            .with_repetition(Repetition::REPEATED)
                            .build()?,
                    )])
                    .with_logical_type(Some(LogicalType::Map))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Err(arrow_err!(
                    "DataType::Map should contain a struct field child",
                ))
            }
        }
        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
        DataType::Dictionary(_, ref value) => {
            // Dictionary encoding not handled at the schema level
            let dict_field = field.clone().with_data_type(value.as_ref().clone());
            arrow_to_parquet_type(&dict_field, coerce_types)
        }
        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
            "Converting RunEndEncodedType to parquet not supported",
        )),
    }
}