private def newConverter()

in spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala [206:380]


  private def newConverter(
      parquetType: Type,
      catalystType: DataType,
      updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {

    catalystType match {
      case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
        new ParquetPrimitiveConverter(updater)

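      // Geometry columns are stored as WKB (well-known binary). In files written by
      // recent Sedona versions the column is a primitive BINARY field: each value is
      // decoded with JTS's WKBReader and serialized into Spark's internal
      // GeometryUDT representation.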
      case GeometryUDT =>
        if (parquetType.isPrimitive) {
          new ParquetPrimitiveConverter(updater) {
            override def addBinary(value: Binary): Unit = {
              val wkbReader = new WKBReader()
              val geom = wkbReader.read(value.getBytes)
              updater.set(GeometryUDT.serialize(geom))
            }
          }
        } else {
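          // Group-typed geometry columns only appear in legacy files, where the geometry
          // was written as an array of bytes; reassemble the array and parse it as WKB.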
          if (GeoParquetUtils.isLegacyMode(parameters)) {
            new ParquetArrayConverter(
              parquetType.asGroupType(),
              ArrayType(ByteType, containsNull = false),
              updater) {
              override def end(): Unit = {
                val wkbReader = new WKBReader()
                val byteArray = currentArray.map(_.asInstanceOf[Byte]).toArray
                val geom = wkbReader.read(byteArray)
                updater.set(GeometryUDT.serialize(geom))
              }
            }
          } else {
            throw new IllegalArgumentException(
              s"Parquet type for geometry column is $parquetType. This Parquet file may have been " +
                "written by Apache Sedona <= 1.3.1-incubating. " +
                "Please use option(\"legacyMode\", \"true\") to read this file.")
        }

      case ByteType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            updater.setByte(value.asInstanceOf[ByteType#InternalType])

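          // Also accept BINARY values, pushing each byte to the parent container
          // individually (e.g. when a byte array is stored as a single BINARY chunk).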
          override def addBinary(value: Binary): Unit = {
            val bytes = value.getBytes
            for (b <- bytes) {
              updater.set(b)
            }
          }
        }

      case ShortType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            updater.setShort(value.asInstanceOf[ShortType#InternalType])
        }

      // For INT32 backed decimals
      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
        new ParquetIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater)

      // For INT64 backed decimals
      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
        new ParquetLongDictionaryAwareDecimalConverter(t.precision, t.scale, updater)

      // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
      case t: DecimalType
          if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
            parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
        new ParquetBinaryDictionaryAwareDecimalConverter(t.precision, t.scale, updater)

      case t: DecimalType =>
        throw new RuntimeException(
          s"Unable to create Parquet converter for decimal type ${t.json} whose Parquet type is " +
            s"$parquetType. Parquet DECIMAL type can only be backed by INT32, INT64, " +
            "FIXED_LEN_BYTE_ARRAY, or BINARY.")

      case StringType =>
        new ParquetStringConverter(updater)

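      // TIMESTAMP_MICROS values are already in microseconds, Spark's internal timestamp
      // unit; only calendar rebasing (per the configured rebase mode) is applied.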
      case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            updater.setLong(timestampRebaseFunc(value))
          }
        }

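      // TIMESTAMP_MILLIS values are widened to microseconds before the same calendar
      // rebasing is applied.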
      case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            val micros = GeoDateTimeUtils.millisToMicros(value)
            updater.setLong(timestampRebaseFunc(micros))
          }
        }

      // INT96 timestamps don't have a logical type, so we check the physical type instead.
      case TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 =>
        new ParquetPrimitiveConverter(updater) {
          // Converts nanosecond timestamps stored as INT96
          override def addBinary(value: Binary): Unit = {
            val julianMicros = ParquetRowConverter.binaryToSQLTimestamp(value)
            val gregorianMicros = int96RebaseFunc(julianMicros)
            val adjTime = convertTz
              .map(DateTimeUtils.convertTz(gregorianMicros, _, ZoneOffset.UTC))
              .getOrElse(gregorianMicros)
            updater.setLong(adjTime)
          }
        }

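      // DATE values arrive as INT32 days since the Unix epoch and are rebased between
      // calendars the same way as timestamps.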
      case DateType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit = {
            updater.set(dateRebaseFunc(value))
          }
        }

      // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
      // annotated by `LIST` or `MAP` should be interpreted as a required list of required
      // elements where the element type is the type of the field.
      case t: ArrayType if parquetType.getOriginalType != LIST =>
        if (parquetType.isPrimitive) {
          new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
        } else {
          new RepeatedGroupConverter(parquetType, t.elementType, updater)
        }

      case t: ArrayType =>
        new ParquetArrayConverter(parquetType.asGroupType(), t, updater)

      case t: MapType =>
        new ParquetMapConverter(parquetType.asGroupType(), t, updater)

      case t: StructType =>
        val wrappedUpdater = {
          // SPARK-30338: avoid unnecessary InternalRow copying for nested structs:
          // There are two cases to handle here:
          //
          //  1. Parent container is a map or array: we must make a deep copy of the mutable row
          //     because this converter may be invoked multiple times per Parquet input record
          //     (if the map or array contains multiple elements).
          //
          //  2. Parent container is a struct: we don't need to copy the row here because either:
          //
          //     (a) all ancestors are structs and therefore no copying is required because this
          //         converter will only be invoked once per Parquet input record, or
          //     (b) some ancestor is a struct that is nested in a map or array and that ancestor's
          //         converter will perform deep-copying (which will recursively copy this row).
          if (updater.isInstanceOf[RowUpdater]) {
            // `updater` is a RowUpdater, implying that the parent container is a struct.
            updater
          } else {
            // `updater` is NOT a RowUpdater, implying that the parent container is a map or array.
            new ParentContainerUpdater {
              override def set(value: Any): Unit = {
                updater.set(value.asInstanceOf[SpecificInternalRow].copy()) // deep copy
              }
            }
          }
        }
        new GeoParquetRowConverter(
          schemaConverter,
          parquetType.asGroupType(),
          t,
          convertTz,
          datetimeRebaseMode,
          int96RebaseMode,
          parameters,
          wrappedUpdater)

      case t =>
        throw new RuntimeException(
          s"Unable to create Parquet converter for data type ${t.json} " +
            s"whose Parquet type is $parquetType")
    }
  }
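
For reference, here is a minimal, self-contained sketch of the WKB round trip performed by the GeometryUDT branch above. It uses only JTS, with the same WKBReader.read call as addBinary; the object name and the sample point geometry are illustrative, not part of the Sedona codebase.

import org.locationtech.jts.geom.{Coordinate, GeometryFactory}
import org.locationtech.jts.io.{WKBReader, WKBWriter}

object WkbRoundTripExample extends App {
  val factory = new GeometryFactory()
  val point = factory.createPoint(new Coordinate(1.0, 2.0))

  // Encode to WKB, as a GeoParquet writer would store the value.
  val wkb: Array[Byte] = new WKBWriter().write(point)

  // Decode with WKBReader, mirroring addBinary in the GeometryUDT branch.
  val decoded = new WKBReader().read(wkb)

  assert(decoded.equalsExact(point))
  println(decoded) // POINT (1 2)
}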