in sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala [99:302]
  private def toSqlTypeHelper(
      avroSchema: Schema,
      existingRecordNames: Map[String, Int],
      useStableIdForUnionType: Boolean,
      stableIdPrefixForUnionType: String,
      recursiveFieldMaxDepth: Int): SchemaType = {
    avroSchema.getType match {
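      // Example (illustrative): an Avro field declared as
      //   {"type": "int", "logicalType": "date"}
      // becomes Catalyst DateType; a plain "int" becomes IntegerType unless the
      // Spark-specific CATALYST_TYPE_PROP_NAME property names another Catalyst type.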
      case INT => avroSchema.getLogicalType match {
        case _: Date => SchemaType(DateType, nullable = false)
        case _ =>
          val catalystTypeAttrValue = avroSchema.getProp(CATALYST_TYPE_PROP_NAME)
          val catalystType = if (catalystTypeAttrValue == null) {
            IntegerType
          } else {
            CatalystSqlParser.parseDataType(catalystTypeAttrValue)
          }
          SchemaType(catalystType, nullable = false)
      }
      case STRING => SchemaType(StringType, nullable = false)
      case BOOLEAN => SchemaType(BooleanType, nullable = false)
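      // Example (illustrative): {"type": "bytes", "logicalType": "decimal",
      //   "precision": 10, "scale": 2} becomes DecimalType(10, 2).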
      case BYTES | FIXED => avroSchema.getLogicalType match {
        // For the FIXED type, if the precision requires more bytes than the fixed
        // size allows, the logical type will be null; the Avro library handles that.
        case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false)
        case _ => SchemaType(BinaryType, nullable = false)
      }
      case DOUBLE => SchemaType(DoubleType, nullable = false)
      case FLOAT => SchemaType(FloatType, nullable = false)
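      // Example (illustrative): {"type": "long", "logicalType": "timestamp-micros"}
      // becomes TimestampType, while "local-timestamp-micros" (no time zone)
      // becomes TimestampNTZType.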
      case LONG => avroSchema.getLogicalType match {
        case d: CustomDecimal =>
          SchemaType(DecimalType(d.precision, d.scale), nullable = false)
        case _: TimestampMillis | _: TimestampMicros =>
          SchemaType(TimestampType, nullable = false)
        case _: LocalTimestampMillis | _: LocalTimestampMicros =>
          SchemaType(TimestampNTZType, nullable = false)
        case _ =>
          val catalystTypeAttrValue = avroSchema.getProp(CATALYST_TYPE_PROP_NAME)
          val catalystType = if (catalystTypeAttrValue == null) {
            LongType
          } else {
            CatalystSqlParser.parseDataType(catalystTypeAttrValue)
          }
          SchemaType(catalystType, nullable = false)
      }
      case ENUM => SchemaType(StringType, nullable = false)
      case NULL => SchemaType(NullType, nullable = true)
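      // Recursion is tracked per fully-qualified record name. For a
      // self-referential schema such as (illustrative)
      //   {"type": "record", "name": "Node", "fields": [
      //     {"name": "value", "type": "int"},
      //     {"name": "next", "type": ["null", "Node"]}]}
      // the "next" field is expanded up to recursiveFieldMaxDepth times and then
      // dropped, with a null SchemaType signalling the pruned branch to callers.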
      case RECORD =>
        val recursiveDepth: Int = existingRecordNames.getOrElse(avroSchema.getFullName, 0)
        if (recursiveDepth > 0 && recursiveFieldMaxDepth <= 0) {
          val formattedAvroSchema =
            SchemaFormatter.format(AvroUtils.JSON_PRETTY_FORMAT, avroSchema)
          throw new IncompatibleSchemaException(s"""
            |Found recursive reference in Avro schema, which cannot be processed
            | by Spark by default: $formattedAvroSchema. Try setting the option
            | `recursiveFieldMaxDepth` to 1 - $RECURSIVE_FIELD_MAX_DEPTH_LIMIT.
          """.stripMargin)
        } else if (recursiveDepth > 0 && recursiveDepth >= recursiveFieldMaxDepth) {
          logInfo(
            log"The field ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " +
            log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} is dropped at recursive depth " +
            log"${MDC(RECURSIVE_DEPTH, recursiveDepth)}."
          )
          null
        } else {
          val newRecordNames =
            existingRecordNames + (avroSchema.getFullName -> (recursiveDepth + 1))
          val fields = avroSchema.getFields.asScala.map { f =>
            val schemaType = toSqlTypeHelper(
              f.schema(),
              newRecordNames,
              useStableIdForUnionType,
              stableIdPrefixForUnionType,
              recursiveFieldMaxDepth)
            if (schemaType == null) {
              // The field's type was pruned by the recursion limit; drop the field.
              null
            } else {
              StructField(f.name, schemaType.dataType, schemaType.nullable)
            }
          }.filter(_ != null).toSeq
          SchemaType(StructType(fields), nullable = false)
        }
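      // For collections, a null result from the recursive call means the element
      // or value type was pruned entirely by the recursion limit, so the
      // enclosing collection is dropped as well.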
      case ARRAY =>
        val schemaType = toSqlTypeHelper(
          avroSchema.getElementType,
          existingRecordNames,
          useStableIdForUnionType,
          stableIdPrefixForUnionType,
          recursiveFieldMaxDepth)
        if (schemaType == null) {
          logInfo(
            log"Dropping ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " +
            log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} as it does not have any " +
            log"fields left, likely due to the recursive depth limit."
          )
          null
        } else {
          SchemaType(
            ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
            nullable = false)
        }
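      // Avro map keys are always strings, so the Catalyst key type is StringType.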
      case MAP =>
        val schemaType = toSqlTypeHelper(
          avroSchema.getValueType,
          existingRecordNames,
          useStableIdForUnionType,
          stableIdPrefixForUnionType,
          recursiveFieldMaxDepth)
        if (schemaType == null) {
          logInfo(
            log"Dropping ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " +
            log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} as it does not have any " +
            log"fields left, likely due to the recursive depth limit."
          )
          null
        } else {
          SchemaType(
            MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
            nullable = false)
        }
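      // Union handling, with illustrative examples:
      //   ["null", "string"]  -> string, nullable = true
      //   ["int", "long"]     -> bigint
      //   ["float", "double"] -> double
      //   ["int", "string"]   -> struct<member0: int, member1: string>, or
      //     struct<<prefix>int, <prefix>string> when useStableIdForUnionType is true.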
      case UNION =>
        if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
          // For a union containing null, drop the null branch and recurse on the
          // remaining types; the result is marked nullable.
          val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema)
          val remainingSchema =
            if (remainingUnionTypes.size == 1) {
              remainingUnionTypes.head
            } else {
              Schema.createUnion(remainingUnionTypes.asJava)
            }
          val schemaType = toSqlTypeHelper(
            remainingSchema,
            existingRecordNames,
            useStableIdForUnionType,
            stableIdPrefixForUnionType,
            recursiveFieldMaxDepth)
          if (schemaType == null) {
            logInfo(
              log"Dropping ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " +
              log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} as it does not have any " +
              log"fields left, likely due to the recursive depth limit."
            )
            null
          } else {
            schemaType.copy(nullable = true)
          }
        } else avroSchema.getTypes.asScala.map(_.getType).toSeq match {
          case Seq(t1) =>
            toSqlTypeHelper(
              avroSchema.getTypes.get(0),
              existingRecordNames,
              useStableIdForUnionType,
              stableIdPrefixForUnionType,
              recursiveFieldMaxDepth)
          case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
            SchemaType(LongType, nullable = false)
          case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
            SchemaType(DoubleType, nullable = false)
          case _ =>
            // When useStableIdForUnionType is false, convert complex unions to
            // struct types whose field names are member0, member1, etc. This is
            // consistent with the behavior when converting between Avro and Parquet.
            // When useStableIdForUnionType is true, include the type name in each
            // field name so that fields can be added to or dropped from the union
            // while the remaining field names stay stable.
            val fieldNameSet: mutable.Set[String] = mutable.Set()
            val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
              case (s, i) =>
                val schemaType = toSqlTypeHelper(
                  s,
                  existingRecordNames,
                  useStableIdForUnionType,
                  stableIdPrefixForUnionType,
                  recursiveFieldMaxDepth)
                if (schemaType == null) {
                  null
                } else {
                  val fieldName = if (useStableIdForUnionType) {
                    // Avro type names are case sensitive, so two union branches could
                    // be named "a" and "A". Such names cannot be distinguished once
                    // lowercased, so a case-insensitive conflict is an error.
                    // The stable-id prefix can be empty, in which case the field name
                    // is just the type name.
                    val tempFieldName = s"${stableIdPrefixForUnionType}${s.getName}"
                    if (!fieldNameSet.add(tempFieldName.toLowerCase(Locale.ROOT))) {
                      throw new IncompatibleSchemaException(
                        "Cannot generate stable identifier for Avro union type due to " +
                        s"name conflict of type name ${s.getName}")
                    }
                    tempFieldName
                  } else {
                    s"member$i"
                  }
                  // All fields are nullable because only one of them is set at a time.
                  StructField(fieldName, schemaType.dataType, nullable = true)
                }
            }.filter(_ != null).toSeq
            SchemaType(StructType(fields), nullable = false)
        }
      case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
    }
  }
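  // A minimal usage sketch (illustrative; it assumes the public
  // SchemaConverters.toSqlType entry point, which delegates to toSqlTypeHelper
  // with an empty record-name map):
  //
  //   import org.apache.avro.Schema
  //   val avro = new Schema.Parser().parse(
  //     """{"type": "record", "name": "r", "fields": [
  //       |  {"name": "id", "type": "long"},
  //       |  {"name": "tags", "type": {"type": "array", "items": "string"}}
  //       |]}""".stripMargin)
  //   val sqlType = SchemaConverters.toSqlType(avro)
  //   // sqlType.dataType: struct<id: bigint, tags: array<string>>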