in crates/core/src/avro_to_arrow/schema.rs [56:155]
fn schema_to_field_with_props(
schema: &AvroSchema,
name: Option<&str>,
nullable: bool,
props: Option<HashMap<String, String>>,
) -> Result<Field> {
let mut nullable = nullable;
let field_type: DataType = match schema {
AvroSchema::Ref { .. } => todo!("Add support for AvroSchema::Ref"),
AvroSchema::Null => DataType::Null,
AvroSchema::Boolean => DataType::Boolean,
AvroSchema::Int => DataType::Int32,
AvroSchema::Long => DataType::Int64,
AvroSchema::Float => DataType::Float32,
AvroSchema::Double => DataType::Float64,
AvroSchema::Bytes => DataType::Binary,
AvroSchema::String => DataType::Utf8,
AvroSchema::Array(item_schema) => DataType::List(Arc::new(schema_to_field_with_props(
&item_schema.items,
Some("element"),
false,
None,
)?)),
AvroSchema::Map(value_schema) => {
let value_field =
schema_to_field_with_props(&value_schema.types, Some("value"), false, None)?;
DataType::Dictionary(
Box::new(DataType::Utf8),
Box::new(value_field.data_type().clone()),
)
}
AvroSchema::Union(us) => {
// If there are only two variants and one of them is null, set the other type as the field data type
let has_nullable = us
.find_schema_with_known_schemata::<apache_avro::Schema>(&Value::Null, None, &None)
.is_some();
let sub_schemas = us.variants();
if has_nullable && sub_schemas.len() == 2 {
nullable = true;
if let Some(schema) = sub_schemas
.iter()
.find(|&schema| !matches!(schema, AvroSchema::Null))
{
schema_to_field_with_props(schema, None, has_nullable, None)?
.data_type()
.clone()
} else {
return Err(CoreError::from(apache_avro::Error::GetUnionDuplicate));
}
} else {
let fields = sub_schemas
.iter()
.map(|s| schema_to_field_with_props(s, None, has_nullable, None))
.collect::<Result<Vec<Field>>>()?;
let type_ids = 0_i8..fields.len() as i8;
DataType::Union(UnionFields::new(type_ids, fields), UnionMode::Dense)
}
}
AvroSchema::Record(RecordSchema { fields, .. }) => {
let fields: Result<_> = fields
.iter()
.map(|field| {
let mut props = HashMap::new();
if let Some(doc) = &field.doc {
props.insert("avro::doc".to_string(), doc.clone());
}
/*if let Some(aliases) = fields.aliases {
props.insert("aliases", aliases);
}*/
schema_to_field_with_props(&field.schema, Some(&field.name), false, Some(props))
})
.collect();
DataType::Struct(fields?)
}
AvroSchema::Enum(EnumSchema { .. }) => DataType::Utf8,
AvroSchema::Fixed(FixedSchema { size, .. }) => DataType::FixedSizeBinary(*size as i32),
AvroSchema::Decimal(DecimalSchema {
precision, scale, ..
}) => DataType::Decimal128(*precision as u8, *scale as i8),
AvroSchema::BigDecimal => DataType::LargeBinary,
AvroSchema::Uuid => DataType::FixedSizeBinary(16),
AvroSchema::Date => DataType::Date32,
AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
AvroSchema::TimestampNanos => DataType::Timestamp(TimeUnit::Nanosecond, None),
AvroSchema::LocalTimestampMillis => todo!(),
AvroSchema::LocalTimestampMicros => todo!(),
AvroSchema::LocalTimestampNanos => todo!(),
AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond),
};
let data_type = field_type.clone();
let name = name.unwrap_or_else(|| default_field_name(&data_type));
let mut field = Field::new(name, field_type, nullable);
field.set_metadata(props.unwrap_or_default());
Ok(field)
}