fun indexNewPackages()

in app/src/main/kotlin/io/klibs/app/indexing/PackageIndexingService.kt [58:96]


    /**
     * Runs one package-indexing pass and blocks the calling thread until it is finished.
     *
     * Every entry of `discoverers` is started in its own coroutine on [Dispatchers.IO]. The artifacts
     * each discoverer emits are grouped into chunks of five, converted with `toIndexRequest()`,
     * stored through `indexingRequestRepository.saveAll`, and then `removeRepeating()` is invoked.
     *
     * Exceptions that discoverers send to the shared error channel are logged by a dedicated collector
     * coroutine. An exception thrown by a discoverer itself is logged here and does not stop the other
     * discoverers.
     */
    fun indexNewPackages() {
        logger.info("=== Starting scheduled packages indexing job ===")
        runBlocking {
            val errorChannel = Channel<Exception>(Channel.BUFFERED)
            try {
                // Drain reported errors concurrently with discovery; this coroutine ends once the
                // channel is closed in the `finally` block below.
                launch {
                    errorChannel.receiveAsFlow().collect { error ->
                        // Fall back to the error itself when it carries no cause, so a stack trace is always logged.
                        logger.error("Indexing error: ${error.message}", error.cause ?: error)
                    }
                    logger.info("=== Packages indexing: error flow completed ===")
                }

                // supervisorScope keeps one failing discoverer from cancelling its siblings and
                // suspends until every child launched inside it has completed.
                supervisorScope {
                    discoverers.forEach { discoverer ->
                        launch(Dispatchers.IO) {
                            try {
                                discoverer.discover(errorChannel = errorChannel)
                                    .buffer()  // Allows the flow to emit faster than a collection
                                    .chunked(size = 5)
                                    .collect { newArtifacts ->
                                        if (newArtifacts.isNotEmpty()) {
                                            val indexRequests = newArtifacts.map { it.toIndexRequest() }
                                            val insertedRequests = indexingRequestRepository.saveAll(indexRequests).count()
                                            val removedRepeating = indexingRequestRepository.removeRepeating()
                                            // NOTE(review): with discoverers running concurrently, removeRepeating()
                                            // presumably may also count rows queued by another batch, so this
                                            // difference is only an estimate - confirm against the repository.
                                            logger.debug("Queued up ${insertedRequests - removedRepeating} newArtifacts")
                                        }
                                    }
                            } catch (ex: kotlin.coroutines.cancellation.CancellationException) {
                                // Cancellation is cooperative and must never be swallowed.
                                throw ex
                            } catch (ex: Exception) {
                                // Failures of children of a supervisorScope are not rethrown to the scope,
                                // so they have to be handled here to reach the service log.
                                logger.error("Package discovery failed for ${discoverer::class.simpleName}", ex)
                            }
                        }
                    }
                }
            } catch (ex: kotlin.coroutines.cancellation.CancellationException) {
                // Let cancellation propagate instead of reporting it as a processing failure.
                throw ex
            } catch (ex: Exception) {
                logger.error("Unable to process all packages for indexing", ex)
            } finally {
                // Closing the channel completes the error flow and lets the collector coroutine finish.
                errorChannel.close()
            }

        }
        logger.info("=== Finished scheduled packages indexing job ===")
    }