in sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala [196:325]
private def convertPrimitiveField(
    primitiveColumn: PrimitiveColumnIO,
    sparkReadType: Option[DataType] = None): ParquetColumn = {
  val parquetType = primitiveColumn.getType.asPrimitiveType()
  val typeAnnotation = primitiveColumn.getType.getLogicalTypeAnnotation
  val typeName = primitiveColumn.getPrimitive

  def typeString =
    if (typeAnnotation == null) s"$typeName" else s"$typeName ($typeAnnotation)"
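  // Two distinct failure modes: typeNotImplemented() reports a legal Parquet type that Spark
  // cannot read yet, while illegalType() reports a physical type / annotation combination
  // that is not valid for conversion at all.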
  def typeNotImplemented() =
    throw QueryCompilationErrors.parquetTypeUnsupportedYetError(typeString)

  def illegalType() =
    throw QueryCompilationErrors.illegalParquetTypeError(typeString)
  // When maxPrecision = -1, we skip the precision range check and always respect the precision
  // specified in field.getDecimalMetadata. This is useful when interpreting decimal types
  // stored as binaries with variable lengths.
  def makeDecimalType(maxPrecision: Int = -1): DecimalType = {
    val decimalLogicalTypeAnnotation = typeAnnotation
      .asInstanceOf[DecimalLogicalTypeAnnotation]
    val precision = decimalLogicalTypeAnnotation.getPrecision
    val scale = decimalLogicalTypeAnnotation.getScale

    ParquetSchemaConverter.checkConversionRequirement(
      maxPrecision == -1 || (1 <= precision && precision <= maxPrecision),
      s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")

    DecimalType(precision, scale)
  }
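  // A caller-supplied Spark type (e.g. from a user-specified read schema) takes precedence;
  // otherwise the Spark type is inferred from the Parquet physical type and its annotation.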
  val sparkType = sparkReadType.getOrElse(typeName match {
    case BOOLEAN => BooleanType

    case FLOAT => FloatType

    case DOUBLE => DoubleType

    case INT32 =>
      typeAnnotation match {
        case intTypeAnnotation: IntLogicalTypeAnnotation if intTypeAnnotation.isSigned =>
          intTypeAnnotation.getBitWidth match {
            case 8 => ByteType
            case 16 => ShortType
            case 32 => IntegerType
            case _ => illegalType()
          }
        case null => IntegerType
        case _: DateLogicalTypeAnnotation => DateType
        case _: DecimalLogicalTypeAnnotation => makeDecimalType(Decimal.MAX_INT_DIGITS)
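        // Unsigned integers are widened to the next larger signed Spark type (UINT_8 ->
        // ShortType, UINT_16 -> IntegerType, UINT_32 -> LongType) so that values above the
        // signed maximum are preserved.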
        case intTypeAnnotation: IntLogicalTypeAnnotation if !intTypeAnnotation.isSigned =>
          intTypeAnnotation.getBitWidth match {
            case 8 => ShortType
            case 16 => IntegerType
            case 32 => LongType
            case _ => illegalType()
          }
        case t: TimestampLogicalTypeAnnotation if t.getUnit == TimeUnit.MILLIS =>
          typeNotImplemented()
        case _ => illegalType()
      }

    case INT64 =>
      typeAnnotation match {
        case intTypeAnnotation: IntLogicalTypeAnnotation if intTypeAnnotation.isSigned =>
          intTypeAnnotation.getBitWidth match {
            case 64 => LongType
            case _ => illegalType()
          }
        case null => LongType
        case _: DecimalLogicalTypeAnnotation => makeDecimalType(Decimal.MAX_LONG_DIGITS)
        case intTypeAnnotation: IntLogicalTypeAnnotation if !intTypeAnnotation.isSigned =>
          intTypeAnnotation.getBitWidth match {
            // The precision to hold the largest unsigned long is:
            // `java.lang.Long.toUnsignedString(-1).length` = 20
            case 64 => DecimalType(20, 0)
            case _ => illegalType()
          }
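        // isAdjustedToUTC distinguishes instant semantics (TimestampType) from local,
        // wall-clock semantics; the latter maps to TimestampNTZType only when the
        // inferTimestampNTZ flag is enabled.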
        case timestamp: TimestampLogicalTypeAnnotation
            if timestamp.getUnit == TimeUnit.MICROS || timestamp.getUnit == TimeUnit.MILLIS =>
          if (timestamp.isAdjustedToUTC || !inferTimestampNTZ) {
            TimestampType
          } else {
            TimestampNTZType
          }
        // SPARK-40819: NANOS is not supported as a Timestamp; convert to LongType without
        // timezone awareness to address the behaviour regression introduced by SPARK-34661.
        case timestamp: TimestampLogicalTypeAnnotation
            if timestamp.getUnit == TimeUnit.NANOS && nanosAsLong =>
          LongType
        case time: TimeLogicalTypeAnnotation
            if time.getUnit == TimeUnit.MICROS && !time.isAdjustedToUTC =>
          TimeType(TimeType.MICROS_PRECISION)
        case _ =>
          illegalType()
      }

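    // INT96 is the legacy timestamp encoding used by older writers such as Impala and Hive;
    // it carries no logical type annotation, so reading it requires opting in via the
    // assumeInt96IsTimestamp flag.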
    case INT96 =>
      ParquetSchemaConverter.checkConversionRequirement(
        assumeInt96IsTimestamp,
        "INT96 is not supported unless it's interpreted as timestamp. " +
          s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
      TimestampType
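    // Un-annotated BINARY is ambiguous: some writers store strings without the String (UTF8)
    // annotation, so the assumeBinaryIsString flag decides between StringType and BinaryType.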
    case BINARY =>
      typeAnnotation match {
        case _: StringLogicalTypeAnnotation | _: EnumLogicalTypeAnnotation |
            _: JsonLogicalTypeAnnotation => StringType
        case null if assumeBinaryIsString => StringType
        case null => BinaryType
        case _: BsonLogicalTypeAnnotation => BinaryType
        case _: DecimalLogicalTypeAnnotation => makeDecimalType()
        case _ => illegalType()
      }

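    // For fixed-length binaries the decimal precision is bounded by what the declared byte
    // length can hold, hence Decimal.maxPrecisionForBytes(parquetType.getTypeLength).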
    case FIXED_LEN_BYTE_ARRAY =>
      typeAnnotation match {
        case _: DecimalLogicalTypeAnnotation =>
          makeDecimalType(Decimal.maxPrecisionForBytes(parquetType.getTypeLength))
        case _: IntervalLogicalTypeAnnotation => typeNotImplemented()
        case null => BinaryType
        case _ => illegalType()
      }

    case _ => illegalType()
  })

  ParquetColumn(sparkType, primitiveColumn)
}
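
// A minimal, illustrative sketch (not part of this file) of how these mappings surface when
// reading Parquet with Spark; `spark` is an assumed active SparkSession and the path is
// hypothetical:
//
//   spark.conf.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, "true")
//   val df = spark.read.parquet("/tmp/example.parquet")
//   df.printSchema()
//   // e.g. an INT32 column annotated DECIMAL(9, 2) prints as decimal(9,2), an un-annotated
//   // INT64 column as bigint, and an INT96 column as timestamp.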