in spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala [115:201]
override def init(configuration: Configuration): WriteContext = {
  val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
  this.schema = StructType.fromString(schemaString)
  this.writeLegacyParquetFormat = {
    // `SQLConf.PARQUET_WRITE_LEGACY_FORMAT` should always be explicitly set in ParquetRelation
    assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null)
    configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
  }
  this.outputTimestampType = {
    val key = SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key
    assert(configuration.get(key) != null)
    SQLConf.ParquetOutputTimestampType.withName(configuration.get(key))
  }
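  // Build one ValueWriter per top-level field. Judging by the emptiness check below,
  // makeWriter also registers geometry fields in geometryColumnInfoMap (keyed by ordinal).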
  this.rootFieldWriters = schema.zipWithIndex
    .map { case (field, ordinal) =>
      makeWriter(field.dataType, Some(ordinal))
    }
    .toArray[ValueWriter]
  if (geometryColumnInfoMap.isEmpty) {
    throw new RuntimeException("No geometry column found in the schema")
  }
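  // GeoParquet spec version to record in the file metadata; falls back to the library's
  // default VERSION constant when the user has not set one explicitly.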
  geoParquetVersion = configuration.get(GEOPARQUET_VERSION_KEY) match {
    case null => Some(VERSION)
    case version: String => Some(version)
  }
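  // Resolve the file-level default CRS: unset -> explicit JSON null (see comment below),
  // empty string -> omit the crs field entirely, anything else -> parsed as JSON
  // (PROJJSON per the GeoParquet spec).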
  defaultGeoParquetCrs = configuration.get(GEOPARQUET_CRS_KEY) match {
    case null =>
      // If no CRS is specified, we write null to the crs metadata field. This is for
      // compatibility with geopandas 0.10.0 and earlier versions, which require the crs
      // field to be present.
      Some(org.json4s.JNull)
    case "" => None
    case crs: String => Some(parse(crs))
  }
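  // Per-column CRS overrides: "<GEOPARQUET_CRS_KEY>.<columnName>" takes precedence over
  // the file-level default resolved above.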
  geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
    Option(configuration.get(GEOPARQUET_CRS_KEY + "." + name)).foreach {
      case "" => geoParquetColumnCrsMap.put(name, None)
      case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs)))
    }
  }
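  // A bare covering key is only unambiguous when the schema has exactly one geometry
  // column; otherwise the per-column form handled below must be used.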
  Option(configuration.get(GEOPARQUET_COVERING_KEY)).foreach { coveringColumnName =>
    if (geometryColumnInfoMap.size > 1) {
      throw new IllegalArgumentException(
        s"$GEOPARQUET_COVERING_KEY is ambiguous when there are multiple geometry columns. " +
          s"Please specify $GEOPARQUET_COVERING_KEY.<columnName> for the configured geometry column.")
    }
    val geometryColumnName = schema(geometryColumnInfoMap.keys.head).name
    val covering = createCoveringColumnMetadata(coveringColumnName, schema)
    geoParquetColumnCoveringMap.put(geometryColumnName, covering)
  }
  geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
    Option(configuration.get(GEOPARQUET_COVERING_KEY + "." + name)).foreach { coveringColumnName =>
      val covering = createCoveringColumnMetadata(coveringColumnName, schema)
      geoParquetColumnCoveringMap.put(name, covering)
    }
  }
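  // Convert the Catalyst schema to a Parquet message type and assemble Spark's file-level
  // metadata. The legacy markers below tell Spark readers to rebase datetime/INT96 values
  // that were written under the LEGACY calendar policy.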
  val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
  val sparkSqlParquetRowMetadata = GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
  val metadata = Map(
    SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT,
    ParquetReadSupport.SPARK_METADATA_KEY -> sparkSqlParquetRowMetadata) ++ {
    if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) {
      Some("org.apache.spark.legacyDateTime" -> "")
    } else {
      None
    }
  } ++ {
    if (int96RebaseMode == LegacyBehaviorPolicy.LEGACY) {
      Some("org.apache.spark.legacyINT96" -> "")
    } else {
      None
    }
  }
logInfo(s"""Initialized Parquet WriteSupport with Catalyst schema:
|${schema.prettyJson}
|and corresponding Parquet message type:
|$messageType
""".stripMargin)
new WriteContext(messageType, metadata.asJava)
}
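
// Illustrative usage sketch (not part of the original file): assuming the GEOPARQUET_*_KEY
// constants resolve to the "geoparquet.*" write options documented by Apache Sedona, a
// caller could drive this init path roughly as follows. Option names and values here are
// assumptions, not confirmed by this file.
//
//   df.write
//     .format("geoparquet")
//     .option("geoparquet.version", "1.0.0")          // GEOPARQUET_VERSION_KEY
//     .option("geoparquet.crs", projjsonString)       // "" omits crs; unset writes JSON null
//     .option("geoparquet.covering.geometry", "bbox") // per-column covering override
//     .save("/path/to/output")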