in app/src/main/kotlin/io/klibs/app/indexing/discoverer/impl/CentralSonatypePackageDiscoverer.kt [41:80]
/**
 * Builds a cold [Flow] of artifacts discovered through [centralSonatypeScraper].
 *
 * Pipeline, in order:
 * 1. Ask the scraper for KMP artifacts, passing [lastPackageIndexTs] moved back by
 *    [lastUpdatedOffsetHours] hours (presumably a "changed since" lower bound — confirm
 *    against the scraper).
 * 2. Run the results through `collectUnknownArtifacts` in batches of 1000 together with
 *    the snapshot returned by `collectAllKnownPackages()`.
 * 3. For every artifact coordinate seen for the first time, request all of its versions.
 * 4. Drop repeated (coordinates, version) pairs and run the remainder through
 *    `collectUnknownArtifacts` once more, again in batches of 1000.
 *
 * [lastPackageIndexTs] is moved to the instant captured at the start of this call only
 * when the returned flow completes normally. If collection fails or is cancelled the
 * timestamp is left untouched, so the next run covers the same time window again.
 *
 * NOTE(review): the de-duplication state and the captured timestamp are created when this
 * function is called, not when the flow is collected, so the returned flow is meant to be
 * collected once.
 *
 * @param errorChannel forwarded unchanged to every scraper call.
 * @return flow of the artifacts that passed both `collectUnknownArtifacts` stages.
 */
override suspend fun discover(errorChannel: Channel<Exception>): Flow<MavenArtifact> {
    logger.info("--- Central sonatype packages discovering started. ---")
    val timestampBeforeIndexing = Instant.now()
    val existingPackages = collectAllKnownPackages()
    // Artifact coordinates -> versions already emitted for those coordinates.
    // A set makes the duplicate check a hash lookup instead of a scan over a list.
    val seenCoordinates = mutableMapOf<String, MutableSet<String>>()
    return centralSonatypeScraper.findKmpArtifacts(
        lastPackageIndexTs.minus(Duration.ofHours(lastUpdatedOffsetHours.toLong())),
        errorChannel
    )
        .chunked(1000)
        .flatMapConcat { foundArtifactsBatch ->
            collectUnknownArtifacts(foundArtifactsBatch, existingPackages).asFlow()
        }
        .flatMapConcat { artifact ->
            val coordinates = artifact.getArtifactCoordinates()
            if (coordinates in seenCoordinates) {
                // Versions for these coordinates were already requested.
                emptyFlow()
            } else {
                seenCoordinates[coordinates] = mutableSetOf()
                centralSonatypeScraper.findAllVersionForArtifact(artifact, errorChannel)
            }
        }
        // TODO KTL-2971 Indexing: `findAllVersionForArtifact` returns duplicates
        .mapNotNull { artifact ->
            // `add` returns false only when this version was already recorded. A null result
            // means the coordinates are not tracked at all; the artifact is still emitted then.
            val alreadyEmitted = seenCoordinates[artifact.getArtifactCoordinates()]?.add(artifact.version) == false
            if (alreadyEmitted) null else artifact
        }
        .chunked(1000)
        .flatMapMerge { foundArtifactsVersionsBatch ->
            collectUnknownArtifacts(foundArtifactsVersionsBatch, existingPackages).asFlow()
        }
        .onCompletion { cause ->
            if (cause == null) {
                lastPackageIndexTs = timestampBeforeIndexing
                logger.info("--- Central sonatype packages discovering finished. Last indexing time change to $lastPackageIndexTs ---")
            } else {
                // Failure or cancellation: keep the old timestamp so nothing is skipped next run.
                logger.info("--- Central sonatype packages discovering aborted. Last indexing time left at $lastPackageIndexTs ---")
            }
        }
}