def filterCollection()

in spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/stac/StacBatch.scala [294:376]


  /**
   * Decides whether a STAC collection can be skipped, based on the extents declared in its JSON.
   *
   * The collection is only filtered out when a filter is supplied, the matching extent is
   * present, and none of the declared extents satisfies the filter. Whenever the extent is
   * missing or cannot be bounded (open-ended temporal interval), the collection is kept.
   *
   * @param collectionJson
   *   the collection document as a JSON string; `extent.spatial.bbox` and
   *   `extent.temporal.interval` are the only fields read
   * @param spatialFilter
   *   optional spatial filter evaluated against each declared bounding box
   * @param temporalFilter
   *   optional temporal filter evaluated against the start and end of each declared interval
   * @return
   *   `true` if the collection is filtered out, `false` if it must be kept
   */
  def filterCollection(
      collectionJson: String,
      spatialFilter: Option[GeoParquetSpatialFilter],
      temporalFilter: Option[TemporalFilter]): Boolean = {

    val mapper = new ObjectMapper()
    val rootNode: JsonNode = mapper.readTree(collectionJson)

    // Filter based on spatial extent
    val spatialFiltered = spatialFilter.exists { filter =>
      val bboxesNode = rootNode.path("extent").path("spatial").path("bbox")
      if (bboxesNode.isMissingNode) {
        // no spatial extent declared: nothing can be ruled out, keep the collection
        false
      } else {
        val bboxes = bboxesNode
          .elements()
          .asScala
          .map { bboxNode =>
            // A 2D bbox is [minX, minY, maxX, maxY]; a 3D bbox is
            // [minX, minY, minZ, maxX, maxY, maxZ], so the max X/Y positions shift by one.
            val (maxXIndex, maxYIndex) = if (bboxNode.size() == 6) (3, 4) else (2, 3)
            Seq(
              bboxNode.get(0).asDouble(),
              bboxNode.get(1).asDouble(),
              bboxNode.get(maxXIndex).asDouble(),
              bboxNode.get(maxYIndex).asDouble())
          }
          .toList

        // filtered out only if no declared bbox satisfies the spatial filter
        !bboxes.exists { bounds =>
          val geometryFieldMetaData = GeometryFieldMetaData(
            encoding = "WKB",
            geometryTypes = Seq("Polygon"),
            bbox = bounds,
            crs = None,
            covering = None)

          filter.evaluate(Map("geometry" -> geometryFieldMetaData))
        }
      }
    }

    // Filter based on temporal extent
    val temporalFiltered = temporalFilter.exists { filter =>
      val intervalsNode = rootNode.path("extent").path("temporal").path("interval")
      if (intervalsNode.isMissingNode) {
        // if extent is missing, we assume the collection is not filtered
        false
      } else {
        // accepts e.g. 2020-01-01T00:00:00Z and 2020-01-01T00:00:00.123Z
        val formatter = new DateTimeFormatterBuilder()
          .appendPattern("yyyy-MM-dd'T'HH:mm:ss")
          .optionalStart()
          .appendFraction(ChronoField.MILLI_OF_SECOND, 0, 3, true)
          .optionalEnd()
          .appendPattern("'Z'")
          .toFormatter()

        // None marks an open-ended (JSON null) or absent endpoint
        def parseEndpoint(node: JsonNode): Option[LocalDateTime] =
          Option(node)
            .filterNot(_.isNull)
            .map(endpoint => LocalDateTime.parse(endpoint.asText(), formatter))

        val intervals = intervalsNode
          .elements()
          .asScala
          .map { intervalNode =>
            (parseEndpoint(intervalNode.get(0)), parseEndpoint(intervalNode.get(1)))
          }
          .toList

        // filtered out only if the filter rejects both endpoints of every bounded interval
        !intervals.exists {
          case (Some(start), Some(end)) =>
            filter.evaluate(Map("datetime" -> start)) ||
            filter.evaluate(Map("datetime" -> end))
          // an unbounded side cannot be checked against the filter, so treat it as a match
          case _ => true
        }
      }
    }

    spatialFiltered || temporalFiltered
  }