override def next()

in spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/stac/StacPartitionReader.scala [55:137]


  override def next(): Boolean = {
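    // Serve any already-parsed feature first; otherwise materialize the next STAC item below.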
    if (featureIterator.hasNext) {
      true
    } else if (itemsIterator.hasNext) {
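      // Buffered features are exhausted; fetch the next STAC item in this partition.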
      currentItem = itemsIterator.next()
      if (currentItem.startsWith("http://") || currentItem.startsWith("https://") || currentItem
          .startsWith("file://")) {
        val url = new java.net.URL(currentItem)

        // Download the file to a local temp file
        val tempFile = File.createTempFile("stac_item_", ".json")
        val writer = new PrintWriter(tempFile)
        try {
          val fileContent = fetchContentWithRetry(url)
          val rootNode = mapper.readTree(fileContent)
          val nodeType = rootNode.get("type").asText()

          nodeType match {
            case "Feature" =>
              // Write the content as a single-line JSON record
              val content = mapper.writeValueAsString(rootNode)
              writer.write(content)
            case "FeatureCollection" =>
              // Write each feature in the features array on its own line (JSON Lines)
              val features = rootNode.get("features")
              // Local Jackson iterator over the "features" array (distinct from the class-level featureIterator)
              val featureIterator = features.elements()
              while (featureIterator.hasNext) {
                val feature = featureIterator.next()
                val content = mapper.writeValueAsString(feature)
                writer.write(content)
                writer.write("\n")
              }
            case _ =>
              throw new IllegalArgumentException(s"Unsupported type for item: $nodeType")
          }

        } finally {
          writer.close()
        }
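        // Delete the previous item's temp file and switch to the newly written one.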
        checkAndDeleteTempFile(currentFile)
        currentFile = tempFile
      } else {
        throw new IllegalArgumentException(s"Unsupported protocol for item: $currentItem")
      }

      // Parse the current file and extract features
      featureIterator = if (currentFile.exists()) {

        // Fall back to UTC and Spark's default corrupt-record column when the options do not set them.
        val parsedOptions = new JSONOptionsInRead(
          opts,
          opts.getOrElse("sessionLocalTimeZone", "UTC"),
          opts.getOrElse("columnNameOfCorruptRecord", "_corrupt_record"))
        val dataSource = JsonDataSource(parsedOptions)

        // Read the geometry field as a plain string first; it is converted to a geometry below.
        val alteredSchema = GeoJSONUtils.updateGeometrySchema(schema, StringType)

        val parser = SparkCompatUtil.constructJacksonParser(
          alteredSchema,
          parsedOptions,
          allowArrayAsStructs = true)

        val rows = SparkCompatUtil
          .readFile(
            dataSource,
            new Configuration(),
            createPartitionedFile(currentFile),
            parser,
            schema)

        // Convert the GeoJSON geometry string into a geometry value and promote item properties to top-level columns.
        rows.map(row => {
          val geometryConvertedRow = GeoJSONUtils.convertGeoJsonToGeometry(row, alteredSchema)
          val propertiesPromotedRow = promotePropertiesToTop(geometryConvertedRow, alteredSchema)
          propertiesPromotedRow
        })
      } else {
        Iterator.empty
      }

      // Recurse so the branch above can serve the newly parsed features (or move on to the next item).
      next()
    } else {
      false
    }
  }
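
For context, here is a minimal sketch of how such a reader is typically consumed, assuming StacPartitionReader follows Spark's DataSource V2 PartitionReader[InternalRow] contract (the drainPartition helper and its callback are hypothetical names, not Sedona API). It illustrates why next() both advances and reports availability: a single call may lazily download and parse a whole STAC item before get() yields the first feature row.

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.connector.read.PartitionReader

  // Hypothetical consumer loop mirroring how Spark drives a PartitionReader.
  def drainPartition(reader: PartitionReader[InternalRow])(handle: InternalRow => Unit): Unit = {
    try {
      while (reader.next()) { // may fetch and parse an entire STAC item
        handle(reader.get())
      }
    } finally {
      reader.close()
    }
  }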