in spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala [255:356]
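  // Builds a ValueWriter that writes one Catalyst value of the given data type to the underlying
  // Parquet RecordConsumer. rootOrdinal is set only for top-level fields so that geometry column
  // metadata can be collected per root column (a sketch of intent inferred from its use below).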
private def makeWriter(dataType: DataType, rootOrdinal: Option[Int] = None): ValueWriter = {
dataType match {
case BooleanType =>
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addBoolean(row.getBoolean(ordinal))
case ByteType =>
(row: SpecializedGetters, ordinal: Int) => recordConsumer.addInteger(row.getByte(ordinal))
case ShortType =>
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addInteger(row.getShort(ordinal))
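      // Days are rebased, if necessary, from the Proleptic Gregorian calendar to the hybrid
      // Julian/Gregorian calendar before being written as INT32.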
case DateType =>
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addInteger(dateRebaseFunc(row.getInt(ordinal)))
case IntegerType =>
(row: SpecializedGetters, ordinal: Int) => recordConsumer.addInteger(row.getInt(ordinal))
case LongType =>
(row: SpecializedGetters, ordinal: Int) => recordConsumer.addLong(row.getLong(ordinal))
case FloatType =>
(row: SpecializedGetters, ordinal: Int) => recordConsumer.addFloat(row.getFloat(ordinal))
case DoubleType =>
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addDouble(row.getDouble(ordinal))
case StringType =>
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addBinary(
Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))
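      // Timestamps are written as INT96, TIMESTAMP_MICROS or TIMESTAMP_MILLIS depending on
      // spark.sql.parquet.outputTimestampType.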
case TimestampType =>
outputTimestampType match {
case SQLConf.ParquetOutputTimestampType.INT96 =>
(row: SpecializedGetters, ordinal: Int) =>
val micros = int96RebaseFunc(row.getLong(ordinal))
val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(micros)
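              // INT96 layout: 8 bytes of nanos-of-day followed by a 4-byte Julian day, both
              // little-endian.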
val buf = ByteBuffer.wrap(timestampBuffer)
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
(row: SpecializedGetters, ordinal: Int) =>
val micros = row.getLong(ordinal)
recordConsumer.addLong(timestampRebaseFunc(micros))
case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
(row: SpecializedGetters, ordinal: Int) =>
val micros = row.getLong(ordinal)
val millis = DateTimeUtils.microsToMillis(timestampRebaseFunc(micros))
recordConsumer.addLong(millis)
}
case TimestampNTZType =>
        // For TimestampNTZType columns, Spark always outputs INT64 with the TIMESTAMP annotation
        // in the MICROS time unit.
(row: SpecializedGetters, ordinal: Int) => recordConsumer.addLong(row.getLong(ordinal))
case BinaryType =>
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)))
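      // Decimals use a dedicated writer; the physical Parquet type depends on the precision.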
case DecimalType.Fixed(precision, scale) =>
makeDecimalWriter(precision, scale)
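      // Nested struct fields get writers created without a root ordinal, so geometry metadata is
      // only tracked for top-level geometry columns.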
case t: StructType =>
val fieldWriters = t.map(_.dataType).map(makeWriter(_, None)).toArray[ValueWriter]
(row: SpecializedGetters, ordinal: Int) =>
consumeGroup {
writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)
}
case t: ArrayType => makeArrayWriter(t)
case t: MapType => makeMapWriter(t)
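      // Geometry values are deserialized from their Catalyst representation and re-encoded as WKB.
      // For top-level geometry columns, per-column statistics are accumulated via
      // GeometryColumnInfo and later used for the GeoParquet file metadata.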
case GeometryUDT =>
val geometryColumnInfo = rootOrdinal match {
case Some(ordinal) =>
geometryColumnInfoMap.getOrElseUpdate(ordinal, new GeometryColumnInfo())
case None => null
}
(row: SpecializedGetters, ordinal: Int) => {
val serializedGeometry = row.getBinary(ordinal)
val geom = GeometryUDT.deserialize(serializedGeometry)
val wkbWriter = new WKBWriter(GeomUtils.getDimension(geom))
recordConsumer.addBinary(Binary.fromReusedByteArray(wkbWriter.write(geom)))
if (geometryColumnInfo != null) {
geometryColumnInfo.update(geom)
}
}
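      // Other UDTs are written using the writer for their underlying SQL type.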
case t: UserDefinedType[_] => makeWriter(t.sqlType)
      // TODO Add IntervalType support
case _ => sys.error(s"Unsupported data type $dataType.")
}
}