in spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/stac/StacBatch.scala [145:269]
/**
 * Collects the URLs of STAC item documents reachable from a STAC collection or
 * catalog document, appending them to `itemLinks`.
 *
 * Entries of the document's "links" array are handled by their "rel" value:
 *  - "item":  a single item document; its URL is appended.
 *  - "items": an items endpoint. For HTTP(S) endpoints the URL is appended with a
 *             `limit` query parameter, and the paginated response is then walked
 *             through its "next" links, appending every next-page URL.
 *  - "child": a nested collection/catalog; it is loaded and processed recursively
 *             unless `filterCollection` reports it as filtered out.
 * Other relations are ignored, as are links without an "href". Hrefs that do not
 * start with "http" or "file" are resolved against `collectionBasePath`. A document
 * without a "links" array contributes nothing.
 *
 * @param collectionBasePath prefix used to resolve relative hrefs
 * @param collectionJson raw JSON text of the collection/catalog document
 * @param itemLinks accumulator receiving the collected URLs (mutated in place)
 * @param needCountNextItems when true, stop as soon as the `itemMaxLeft` budget
 *                           drops to zero or below
 */
def collectItemLinks(
    collectionBasePath: String,
    collectionJson: String,
    itemLinks: scala.collection.mutable.ArrayBuffer[String],
    needCountNextItems: Boolean): Unit = {

  // Absolute (http/https/file) hrefs are kept; anything else is relative to the base path.
  def resolveHref(href: String): String =
    if (href.startsWith("http") || href.startsWith("file")) href
    else collectionBasePath + href

  // Appends a query parameter with the right separator, so an href that already
  // carries a query string does not end up containing two '?' characters.
  def withQueryParam(url: String, param: String): String =
    url + (if (url.contains("?")) "&" else "?") + param

  // True once the item budget is exhausted; never true when counting is disabled.
  def limitReached: Boolean = needCountNextItems && itemMaxLeft <= 0

  // Returns the href of the first non-empty "next" link of a paginated response.
  // `path` yields a MissingNode (empty iteration, "" text) for absent fields, so a
  // page without "links" simply has no next page instead of failing.
  def nextPageHref(page: JsonNode): Option[String] = {
    val pageLinks = page.path("links").elements()
    var found: Option[String] = None
    while (found.isEmpty && pageLinks.hasNext) {
      val link = pageLinks.next()
      val href = link.path("href").asText()
      if (link.path("rel").asText() == "next" && href.nonEmpty) found = Some(href)
    }
    found
  }

  // Walks a paginated items response starting at `pageUrl`, appending every
  // next-page URL to `itemLinks`. Each page that has a "next" link is charged
  // against the item budget once. Returns true when the budget is exhausted (the
  // caller must stop collecting), false when pagination simply ends.
  @scala.annotation.tailrec
  def followPages(pageUrl: String): Boolean = {
    val page: JsonNode = mapper.readTree(StacUtils.loadStacCollectionToJson(pageUrl))
    nextPageHref(page) match {
      case None => false
      case Some(nextHref) =>
        // From STAC API Spec:
        // The optional limit parameter limits the number of
        // items that are presented in the response document.
        // The default value is 10.
        val numberReturned =
          Option(page.get("numberReturned")).fold(defaultItemsLimitPerRequest)(_.asInt())
        // count the number of items returned and left to be processed
        itemMaxLeft -= numberReturned
        if (limitReached) {
          true
        } else {
          val nextUrl = resolveHref(nextHref)
          itemLinks += nextUrl
          followPages(nextUrl)
        }
    }
  }

  // End early if there are no more items to process.
  if (limitReached) return

  if (itemLinks.size - lastReportCount >= itemsLoadProcessReportThreshold) {
    Console.out.println(s"Searched or partitioned ${itemLinks.size} items so far.")
    lastReportCount = itemLinks.size
  }

  val rootNode: JsonNode = mapper.readTree(collectionJson)
  val links = rootNode.path("links").elements()
  while (links.hasNext) {
    val link = links.next()
    val rel = link.path("rel").asText()
    val href = link.path("href").asText()
    if (href.nonEmpty) {
      rel match {
        // item links are identified by the "rel" value of "item" or "items"
        case "item" | "items" =>
          val itemUrl = resolveHref(href)
          val isHttpItemsEndpoint = rel == "items" && href.startsWith("http")
          itemLinks += (
            if (isHttpItemsEndpoint) withQueryParam(itemUrl, s"limit=$defaultItemsLimitPerRequest")
            else itemUrl)
          if (limitReached) return
          if (rel == "item") {
            if (needCountNextItems) itemMaxLeft -= 1
          } else if (isHttpItemsEndpoint) {
            // NOTE(review): the URL appended above is the plain endpoint, while the
            // pages walked here start from getItemLink(...), which presumably adds
            // the spatial/temporal filters — confirm this asymmetry is intended.
            val firstPage =
              getItemLink(itemUrl, defaultItemsLimitPerRequest, spatialFilter, temporalFilter)
            if (followPages(firstPage)) return
          }

        case "child" =>
          val childUrl = resolveHref(href)
          // Recursively process the linked collection
          val linkedCollectionJson = StacUtils.loadStacCollectionToJson(childUrl)
          val nestedCollectionBasePath = StacUtils.getStacCollectionBasePath(childUrl)
          if (!filterCollection(linkedCollectionJson, spatialFilter, temporalFilter)) {
            collectItemLinks(
              nestedCollectionBasePath,
              linkedCollectionJson,
              itemLinks,
              needCountNextItems)
          }

        case _ => // Other relations (self, root, parent, ...) carry no item URLs.
      }
    }
  }
}