private def toSqlTypeHelper()

in sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala [99:302]
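
Recursively converts an Avro schema into a Spark SQL SchemaType (a Catalyst DataType plus a nullability flag). existingRecordNames tracks how many times each named record has been entered on the current path so that recursion can be detected; useStableIdForUnionType and stableIdPrefixForUnionType control how fields of complex unions are named; recursiveFieldMaxDepth bounds how deep recursive fields are expanded before being pruned. A null return value marks a branch that was pruned at the depth limit.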


  private def toSqlTypeHelper(
      avroSchema: Schema,
      existingRecordNames: Map[String, Int],
      useStableIdForUnionType: Boolean,
      stableIdPrefixForUnionType: String,
      recursiveFieldMaxDepth: Int): SchemaType = {
    avroSchema.getType match {
      case INT => avroSchema.getLogicalType match {
        case _: Date => SchemaType(DateType, nullable = false)
        case _ =>
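          // A Catalyst type recorded as a schema property overrides the default
          // INT -> IntegerType mapping.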
          val catalystTypeAttrValue = avroSchema.getProp(CATALYST_TYPE_PROP_NAME)
          val catalystType = if (catalystTypeAttrValue == null) {
            IntegerType
          } else {
            CatalystSqlParser.parseDataType(catalystTypeAttrValue)
          }
          SchemaType(catalystType, nullable = false)
      }
      case STRING => SchemaType(StringType, nullable = false)
      case BOOLEAN => SchemaType(BooleanType, nullable = false)
      case BYTES | FIXED => avroSchema.getLogicalType match {
        // For the FIXED type, if the declared precision requires more bytes than the
        // fixed size can hold, the logical type will be null; that case is handled by
        // the Avro library.
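        // For example, a schema {"type": "bytes", "logicalType": "decimal",
        // "precision": 10, "scale": 2} maps to DecimalType(10, 2).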
        case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false)
        case _ => SchemaType(BinaryType, nullable = false)
      }

      case DOUBLE => SchemaType(DoubleType, nullable = false)
      case FLOAT => SchemaType(FloatType, nullable = false)
      case LONG => avroSchema.getLogicalType match {
        case d: CustomDecimal =>
          SchemaType(DecimalType(d.precision, d.scale), nullable = false)
        case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false)
        case _: LocalTimestampMillis | _: LocalTimestampMicros =>
          SchemaType(TimestampNTZType, nullable = false)
        case _ =>
          val catalystTypeAttrValue = avroSchema.getProp(CATALYST_TYPE_PROP_NAME)
          val catalystType = if (catalystTypeAttrValue == null) {
            LongType
          } else {
            CatalystSqlParser.parseDataType(catalystTypeAttrValue)
          }
          SchemaType(catalystType, nullable = false)
      }

      case ENUM => SchemaType(StringType, nullable = false)

      case NULL => SchemaType(NullType, nullable = true)

      case RECORD =>
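        // recursiveDepth is the number of times this record's full name has already
        // been entered on the current path; zero means this is the first visit.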
        val recursiveDepth: Int = existingRecordNames.getOrElse(avroSchema.getFullName, 0)
        if (recursiveDepth > 0 && recursiveFieldMaxDepth <= 0) {
          val formattedAvroSchema = SchemaFormatter.format(AvroUtils.JSON_PRETTY_FORMAT, avroSchema)
          throw new IncompatibleSchemaException(s"""
            |Found recursive reference in Avro schema, which cannot be processed by Spark by
            | default: $formattedAvroSchema. Try setting the option `recursiveFieldMaxDepth`
            | to a value between 1 and $RECURSIVE_FIELD_MAX_DEPTH_LIMIT.
          """.stripMargin)
        } else if (recursiveDepth > 0 && recursiveDepth >= recursiveFieldMaxDepth) {
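          // Depth limit reached: return null to prune this branch; callers check for
          // null and drop the corresponding field.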
          logInfo(
            log"The field ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " +
              log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} is dropped at recursive depth " +
              log"${MDC(RECURSIVE_DEPTH, recursiveDepth)}."
          )
          null
        } else {
          val newRecordNames =
            existingRecordNames + (avroSchema.getFullName -> (recursiveDepth + 1))
          val fields = avroSchema.getFields.asScala.map { f =>
            val schemaType = toSqlTypeHelper(
              f.schema(),
              newRecordNames,
              useStableIdForUnionType,
              stableIdPrefixForUnionType,
              recursiveFieldMaxDepth)
            if (schemaType == null) {
              null
            } else {
              StructField(f.name, schemaType.dataType, schemaType.nullable)
            }
          }.filter(_ != null).toSeq

          SchemaType(StructType(fields), nullable = false)
        }

      case ARRAY =>
        val schemaType = toSqlTypeHelper(
          avroSchema.getElementType,
          existingRecordNames,
          useStableIdForUnionType,
          stableIdPrefixForUnionType,
          recursiveFieldMaxDepth)
        if (schemaType == null) {
          logInfo(
            log"Dropping ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " +
              log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} as it does not have any " +
              log"fields left likely due to recursive depth limit."
          )
          null
        } else {
          SchemaType(
            ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
            nullable = false)
        }

      case MAP =>
        val schemaType = toSqlTypeHelper(
          avroSchema.getValueType,
          existingRecordNames,
          useStableIdForUnionType,
          stableIdPrefixForUnionType,
          recursiveFieldMaxDepth)
        if (schemaType == null) {
          logInfo(
            log"Dropping ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " +
              log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} as it does not have any " +
              log"fields left likely due to recursive depth limit."
          )
          null
        } else {
          SchemaType(
            MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
            nullable = false)
        }

      case UNION =>
        if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
          // In case of a union with null, eliminate it and make a recursive call
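          // For example, ["null", "string"] is unwrapped to StringType with
          // nullable = true.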
          val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema)
          val remainingSchema =
            if (remainingUnionTypes.size == 1) {
              remainingUnionTypes.head
            } else {
              Schema.createUnion(remainingUnionTypes.asJava)
            }
          val schemaType = toSqlTypeHelper(
            remainingSchema,
            existingRecordNames,
            useStableIdForUnionType,
            stableIdPrefixForUnionType,
            recursiveFieldMaxDepth)

          if (schemaType == null) {
            logInfo(
              log"Dropping ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " +
                log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} as it does not have any " +
                log"fields left likely due to recursive depth limit."
            )
            null
          } else {
            schemaType.copy(nullable = true)
          }
        } else avroSchema.getTypes.asScala.map(_.getType).toSeq match {
          case Seq(_) =>
            toSqlTypeHelper(
              avroSchema.getTypes.get(0),
              existingRecordNames,
              useStableIdForUnionType,
              stableIdPrefixForUnionType,
              recursiveFieldMaxDepth)
          case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
            SchemaType(LongType, nullable = false)
          case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
            SchemaType(DoubleType, nullable = false)
          case _ =>
            // When avroOptions.useStableIdForUnionType is false, convert complex unions to struct
            // types where field names are member0, member1, etc. This is consistent with the
            // behavior when converting between Avro and Parquet.
            // If avroOptions.useStableIdForUnionType is true, include the type name in
            // field names so that users can add or drop union members while keeping the
            // remaining field names stable, as illustrated below.
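            // For example (illustrative), the union ["int", "string"] becomes
            // struct<member0: int, member1: string> by default, or
            // struct<<prefix>int: int, <prefix>string: string> with stable ids.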
            val fieldNameSet: mutable.Set[String] = mutable.Set()
            val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
              case (s, i) =>
                val schemaType = toSqlTypeHelper(
                  s,
                  existingRecordNames,
                  useStableIdForUnionType,
                  stableIdPrefixForUnionType,
                  recursiveFieldMaxDepth)
                if (schemaType == null) {
                  null
                } else {
                  val fieldName = if (useStableIdForUnionType) {
                    // Avro type names may be case sensitive, so two union members could
                    // be named "a" and "A". The generated struct field names must be
                    // unique case-insensitively, so we throw an exception on such a
                    // conflict.
                    // The stable id prefix may be empty, in which case the field name is
                    // just the type name.
                    val tempFieldName = s"${stableIdPrefixForUnionType}${s.getName}"
                    if (!fieldNameSet.add(tempFieldName.toLowerCase(Locale.ROOT))) {
                      throw new IncompatibleSchemaException(
                        "Cannot generate stable identifier for Avro union type due to a " +
                          s"name conflict on type name ${s.getName}")
                    }
                    tempFieldName
                  } else {
                    s"member$i"
                  }

                  // All fields are nullable because only one of them is set at a time
                  StructField(fieldName, schemaType.dataType, nullable = true)
                }
            }.filter(_ != null).toSeq

            SchemaType(StructType(fields), nullable = false)
        }

      case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
    }
  }
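
Example (illustrative, not part of the source file): this helper is private, so a
minimal sketch of exercising it goes through the public SchemaConverters.toSqlType
entry point, assuming the one-argument overload:

  import org.apache.avro.Schema
  import org.apache.spark.sql.avro.SchemaConverters

  val avroSchema = new Schema.Parser().parse(
    """{
      |  "type": "record",
      |  "name": "Person",
      |  "fields": [
      |    {"name": "name", "type": "string"},
      |    {"name": "age", "type": ["null", "int"]}
      |  ]
      |}""".stripMargin)

  // Yields SchemaType(StructType(...), nullable = false): `name` maps to a
  // non-nullable StringType field, and the ["null", "int"] union is unwrapped by
  // the UNION case above into a nullable IntegerType field.
  val sqlType = SchemaConverters.toSqlType(avroSchema)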