override def init()

in spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala [116:202]


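  // Called once per write task, before any rows are written. Resolves the writer configuration
  // from the Hadoop Configuration and returns the Parquet WriteContext (message type plus
  // key/value metadata).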
  override def init(configuration: Configuration): WriteContext = {
    val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
    this.schema = StructType.fromString(schemaString)
    this.writeLegacyParquetFormat = {
      // `SQLConf.PARQUET_WRITE_LEGACY_FORMAT` should always be explicitly set in ParquetRelation
      assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null)
      configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
    }

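    // Resolve how timestamps are physically encoded (INT96, TIMESTAMP_MICROS, or TIMESTAMP_MILLIS).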
    this.outputTimestampType = {
      val key = SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key
      assert(configuration.get(key) != null)
      SQLConf.ParquetOutputTimestampType.withName(configuration.get(key))
    }

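    // Build one ValueWriter per top-level field. For GeometryUDT fields, makeWriter also records
    // the column ordinal in geometryColumnInfoMap, which the check below depends on.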
    this.rootFieldWriters = schema.zipWithIndex
      .map { case (field, ordinal) =>
        makeWriter(field.dataType, Some(ordinal))
      }
      .toArray[ValueWriter]

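    // GeoParquet requires at least one geometry column in the schema; fail fast if there is none.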
    if (geometryColumnInfoMap.isEmpty) {
      throw new RuntimeException("No geometry column found in the schema")
    }

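    // Use the explicitly configured GeoParquet spec version if present, otherwise the default VERSION.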
    geoParquetVersion = configuration.get(GEOPARQUET_VERSION_KEY) match {
      case null => Some(VERSION)
      case version: String => Some(version)
    }
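    // Resolve the file-level default CRS: absent key -> JSON null, empty string -> omit the field,
    // anything else is parsed as a JSON document (GeoParquet expects PROJJSON here).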
    defaultGeoParquetCrs = configuration.get(GEOPARQUET_CRS_KEY) match {
      case null =>
        // If no CRS is specified, we write null to the crs metadata field. This is for compatibility with
        // geopandas 0.10.0 and earlier versions, which require the crs field to be present.
        Some(org.json4s.JNull)
      case "" => None
      case crs: String => Some(parse(crs))
    }
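    // Column-specific CRS values, configured as <GEOPARQUET_CRS_KEY>.<columnName>, override the default.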
    geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
      Option(configuration.get(GEOPARQUET_CRS_KEY + "." + name)).foreach {
        case "" => geoParquetColumnCrsMap.put(name, None)
        case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs)))
      }
    }
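    // A bare covering option is only unambiguous with a single geometry column; with multiple
    // geometry columns, the per-column form below must be used instead.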
    Option(configuration.get(GEOPARQUET_COVERING_KEY)).foreach { coveringColumnName =>
      if (geometryColumnInfoMap.size > 1) {
        throw new IllegalArgumentException(
          s"$GEOPARQUET_COVERING_KEY is ambiguous when there are multiple geometry columns." +
            s"Please specify $GEOPARQUET_COVERING_KEY.<columnName> for configured geometry column.")
      }
      val geometryColumnName = schema(geometryColumnInfoMap.keys.head).name
      val covering = createCoveringColumnMetadata(coveringColumnName, schema)
      geoParquetColumnCoveringMap.put(geometryColumnName, covering)
    }
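    // Column-specific covering (bounding-box column) metadata, configured as
    // <GEOPARQUET_COVERING_KEY>.<columnName>.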
    geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
      Option(configuration.get(GEOPARQUET_COVERING_KEY + "." + name)).foreach {
        coveringColumnName =>
          val covering = createCoveringColumnMetadata(coveringColumnName, schema)
          geoParquetColumnCoveringMap.put(name, covering)
      }
    }

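    // Convert the Catalyst schema to a Parquet message type and assemble the file-level
    // key/value metadata that Spark readers expect.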
    val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
    val sparkSqlParquetRowMetadata = GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
    val metadata = Map(
      SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT,
      ParquetReadSupport.SPARK_METADATA_KEY -> sparkSqlParquetRowMetadata) ++ {
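      // When legacy rebase modes are in effect, add marker properties so readers know that
      // datetime/INT96 values were written against the hybrid Julian calendar.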
      if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) {
        Some("org.apache.spark.legacyDateTime" -> "")
      } else {
        None
      }
    } ++ {
      if (int96RebaseMode == LegacyBehaviorPolicy.LEGACY) {
        Some("org.apache.spark.legacyINT96" -> "")
      } else {
        None
      }
    }

    logInfo(s"""Initialized Parquet WriteSupport with Catalyst schema:
         |${schema.prettyJson}
         |and corresponding Parquet message type:
         |$messageType
       """.stripMargin)

    new WriteContext(messageType, metadata.asJava)
  }
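
For reference, a minimal usage sketch of the configuration this method reads. It assumes the GEOPARQUET_VERSION_KEY, GEOPARQUET_CRS_KEY, and GEOPARQUET_COVERING_KEY constants resolve to the writer option names "geoparquet.version", "geoparquet.crs", and "geoparquet.covering" (as in the Sedona documentation); the DataFrame writer copies these options into the Hadoop Configuration that init() inspects above.

import org.apache.spark.sql.DataFrame

// A minimal sketch; option names are assumed from the Sedona docs, not from this file.
def writeGeoParquet(df: DataFrame, projjson: String): Unit = {
  df.write
    .format("geoparquet")
    .option("geoparquet.version", "1.1.0")          // read back above via GEOPARQUET_VERSION_KEY
    .option("geoparquet.crs", projjson)             // file-level default CRS; "" omits the crs field
    .option("geoparquet.covering.geometry", "bbox") // per-column covering for a column named "geometry"
    .save("/tmp/geoparquet-output")
}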