in spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/stac/StacPartitionReader.scala [55:137]
/**
 * Advances to the next feature row, loading further STAC items on demand.
 *
 * While the current feature iterator is exhausted and more items remain, the next item is
 * downloaded, flattened into a temporary JSON file (one feature per line) and parsed into a
 * fresh feature iterator. Items that yield no rows are skipped in a loop rather than by
 * recursion, so a long run of empty items cannot grow the call stack.
 *
 * @return
 *   `true` if the current feature iterator has another row, `false` once both the feature
 *   iterator and the item iterator are exhausted
 * @throws IllegalArgumentException
 *   if an item does not start with `http://`, `https://` or `file://`, if its JSON `type` is
 *   missing or is neither `Feature` nor `FeatureCollection`, or if a `FeatureCollection` has
 *   no `features` field
 */
override def next(): Boolean = {

  // Fetches the item at `itemUrl` and writes its feature(s) to a new temp file.
  // The temp file is removed again if anything fails before it is fully written.
  def downloadToTempFile(itemUrl: String): File = {
    val url = new java.net.URL(itemUrl)
    val tempFile = File.createTempFile("stac_item_", ".json")
    var completed = false
    try {
      val fileContent = fetchContentWithRetry(url)
      val rootNode = mapper.readTree(fileContent)
      // `get` yields null for an absent field; wrap it so a missing "type" is reported as an
      // unsupported type instead of surfacing as a NullPointerException.
      val nodeType =
        Option(rootNode).flatMap(node => Option(node.get("type"))).map(_.asText())
      // Encode explicitly as UTF-8: the single-argument PrintWriter(File) constructor uses the
      // platform default charset, which may not be UTF-8.
      val writer = new PrintWriter(tempFile, "UTF-8")
      try {
        nodeType match {
          case Some("Feature") =>
            // Write the content as a single line JSON
            val content = mapper.writeValueAsString(rootNode)
            writer.write(content)
          case Some("FeatureCollection") =>
            // Write each feature in the features array on its own line
            val features = Option(rootNode.get("features")).getOrElse(
              throw new IllegalArgumentException(
                s"FeatureCollection has no 'features' field for item: $itemUrl"))
            val elements = features.elements()
            while (elements.hasNext) {
              val content = mapper.writeValueAsString(elements.next())
              writer.write(content)
              writer.write("\n")
            }
          case other =>
            throw new IllegalArgumentException(
              s"Unsupported type for item: ${other.orNull}")
        }
      } finally {
        writer.close()
      }
      completed = true
      tempFile
    } finally {
      // On failure the partially written file is useless; delete it now, or at JVM exit if
      // the immediate delete does not succeed.
      if (!completed && !tempFile.delete()) {
        tempFile.deleteOnExit()
      }
    }
  }

  while (!featureIterator.hasNext && itemsIterator.hasNext) {
    currentItem = itemsIterator.next()
    val supportedProtocol =
      currentItem.startsWith("http://") ||
        currentItem.startsWith("https://") ||
        currentItem.startsWith("file://")
    if (!supportedProtocol) {
      throw new IllegalArgumentException(s"Unsupported protocol for item: $currentItem")
    }

    // Download first, then drop the previous temp file and switch over to the new one.
    val tempFile = downloadToTempFile(currentItem)
    checkAndDeleteTempFile(currentFile)
    currentFile = tempFile

    // Parse the current file and extract features
    featureIterator = if (currentFile.exists()) {
      val parsedOptions = new JSONOptionsInRead(
        opts,
        opts.getOrElse("sessionLocalTimeZone", "UTC"),
        opts.getOrElse("columnNameOfCorruptRecord", "_corrupt_record"))
      val dataSource = JsonDataSource(parsedOptions)
      // Geometry is read as a string column first and converted per row below.
      val alteredSchema = GeoJSONUtils.updateGeometrySchema(schema, StringType)
      val parser = SparkCompatUtil.constructJacksonParser(
        alteredSchema,
        parsedOptions,
        allowArrayAsStructs = true)
      val rows = SparkCompatUtil
        .readFile(
          dataSource,
          new Configuration(),
          createPartitionedFile(currentFile),
          parser,
          schema)
      rows.map(row => {
        val geometryConvertedRow = GeoJSONUtils.convertGeoJsonToGeometry(row, alteredSchema)
        promotePropertiesToTop(geometryConvertedRow, alteredSchema)
      })
    } else {
      Iterator.empty
    }
  }

  featureIterator.hasNext
}