in spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala [208:401]
  private def newConverter(
      parquetType: Type,
      catalystType: DataType,
      updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
    catalystType match {
      case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
        new ParquetPrimitiveConverter(updater)

      case GeometryUDT =>
        if (parquetType.isPrimitive) {
          // Standard GeoParquet layout: the geometry column is a primitive BINARY field holding WKB.
          new ParquetPrimitiveConverter(updater) {
            override def addBinary(value: Binary): Unit = {
              val wkbReader = new WKBReader()
              val geom = wkbReader.read(value.getBytes)
              updater.set(GeometryUDT.serialize(geom))
            }
          }
        } else {
          if (GeoParquetUtils.isLegacyMode(parameters)) {
            // Legacy layout (Apache Sedona <= 1.3.1-incubating): the geometry column is a group
            // type whose WKB bytes arrive as a list of bytes, so collect them and decode the
            // geometry once the whole value has been read.
            new ParquetArrayConverter(
              parquetType.asGroupType(),
              ArrayType(ByteType, containsNull = false),
              updater) {
              override def end(): Unit = {
                val wkbReader = new WKBReader()
                val byteArray = currentArray.map(_.asInstanceOf[Byte]).toArray
                val geom = wkbReader.read(byteArray)
                updater.set(GeometryUDT.serialize(geom))
              }
            }
          } else {
            throw new IllegalArgumentException(
              s"Parquet type for geometry column is $parquetType. This Parquet file may have been written " +
                "by Apache Sedona <= 1.3.1-incubating. Please use option(\"legacyMode\", \"true\") to read this file.")
          }
        }
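        // Reading such a legacy file from the user side looks roughly like the following
        // (the session variable and path are illustrative, not part of this converter):
        //
        //   sparkSession.read
        //     .format("geoparquet")
        //     .option("legacyMode", "true")
        //     .load("/path/to/legacy-geoparquet-directory")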
      case ByteType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            updater.setByte(value.asInstanceOf[ByteType#InternalType])

          override def addBinary(value: Binary): Unit = {
            // Append each byte of a BINARY value individually, so that BINARY-backed data can be
            // read into an ArrayType(ByteType) column (this path can be hit when reading legacy
            // GeoParquet geometry bytes).
            val bytes = value.getBytes
            for (b <- bytes) {
              updater.set(b)
            }
          }
        }

      case ShortType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            updater.setShort(value.asInstanceOf[ShortType#InternalType])
        }
      // For INT32 backed decimals
      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
        new ParquetIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater)

      // For INT64 backed decimals
      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
        new ParquetLongDictionaryAwareDecimalConverter(t.precision, t.scale, updater)

      // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
      case t: DecimalType
          if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
            parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
        new ParquetBinaryDictionaryAwareDecimalConverter(t.precision, t.scale, updater)

      case t: DecimalType =>
        throw new RuntimeException(
          s"Unable to create Parquet converter for decimal type ${t.json} whose Parquet type is " +
            s"$parquetType. Parquet DECIMAL type can only be backed by INT32, INT64, " +
            "FIXED_LEN_BYTE_ARRAY, or BINARY.")
      case StringType =>
        new ParquetStringConverter(updater)
      case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            updater.setLong(timestampRebaseFunc(value))
          }
        }

      case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            val micros = DateTimeUtils.millisToMicros(value)
            updater.setLong(timestampRebaseFunc(micros))
          }
        }

      // INT96 timestamps don't have a logical type; here we check the physical type instead.
      case TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 =>
        new ParquetPrimitiveConverter(updater) {
          // Converts nanosecond timestamps stored as INT96: the first 8 bytes hold the
          // nanoseconds within the day and the last 4 bytes hold the Julian day number.
          override def addBinary(value: Binary): Unit = {
            val julianMicros = ParquetRowConverter.binaryToSQLTimestamp(value)
            val gregorianMicros = int96RebaseFunc(julianMicros)
            val adjTime = convertTz
              .map(DateTimeUtils.convertTz(gregorianMicros, _, ZoneOffset.UTC))
              .getOrElse(gregorianMicros)
            updater.setLong(adjTime)
          }
        }
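      // Example for the INT96 branch above: a value holding (nanosOfDay = 0, julianDay = 2440588)
      // decodes to 0 microseconds since the Unix epoch, i.e. 1970-01-01T00:00:00, before any
      // Julian-to-Gregorian rebase or time-zone adjustment is applied.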
      case TimestampNTZType
          if canReadAsTimestampNTZ(parquetType) &&
            parquetType.getLogicalTypeAnnotation
              .asInstanceOf[TimestampLogicalTypeAnnotation]
              .getUnit == TimeUnit.MICROS =>
        new ParquetPrimitiveConverter(updater)

      case TimestampNTZType
          if canReadAsTimestampNTZ(parquetType) &&
            parquetType.getLogicalTypeAnnotation
              .asInstanceOf[TimestampLogicalTypeAnnotation]
              .getUnit == TimeUnit.MILLIS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            val micros = DateTimeUtils.millisToMicros(value)
            updater.setLong(micros)
          }
        }

      case DateType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit = {
            updater.set(dateRebaseFunc(value))
          }
        }

      // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
      // annotated by `LIST` or `MAP` should be interpreted as a required list of required
      // elements where the element type is the type of the field.
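      // For example, a bare `repeated int32 num;` field (with no LIST annotation) is read as
      // ArrayType(IntegerType, containsNull = false), producing one element per repeated value.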
      case t: ArrayType if parquetType.getOriginalType != LIST =>
        if (parquetType.isPrimitive) {
          new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
        } else {
          new RepeatedGroupConverter(parquetType, t.elementType, updater)
        }

      case t: ArrayType =>
        new ParquetArrayConverter(parquetType.asGroupType(), t, updater)

      case t: MapType =>
        new ParquetMapConverter(parquetType.asGroupType(), t, updater)
      case t: StructType =>
        val wrappedUpdater = {
          // SPARK-30338: avoid unnecessary InternalRow copying for nested structs:
          // There are two cases to handle here:
          //
          // 1. Parent container is a map or array: we must make a deep copy of the mutable row
          //    because this converter may be invoked multiple times per Parquet input record
          //    (if the map or array contains multiple elements).
          //
          // 2. Parent container is a struct: we don't need to copy the row here because either:
          //
          //    (a) all ancestors are structs and therefore no copying is required because this
          //        converter will only be invoked once per Parquet input record, or
          //    (b) some ancestor is a struct that is nested in a map or array and that ancestor's
          //        converter will perform deep-copying (which will recursively copy this row).
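          //
          // For example, for a column of type array<struct<a: int>>, this struct converter runs
          // once per array element but writes into a single mutable row, so each element must be
          // deep-copied before it is stored in the array.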
          if (updater.isInstanceOf[RowUpdater]) {
            // `updater` is a RowUpdater, implying that the parent container is a struct.
            updater
          } else {
            // `updater` is NOT a RowUpdater, implying that the parent container is a map or array.
            new ParentContainerUpdater {
              override def set(value: Any): Unit = {
                updater.set(value.asInstanceOf[SpecificInternalRow].copy()) // deep copy
              }
            }
          }
        }
        new GeoParquetRowConverter(
          schemaConverter,
          parquetType.asGroupType(),
          t,
          convertTz,
          datetimeRebaseMode,
          int96RebaseMode,
          parameters,
          wrappedUpdater)

      case t =>
        throw new RuntimeException(
          s"Unable to create Parquet converter for data type ${t.json} " +
            s"whose Parquet type is $parquetType")
    }
  }