override fun messageReceived()

in src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt [78:119]


            /**
             * Validates and, when needed, rewrites a shard-level search that targets a rollup index, then
             * forwards the message to [actualHandler].
             *
             * The request is forwarded untouched when `searchEnabled` is false, when it is not a
             * [ShardSearchRequest], or when its shard's index is not a rollup index.
             *
             * For a rollup index the following must hold, in this order:
             *  1. the request carries a search source whose `size` is exactly 0;
             *  2. the requested index expressions resolve to no more than one concrete index;
             *  3. the index metadata exposes rollup jobs;
             *  4. at least one of those jobs matches the field mappings gathered from the query and the
             *     aggregations.
             *
             * @param request the incoming transport request
             * @param channel the channel handed through to [actualHandler]
             * @param task the task handed through to [actualHandler]
             * @throws IllegalArgumentException if any of the rollup search conditions above is violated
             */
            override fun messageReceived(request: T, channel: TransportChannel, task: Task) {
                if (searchEnabled && request is ShardSearchRequest) {
                    // Read the cluster state once so that every check below works on the same snapshot.
                    val state = clusterService.state()
                    val index = request.shardId().indexName
                    if (isRollupIndex(index, state)) {
                        // A request without a source cannot have size 0; reject it like any other invalid
                        // rollup search instead of failing with a NullPointerException.
                        val source = request.source()
                            ?: throw IllegalArgumentException(
                                "Rollup search must have size explicitly set to 0, but the request has no search source"
                            )
                        val size = source.size()
                        require(size == 0) { "Rollup search must have size explicitly set to 0, but found $size" }

                        val indices = request.indices().map { it.toString() }.toTypedArray()
                        val concreteIndices = indexNameExpressionResolver
                            .concreteIndexNames(state, request.indicesOptions(), *indices)

                        if (concreteIndices.size > 1) {
                            logger.warn(
                                "There can be only one index in search request if its a rollup search - requested to search " +
                                    "[${concreteIndices.size}] indices including rollup index [$index]"
                            )
                            throw IllegalArgumentException("Searching rollup index with other indices is not supported currently")
                        }

                        // Missing index metadata is reported the same way as missing rollup jobs.
                        val rollupJobs = state.metadata.index(index)?.getRollupJobs()
                            ?: throw IllegalArgumentException("Could not find any valid rollup job on the index")

                        val queryFieldMappings = getQueryMetadata(source.query())
                        val aggregationFieldMappings = getAggregationMetadata(source.aggregations()?.aggregatorFactories)
                        val fieldMappings = queryFieldMappings + aggregationFieldMappings

                        val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)

                        require(matchingRollupJobs.isNotEmpty()) {
                            "Could not find a rollup job that can answer this query because $issues"
                        }

                        // Only rewrite the request when the search actually references fields.
                        if (fieldMappings.isNotEmpty()) {
                            rewriteShardSearchForRollupJobs(request, matchingRollupJobs)
                        }
                    }
                }
                actualHandler.messageReceived(request, channel, task)
            }