in sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala [299:561]
  private def newConverter(
      parquetType: Type,
      catalystType: DataType,
      updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
    def isUnsignedIntTypeMatched(bitWidth: Int): Boolean = {
      parquetType.getLogicalTypeAnnotation match {
        case i: IntLogicalTypeAnnotation if !i.isSigned => i.getBitWidth == bitWidth
        case _ => false
      }
    }

    catalystType match {
      case LongType if isUnsignedIntTypeMatched(32) =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            this.updater.setLong(Integer.toUnsignedLong(value))
        }
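      // Integer.toUnsignedLong above reinterprets the raw 32-bit pattern as an unsigned value
      // (e.g. Integer.toUnsignedLong(-1) == 4294967295L), so a Parquet unsigned INT32 column fits
      // losslessly into Spark's LongType.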
      case LongType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            this.updater.setLong(value)
        }
      case DoubleType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            this.updater.setDouble(value)
        }
      case DoubleType if parquetType.asPrimitiveType().getPrimitiveTypeName == FLOAT =>
        new ParquetPrimitiveConverter(updater) {
          override def addFloat(value: Float): Unit =
            this.updater.setDouble(value)
        }
      case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType |
           _: AnsiIntervalType =>
        new ParquetPrimitiveConverter(updater)

      case ByteType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            this.updater.setByte(value.asInstanceOf[PhysicalByteType#InternalType])
        }

      case ShortType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            this.updater.setShort(value.asInstanceOf[PhysicalShortType#InternalType])
        }
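      // Parquet has no 8- or 16-bit physical type: INT_8/INT_16 annotated columns arrive as INT32
      // values, which the two converters above narrow back to Spark's ByteType and ShortType.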
      // For INT32 backed decimals
      case _: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
        parquetType.asPrimitiveType().getLogicalTypeAnnotation match {
          case decimalType: DecimalLogicalTypeAnnotation =>
            new ParquetIntDictionaryAwareDecimalConverter(
              decimalType.getPrecision, decimalType.getScale, updater)
          case _ =>
            // If the column is a plain INT32, we should pick the precision that can host the
            // largest INT32 value.
            new ParquetIntDictionaryAwareDecimalConverter(
              DecimalType.IntDecimal.precision, 0, updater)
        }
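      // DecimalType.IntDecimal is DecimalType(10, 0): Int.MaxValue (2147483647) has 10 digits, so
      // precision 10 with scale 0 can hold every value a plain INT32 column may produce.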
      // For unsigned int64
      case _: DecimalType if isUnsignedIntTypeMatched(64) =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            this.updater.set(Decimal(java.lang.Long.toUnsignedString(value)))
          }
        }
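      // An unsigned INT64 can reach 2^64 - 1 = 18446744073709551615, which no longer fits in
      // LongType, so the value is surfaced as a Decimal built from its unsigned string form,
      // e.g. java.lang.Long.toUnsignedString(-1L) == "18446744073709551615".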
      // For INT64 backed decimals
      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
        parquetType.asPrimitiveType().getLogicalTypeAnnotation match {
          case decimalType: DecimalLogicalTypeAnnotation =>
            new ParquetLongDictionaryAwareDecimalConverter(
              decimalType.getPrecision, decimalType.getScale, updater)
          case _ =>
            // If the column is a plain INT64, we should pick the precision that can host the
            // largest INT64 value.
            new ParquetLongDictionaryAwareDecimalConverter(
              DecimalType.LongDecimal.precision, 0, updater)
        }

      // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
      case t: DecimalType
          if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
            parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
        parquetType.asPrimitiveType().getLogicalTypeAnnotation match {
          case decimalType: DecimalLogicalTypeAnnotation =>
            new ParquetBinaryDictionaryAwareDecimalConverter(
              decimalType.getPrecision, decimalType.getScale, updater)
          case _ =>
            throw QueryExecutionErrors.cannotCreateParquetConverterForTypeError(
              t, parquetType.toString)
        }

      case t: DecimalType =>
        throw QueryExecutionErrors.cannotCreateParquetConverterForDecimalTypeError(
          t, parquetType.toString)

      case _: StringType =>
        new ParquetStringConverter(updater)
      // As long as the Parquet type is an INT64 timestamp, it is read as Spark's TimestampLTZ
      // type, whether the logical annotation's `isAdjustedToUTC` flag is true or false.
      case TimestampType
          if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
            parquetType.getLogicalTypeAnnotation
              .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MICROS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            this.updater.setLong(timestampRebaseFunc(value))
          }
        }
      // As long as the Parquet type is an INT64 timestamp, it is read as Spark's TimestampLTZ
      // type, whether the logical annotation's `isAdjustedToUTC` flag is true or false.
      case TimestampType
          if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
            parquetType.getLogicalTypeAnnotation
              .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MILLIS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            val micros = DateTimeUtils.millisToMicros(value)
            this.updater.setLong(timestampRebaseFunc(micros))
          }
        }
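      // DateTimeUtils.millisToMicros above scales the raw value by 1000, so both MILLIS- and
      // MICROS-encoded INT64 timestamps end up as microseconds before the rebase is applied.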
      // INT96 timestamps don't have a logical type; here we check the physical type instead.
      case TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 =>
        new ParquetPrimitiveConverter(updater) {
          // Converts nanosecond timestamps stored as INT96
          override def addBinary(value: Binary): Unit = {
            val julianMicros = ParquetRowConverter.binaryToSQLTimestamp(value)
            val gregorianMicros = int96RebaseFunc(julianMicros)
            val adjTime = convertTz.map(DateTimeUtils.convertTz(gregorianMicros, _, ZoneOffset.UTC))
              .getOrElse(gregorianMicros)
            this.updater.setLong(adjTime)
          }
        }
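      // An INT96 value is 12 bytes: 8 bytes of nanoseconds within the day followed by a 4-byte
      // Julian day number. binaryToSQLTimestamp collapses that into microseconds since the epoch,
      // which the int96RebaseFunc call above rebases from the hybrid Julian calendar to the
      // Proleptic Gregorian calendar.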
      // INT96 timestamps don't have a logical type; here we check the physical type instead.
      case TimestampNTZType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 =>
        new ParquetPrimitiveConverter(updater) {
          // Converts nanosecond timestamps stored as INT96.
          // TimestampNTZ type does not require rebasing due to its lack of time zone context.
          override def addBinary(value: Binary): Unit = {
            val julianMicros = ParquetRowConverter.binaryToSQLTimestamp(value)
            this.updater.setLong(julianMicros)
          }
        }
      case TimestampNTZType
          if canReadAsTimestampNTZ(parquetType) &&
            parquetType.getLogicalTypeAnnotation
              .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MICROS =>
        new ParquetPrimitiveConverter(updater)
      case TimestampNTZType
          if canReadAsTimestampNTZ(parquetType) &&
            parquetType.getLogicalTypeAnnotation
              .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MILLIS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            val micros = DateTimeUtils.millisToMicros(value)
            this.updater.setLong(micros)
          }
        }
      // Allow upcasting INT32 date to timestampNTZ.
      case TimestampNTZType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 &&
          parquetType.getLogicalTypeAnnotation.isInstanceOf[DateLogicalTypeAnnotation] =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit = {
            this.updater.set(DateTimeUtils.daysToMicros(dateRebaseFunc(value), ZoneOffset.UTC))
          }
        }
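      // daysToMicros with ZoneOffset.UTC places the date at midnight UTC, i.e. the rebased day
      // count is scaled by 86,400,000,000 microseconds per day to fit the TimestampNTZ encoding.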
      case DateType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit = {
            this.updater.set(dateRebaseFunc(value))
          }
        }
      case _: TimeType
          if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimeLogicalTypeAnnotation] &&
            parquetType.getLogicalTypeAnnotation
              .asInstanceOf[TimeLogicalTypeAnnotation].getUnit == TimeUnit.MICROS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            this.updater.setLong(value)
          }
        }

      // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
      // annotated by `LIST` or `MAP` should be interpreted as a required list of required
      // elements where the element type is the type of the field.
      case t: ArrayType
          if !parquetType.getLogicalTypeAnnotation.isInstanceOf[ListLogicalTypeAnnotation] =>
        if (parquetType.isPrimitive) {
          new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
        } else {
          new RepeatedGroupConverter(parquetType, t.elementType, updater)
        }
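      // For example, a legacy field declared as `repeated int32 element;` with no LIST annotation
      // is read as an array of non-nullable ints, per the backward-compatibility rules of the
      // Parquet LIST specification.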
      case t: ArrayType =>
        new ParquetArrayConverter(parquetType.asGroupType(), t, updater)

      case t: MapType =>
        new ParquetMapConverter(parquetType.asGroupType(), t, updater)

      case t: StructType if VariantMetadata.isVariantStruct(t) =>
        new ParquetVariantConverter(t, parquetType.asGroupType(), updater)

      case t: StructType =>
        val wrappedUpdater = {
          // SPARK-30338: avoid unnecessary InternalRow copying for nested structs:
          // There are two cases to handle here:
          //
          //  1. Parent container is a map or array: we must make a deep copy of the mutable row
          //     because this converter may be invoked multiple times per Parquet input record
          //     (if the map or array contains multiple elements).
          //
          //  2. Parent container is a struct: we don't need to copy the row here because either:
          //
          //     (a) all ancestors are structs and therefore no copying is required because this
          //         converter will only be invoked once per Parquet input record, or
          //     (b) some ancestor is a struct that is nested in a map or array and that ancestor's
          //         converter will perform deep-copying (which will recursively copy this row).
          if (updater.isInstanceOf[RowUpdater]) {
            // `updater` is a RowUpdater, implying that the parent container is a struct.
            updater
          } else {
            // `updater` is NOT a RowUpdater, implying that the parent container is a map or array.
            new ParentContainerUpdater {
              override def set(value: Any): Unit = {
                updater.set(value.asInstanceOf[SpecificInternalRow].copy()) // deep copy
              }
            }
          }
        }
        new ParquetRowConverter(
          schemaConverter,
          parquetType.asGroupType(),
          t,
          convertTz,
          datetimeRebaseSpec,
          int96RebaseSpec,
          wrappedUpdater)

      case t: VariantType =>
        if (SQLConf.get.getConf(SQLConf.VARIANT_ALLOW_READING_SHREDDED)) {
          new ParquetVariantConverter(t, parquetType.asGroupType(), updater)
        } else {
          new ParquetUnshreddedVariantConverter(parquetType.asGroupType(), updater)
        }

      case t =>
        throw QueryExecutionErrors.cannotCreateParquetConverterForDataTypeError(
          t, parquetType.toString)
    }
  }