fn arrow_to_parquet_type()

in parquet/src/arrow/schema/mod.rs [291:544]


/// Converts a single Arrow [`Field`] into the corresponding Parquet schema [`Type`].
///
/// The Parquet node takes its name from the field, and its repetition is
/// `OPTIONAL` for nullable fields and `REQUIRED` otherwise. Nested types
/// (lists, structs, maps) are converted recursively, and dictionary types are
/// converted as their value type.
///
/// # Errors
///
/// Returns an error instead of panicking when the field cannot be represented:
///
/// * `Float16`, `Duration`, `Union` and `RunEndEncoded` types,
/// * `Time32` / `Time64` carrying a unit other than the ones matched below,
/// * a `Struct` without any child fields,
/// * a `Map` whose child is not a struct, or whose struct child has fewer
///   than two fields (key and value),
/// * any error reported by the type builders or by the recursive conversion
///   of a child field.
fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
    let name = field.name().as_str();
    let repetition = if field.is_nullable() {
        Repetition::OPTIONAL
    } else {
        Repetition::REQUIRED
    };
    // create type from field
    match field.data_type() {
        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Unknown))
            .with_repetition(repetition)
            .build(),
        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
            .with_repetition(repetition)
            .build(),
        // Integers narrower than 32 bits and all unsigned integers carry an
        // explicit `Integer` logical type on top of the INT32/INT64 storage.
        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .build(),
        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .build(),
        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_repetition(repetition)
            .build(),
        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .build(),
        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .build(),
        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .build(),
        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 32,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .build(),
        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 64,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .build(),
        DataType::Float16 => Err(arrow_err!("Float16 arrays not supported")),
        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
            .with_repetition(repetition)
            .build(),
        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
            .with_repetition(repetition)
            .build(),
        DataType::Timestamp(TimeUnit::Second, _) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_repetition(repetition)
                .build()
        }
        DataType::Timestamp(time_unit, tz) => {
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Timestamp {
                    // If timezone set, values are normalized to UTC timezone
                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
                    unit: match time_unit {
                        // Seconds are matched by the preceding arm.
                        TimeUnit::Second => unreachable!(),
                        TimeUnit::Millisecond => {
                            ParquetTimeUnit::MILLIS(Default::default())
                        }
                        TimeUnit::Microsecond => {
                            ParquetTimeUnit::MICROS(Default::default())
                        }
                        TimeUnit::Nanosecond => {
                            ParquetTimeUnit::NANOS(Default::default())
                        }
                    },
                }))
                .with_repetition(repetition)
                .build()
        }
        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Date))
            .with_repetition(repetition)
            .build(),
        // date64 is cast to date32 (#1666)
        DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Date))
            .with_repetition(repetition)
            .build(),
        DataType::Time32(TimeUnit::Second) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_repetition(repetition)
                .build()
        }
        DataType::Time32(unit) => {
            // Seconds are matched by the preceding arm; any unit other than
            // milliseconds is reported as an error rather than a panic.
            let parquet_unit = match unit {
                TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                u => return Err(arrow_err!("Invalid unit for Time32: {:?}", u)),
            };
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_logical_type(Some(LogicalType::Time {
                    is_adjusted_to_u_t_c: false,
                    unit: parquet_unit,
                }))
                .with_repetition(repetition)
                .build()
        }
        DataType::Time64(unit) => {
            // Any unit other than microseconds or nanoseconds is reported as
            // an error rather than a panic.
            let parquet_unit = match unit {
                TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                u => return Err(arrow_err!("Invalid unit for Time64: {:?}", u)),
            };
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Time {
                    is_adjusted_to_u_t_c: false,
                    unit: parquet_unit,
                }))
                .with_repetition(repetition)
                .build()
        }
        DataType::Duration(_) => {
            Err(arrow_err!("Converting Duration to parquet not supported",))
        }
        DataType::Interval(_) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_converted_type(ConvertedType::INTERVAL)
                .with_repetition(repetition)
                .with_length(12)
                .build()
        }
        DataType::Binary | DataType::LargeBinary => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_repetition(repetition)
                .build()
        }
        DataType::FixedSizeBinary(length) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_repetition(repetition)
                .with_length(*length)
                .build()
        }
        DataType::Decimal128(precision, scale)
        | DataType::Decimal256(precision, scale) => {
            // Decimal precision determines the Parquet physical type to use.
            // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
            // NOTE(review): because of the `> 1` bound, precision 1 is stored as
            // INT64 rather than INT32 — kept as-is to preserve the emitted schema;
            // confirm whether this is intentional.
            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
                (PhysicalType::INT32, -1)
            } else if *precision <= 18 {
                (PhysicalType::INT64, -1)
            } else {
                (
                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
                    decimal_length_from_precision(*precision) as i32,
                )
            };
            Type::primitive_type_builder(name, physical_type)
                .with_repetition(repetition)
                .with_length(length)
                .with_logical_type(Some(LogicalType::Decimal {
                    scale: *scale as i32,
                    precision: *precision as i32,
                }))
                .with_precision(*precision as i32)
                .with_scale(*scale as i32)
                .build()
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_logical_type(Some(LogicalType::String))
                .with_repetition(repetition)
                .build()
        }
        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
            // All list flavours become: group (LIST) -> repeated group "list" -> element.
            Type::group_type_builder(name)
                .with_fields(&mut vec![Arc::new(
                    Type::group_type_builder("list")
                        .with_fields(&mut vec![Arc::new(arrow_to_parquet_type(f)?)])
                        .with_repetition(Repetition::REPEATED)
                        .build()?,
                )])
                .with_logical_type(Some(LogicalType::List))
                .with_repetition(repetition)
                .build()
        }
        DataType::Struct(fields) => {
            if fields.is_empty() {
                return Err(
                    arrow_err!("Parquet does not support writing empty structs",),
                );
            }
            // recursively convert children to types/nodes
            let fields: Result<Vec<TypePtr>> = fields
                .iter()
                .map(|f| arrow_to_parquet_type(f).map(Arc::new))
                .collect();
            Type::group_type_builder(name)
                .with_fields(&mut fields?)
                .with_repetition(repetition)
                .build()
        }
        DataType::Map(field, _) => {
            if let DataType::Struct(struct_fields) = field.data_type() {
                // The key and value are read by index below; reject a
                // malformed entries struct up front instead of panicking on
                // an out-of-bounds access.
                if struct_fields.len() < 2 {
                    return Err(arrow_err!(
                        "DataType::Map struct child should contain key and value fields, found {} field(s)",
                        struct_fields.len()
                    ));
                }
                Type::group_type_builder(name)
                    .with_fields(&mut vec![Arc::new(
                        Type::group_type_builder(field.name())
                            .with_fields(&mut vec![
                                // The key is always written as non-nullable.
                                Arc::new(arrow_to_parquet_type(&Field::new(
                                    struct_fields[0].name(),
                                    struct_fields[0].data_type().clone(),
                                    false,
                                ))?),
                                // The value keeps its declared nullability.
                                Arc::new(arrow_to_parquet_type(&Field::new(
                                    struct_fields[1].name(),
                                    struct_fields[1].data_type().clone(),
                                    struct_fields[1].is_nullable(),
                                ))?),
                            ])
                            .with_repetition(Repetition::REPEATED)
                            .build()?,
                    )])
                    .with_logical_type(Some(LogicalType::Map))
                    .with_repetition(repetition)
                    .build()
            } else {
                Err(arrow_err!(
                    "DataType::Map should contain a struct field child",
                ))
            }
        }
        // Reported as an error (not a panic) like the other unsupported types.
        DataType::Union(_, _) => Err(arrow_err!(
            "Converting Union to parquet not supported, see ARROW-8817",
        )),
        DataType::Dictionary(_, ref value) => {
            // Dictionary encoding not handled at the schema level
            let dict_field = Field::new(name, *value.clone(), field.is_nullable());
            arrow_to_parquet_type(&dict_field)
        }
        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
            "Converting RunEndEncodedType to parquet not supported",
        )),
    }
}