in app/src/main/kotlin/io/klibs/app/indexing/PackageIndexingService.kt [58:96]
/**
 * Runs one pass of the scheduled package-indexing job and blocks the calling thread until it is done.
 *
 * Each entry of [discoverers] is collected in its own coroutine on [Dispatchers.IO]. The artifacts a
 * discoverer emits are grouped with `chunked(size = 5)`, mapped to index requests, saved through
 * [indexingRequestRepository], and followed by a call to `removeRepeating()`. Exceptions that
 * discoverers send to the shared error channel are logged by a separate collector coroutine.
 *
 * A failure inside one discoverer is logged and does not stop the remaining discoverers.
 */
fun indexNewPackages() {
    logger.info("=== Starting scheduled packages indexing job ===")
    runBlocking {
        val errorChannel = Channel<Exception>(Channel.BUFFERED)
        try {
            val errorFlow = errorChannel.receiveAsFlow()
            // Drains reported errors while discovery is running; this coroutine finishes once
            // the channel is closed in the `finally` block below.
            launch {
                errorFlow.collect { error ->
                    // NOTE(review): only `error.cause` is passed as the throwable, so the stack trace of
                    // `error` itself is not logged — confirm this is intended.
                    logger.error("Indexing error: ${error.message}", error.cause)
                }
                logger.info("=== Packages indexing: error flow completed ===")
            }
            supervisorScope {
                discoverers.map { discoverer ->
                    launch(Dispatchers.IO) {
                        // Exceptions thrown by a `launch` child of supervisorScope are not rethrown by the
                        // scope, so the outer catch never sees them. Handle them here so they are logged
                        // instead of reaching the thread's uncaught-exception handler.
                        try {
                            discoverer.discover(errorChannel = errorChannel)
                                .buffer() // Lets the discoverer keep emitting while a chunk is being persisted
                                .chunked(size = 5)
                                .collect { newArtifacts ->
                                    if (newArtifacts.isNotEmpty()) {
                                        val indexRequests = newArtifacts.map { it.toIndexRequest() }
                                        val insertedRequests = indexingRequestRepository.saveAll(indexRequests).count()
                                        val removedRepeating = indexingRequestRepository.removeRepeating()
                                        logger.debug("Queued up ${insertedRequests - removedRepeating} newArtifacts")
                                    }
                                }
                        } catch (ex: kotlin.coroutines.cancellation.CancellationException) {
                            // Cancellation is part of normal coroutine control flow; never swallow it.
                            throw ex
                        } catch (ex: Exception) {
                            logger.error("Package discoverer failed during indexing", ex)
                        }
                    }
                }.joinAll()
            }
        } catch (ex: kotlin.coroutines.cancellation.CancellationException) {
            // Do not report cancellation of the job as an indexing failure.
            throw ex
        } catch (ex: Exception) {
            logger.error("Unable to process all packages for indexing", ex)
        } finally {
            // Closing the channel ends the error flow, which lets the collector coroutine
            // (and therefore runBlocking) complete.
            errorChannel.close()
        }
    }
    logger.info("=== Finished scheduled packages indexing job ===")
}