def collectItemLinks()

in spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/stac/StacBatch.scala [145:269]


  /**
   * Collects item URLs reachable from a STAC collection document into `itemLinks`.
   *
   * The method walks the `links` array of `collectionJson`:
   *   - `rel == "item"`: the resolved href is appended to `itemLinks`.
   *   - `rel == "items"`: the resolved href is appended (with a `?limit=` query when the href
   *     starts with "http"); for http hrefs the paginated responses are then followed through
   *     their `rel == "next"` links and every follow-up page URL is appended as well.
   *   - `rel == "child"`: the linked collection is loaded and, unless `filterCollection`
   *     returns true for it, processed recursively.
   *
   * Link entries without a usable `rel` or `href`, and documents without a `links` array, are
   * skipped instead of failing with a `NullPointerException`.
   *
   * Side effects: appends to `itemLinks`, decrements `itemMaxLeft`, updates `lastReportCount`
   * and may print a progress line to the console.
   *
   * @param collectionBasePath
   *   prefix prepended to hrefs that start with neither "http" nor "file"
   * @param collectionJson
   *   JSON text of the collection whose links are inspected
   * @param itemLinks
   *   output buffer receiving the collected item URLs
   * @param needCountNextItems
   *   when true, collection stops as soon as `itemMaxLeft` is no longer positive
   */
  def collectItemLinks(
      collectionBasePath: String,
      collectionJson: String,
      itemLinks: scala.collection.mutable.ArrayBuffer[String],
      needCountNextItems: Boolean): Unit = {

    // end early if there are no more items to process
    if (needCountNextItems && itemMaxLeft <= 0) return

    if (itemLinks.size - lastReportCount >= itemsLoadProcessReportThreshold) {
      Console.out.println(s"Searched or partitioned ${itemLinks.size} items so far.")
      lastReportCount = itemLinks.size
    }

    // Parse the JSON string into a JsonNode (tree representation of JSON)
    val rootNode: JsonNode = mapper.readTree(collectionJson)

    // A document without a "links" member has nothing to collect.
    val linksNode: JsonNode = if (rootNode == null) null else rootNode.get("links")
    if (linksNode == null) return

    // Absolute http/file hrefs are used as-is; anything else is relative to the collection.
    def resolveHref(href: String): String =
      if (href.startsWith("http") || href.startsWith("file")) href
      else collectionBasePath + href

    // Reads a text field of a link entry; None when the field is absent or JSON null.
    def linkText(node: JsonNode, field: String): Option[String] =
      Option(node.get(field)).filterNot(_.isNull).map(_.asText())

    // Follows the "next" links of a paginated items endpoint, appending every follow-up page
    // URL to itemLinks. Returns true when the caller should stop collecting.
    def iterateItemsWithLimit(itemUrl: String, needCountNextItems: Boolean): Boolean = {
      @scala.annotation.tailrec
      def visitPage(pageUrl: String): Boolean = {
        val pageRoot = mapper.readTree(StacUtils.loadStacCollectionToJson(pageUrl))
        val pageLinks = if (pageRoot == null) null else pageRoot.get("links")
        if (pageLinks == null) {
          // NOTE(review): a page without links reports "stop", which also ends the caller's
          // traversal of its remaining links — kept as in the original; confirm intent.
          true
        } else {
          var nextUrl: Option[String] = None
          var limitReached = false
          val pageLinkIterator = pageLinks.elements()
          while (!limitReached && pageLinkIterator.hasNext) {
            val pageLink = pageLinkIterator.next()
            (linkText(pageLink, "rel"), linkText(pageLink, "href")) match {
              case (Some("next"), Some(nextHref)) =>
                // Only check the number of items returned if there are more items to process
                val numberReturnedNode = pageRoot.get("numberReturned")
                val numberReturned = if (numberReturnedNode == null) {
                  // From STAC API Spec:
                  // The optional limit parameter limits the number of
                  // items that are presented in the response document.
                  // The default value is 10.
                  defaultItemsLimitPerRequest
                } else {
                  numberReturnedNode.asInt()
                }
                // count the number of items returned and left to be processed
                itemMaxLeft = itemMaxLeft - numberReturned
                if (needCountNextItems && itemMaxLeft <= 0) {
                  // early exit: the next page is not needed, so it is not recorded
                  limitReached = true
                } else {
                  nextUrl = Some(resolveHref(nextHref))
                }
              case _ => // not a usable "next" link
            }
          }
          if (limitReached) {
            true
          } else {
            nextUrl match {
              case Some(url) =>
                itemLinks += url
                visitPage(url)
              case None =>
                false
            }
          }
        }
      }

      visitPage(itemUrl)
    }

    val linkIterator = linksNode.elements()
    while (linkIterator.hasNext) {
      val linkNode = linkIterator.next()
      (linkText(linkNode, "rel"), linkText(linkNode, "href")) match {
        // item links are identified by the "rel" value of "item" or "items"
        case (Some(rel), Some(href)) if rel == "item" || rel == "items" =>
          // need to handle relative paths and local file paths
          val itemUrl = resolveHref(href)
          val isRemoteItemsEndpoint = rel == "items" && href.startsWith("http")
          if (isRemoteItemsEndpoint) {
            // NOTE(review): "?limit=" is appended unconditionally; presumably the href never
            // carries its own query string — confirm.
            itemLinks += s"${itemUrl}?limit=${defaultItemsLimitPerRequest}"
          } else {
            itemLinks += itemUrl
          }
          // NOTE(review): the limit is checked after the link was appended, so one link may
          // still be added once the budget is exhausted — kept as in the original.
          if (needCountNextItems && itemMaxLeft <= 0) return
          if (rel == "item" && needCountNextItems) {
            // count the number of items returned and left to be processed
            itemMaxLeft = itemMaxLeft - 1
          } else if (isRemoteItemsEndpoint) {
            // iterate through the items and check if the limit is reached (if needed)
            val firstPageUrl =
              getItemLink(itemUrl, defaultItemsLimitPerRequest, spatialFilter, temporalFilter)
            if (iterateItemsWithLimit(firstPageUrl, needCountNextItems)) return
          }

        case (Some("child"), Some(href)) =>
          val childUrl = resolveHref(href)
          // Recursively process the linked collection
          val linkedCollectionJson = StacUtils.loadStacCollectionToJson(childUrl)
          val nestedCollectionBasePath = StacUtils.getStacCollectionBasePath(childUrl)
          val collectionFiltered =
            filterCollection(linkedCollectionJson, spatialFilter, temporalFilter)

          if (!collectionFiltered) {
            collectItemLinks(
              nestedCollectionBasePath,
              linkedCollectionJson,
              itemLinks,
              needCountNextItems)
          }

        case _ => // other relations, or entries missing "rel"/"href", are ignored
      }
    }
  }