private def makeWriter(dataType: DataType, rootOrdinal: Option[Int] = None): ValueWriter

in spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala [254:355]


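  /**
   * Builds a [[ValueWriter]] that writes a single non-null value of `dataType` at a given
   * ordinal to the underlying Parquet `recordConsumer`. `rootOrdinal` is defined only for
   * top-level fields, so that root-level geometry columns can be tracked per column (see
   * GeometryColumnInfo below); nested writers are created with `rootOrdinal = None`.
   */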
  private def makeWriter(dataType: DataType, rootOrdinal: Option[Int] = None): ValueWriter = {
    dataType match {
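      // Primitive types map directly onto Parquet physical types; BYTE and SHORT are
      // widened to INT32.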
      case BooleanType =>
        (row: SpecializedGetters, ordinal: Int) =>
          recordConsumer.addBoolean(row.getBoolean(ordinal))

      case ByteType =>
        (row: SpecializedGetters, ordinal: Int) => recordConsumer.addInteger(row.getByte(ordinal))

      case ShortType =>
        (row: SpecializedGetters, ordinal: Int) =>
          recordConsumer.addInteger(row.getShort(ordinal))

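      // DATE values are days since the Unix epoch; dateRebaseFunc rebases them between the
      // proleptic Gregorian and hybrid Julian/Gregorian calendars when the configured
      // rebase mode requires it.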
      case DateType =>
        (row: SpecializedGetters, ordinal: Int) =>
          recordConsumer.addInteger(dateRebaseFunc(row.getInt(ordinal)))

      case IntegerType =>
        (row: SpecializedGetters, ordinal: Int) => recordConsumer.addInteger(row.getInt(ordinal))

      case LongType =>
        (row: SpecializedGetters, ordinal: Int) => recordConsumer.addLong(row.getLong(ordinal))

      case FloatType =>
        (row: SpecializedGetters, ordinal: Int) => recordConsumer.addFloat(row.getFloat(ordinal))

      case DoubleType =>
        (row: SpecializedGetters, ordinal: Int) =>
          recordConsumer.addDouble(row.getDouble(ordinal))

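      // The UTF8String's bytes are written as Parquet BINARY; fromReusedByteArray signals
      // that the backing array may be reused by the caller.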
      case StringType =>
        (row: SpecializedGetters, ordinal: Int) =>
          recordConsumer.addBinary(
            Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))

      case TimestampType =>
        outputTimestampType match {
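          // INT96 packs a timestamp as 8 bytes of nanos-of-day followed by 4 bytes of
          // Julian day, both little-endian, staged in the reused 12-byte timestampBuffer.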
          case SQLConf.ParquetOutputTimestampType.INT96 =>
            (row: SpecializedGetters, ordinal: Int) =>
              val micros = int96RebaseFunc(row.getLong(ordinal))
              val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(micros)
              val buf = ByteBuffer.wrap(timestampBuffer)
              buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
              recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))

          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
            (row: SpecializedGetters, ordinal: Int) =>
              val micros = row.getLong(ordinal)
              recordConsumer.addLong(timestampRebaseFunc(micros))

          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
            (row: SpecializedGetters, ordinal: Int) =>
              val micros = row.getLong(ordinal)
              val millis = DateTimeUtils.microsToMillis(timestampRebaseFunc(micros))
              recordConsumer.addLong(millis)
        }

      case TimestampNTZType =>
        // For a TimestampNTZType column, Spark always outputs INT64 with the TIMESTAMP
        // annotation in MICROS time unit.
        (row: SpecializedGetters, ordinal: Int) => recordConsumer.addLong(row.getLong(ordinal))

      case BinaryType =>
        (row: SpecializedGetters, ordinal: Int) =>
          recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)))

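      // Decimals delegate to a writer specialized for the given precision and scale.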
      case DecimalType.Fixed(precision, scale) =>
        makeDecimalWriter(precision, scale)

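      // Nested struct fields get their own writers, created with rootOrdinal = None, so
      // geometry columns nested inside structs are not tracked in the column metadata.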
      case t: StructType =>
        val fieldWriters = t.map(_.dataType).map(makeWriter(_, None)).toArray[ValueWriter]
        (row: SpecializedGetters, ordinal: Int) =>
          consumeGroup {
            writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)
          }

      case t: ArrayType => makeArrayWriter(t)

      case t: MapType => makeMapWriter(t)

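      // Geometry values are deserialized from Sedona's internal binary form and re-encoded
      // as WKB. Root-level geometry columns additionally feed a GeometryColumnInfo entry,
      // which accumulates per-column statistics for the GeoParquet metadata.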
      case GeometryUDT =>
        val geometryColumnInfo = rootOrdinal match {
          case Some(ordinal) =>
            geometryColumnInfoMap.getOrElseUpdate(ordinal, new GeometryColumnInfo())
          case None => null
        }
        (row: SpecializedGetters, ordinal: Int) => {
          val serializedGeometry = row.getBinary(ordinal)
          val geom = GeometryUDT.deserialize(serializedGeometry)
          val wkbWriter = new WKBWriter(GeomUtils.getDimension(geom))
          recordConsumer.addBinary(Binary.fromReusedByteArray(wkbWriter.write(geom)))
          if (geometryColumnInfo != null) {
            geometryColumnInfo.update(geom)
          }
        }

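      // Any other UDT falls back to the writer for its underlying SQL type.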
      case t: UserDefinedType[_] => makeWriter(t.sqlType)

      // TODO Add IntervalType support
      case _ => sys.error(s"Unsupported data type $dataType.")
    }
  }
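
A minimal usage sketch (hypothetical, not part of this file): building root-level writers with
makeWriter and writing one row. It assumes a `schema: StructType` and the `consumeMessage` /
`consumeField` helpers that Spark's ParquetWriteSupport-style classes provide.

  // One writer per root field; passing the root ordinal lets geometry columns be tracked.
  val rootWriters: Array[ValueWriter] =
    schema.zipWithIndex.map { case (field, i) =>
      makeWriter(field.dataType, rootOrdinal = Some(i))
    }.toArray[ValueWriter]

  // Write a single row: each non-null field is wrapped in a Parquet field scope.
  def writeRow(row: InternalRow): Unit = consumeMessage {
    var i = 0
    while (i < schema.length) {
      if (!row.isNullAt(i)) {
        consumeField(schema(i).name, i) {
          rootWriters(i).apply(row, i)
        }
      }
      i += 1
    }
  }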