in src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt [78:119]
/**
 * Validates (and, when needed, rewrites) shard-level search requests that target a rollup index
 * before handing the request to [actualHandler].
 *
 * Requests are passed through unchanged when rollup search is disabled, when [request] is not a
 * [ShardSearchRequest], or when the shard's index is not a rollup index.
 *
 * For a rollup index the request must carry a search source with `size` equal to 0, must resolve to a
 * single concrete index, and must be answerable by at least one rollup job on that index. The request is
 * rewritten via [rewriteShardSearchForRollupJobs] only when the query/aggregations reference any fields.
 *
 * @param request the incoming transport request
 * @param channel the channel forwarded to the wrapped handler
 * @param task the task forwarded to the wrapped handler
 * @throws IllegalArgumentException if the rollup search has no source, has a non-zero size, spans more
 * than one concrete index, targets an index without rollup jobs, or cannot be answered by any rollup job
 */
override fun messageReceived(request: T, channel: TransportChannel, task: Task) {
    if (searchEnabled && request is ShardSearchRequest) {
        // Read the cluster state once so the rollup check, index resolution and job lookup
        // below are all evaluated against the same snapshot.
        val state = clusterService.state()
        val index = request.shardId().indexName
        if (isRollupIndex(index, state)) {
            // A request without a source cannot have size 0; reject it explicitly instead of
            // failing with a NullPointerException on the dereference.
            val source = request.source()
                ?: throw IllegalArgumentException("Rollup search must have size explicitly set to 0, but found no search source")
            require(source.size() == 0) {
                "Rollup search must have size explicitly set to 0, but found ${source.size()}"
            }

            val indices = request.indices().map { it.toString() }.toTypedArray()
            val concreteIndices = indexNameExpressionResolver
                .concreteIndexNames(state, request.indicesOptions(), *indices)
            if (concreteIndices.size > 1) {
                logger.warn(
                    "There can be only one index in search request if its a rollup search - requested to search [${concreteIndices
                        .size}] indices including rollup index [$index]"
                )
                throw IllegalArgumentException("Searching rollup index with other indices is not supported currently")
            }

            // Null-safe lookup: missing index metadata is reported the same way as missing rollup jobs.
            val rollupJobs = state.metadata.index(index)?.getRollupJobs()
                ?: throw IllegalArgumentException("Could not find any valid rollup job on the index")

            val queryFieldMappings = getQueryMetadata(source.query())
            val aggregationFieldMappings = getAggregationMetadata(source.aggregations()?.aggregatorFactories)
            val fieldMappings = queryFieldMappings + aggregationFieldMappings

            val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
            if (matchingRollupJobs.isEmpty()) {
                throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
            }

            // Only rebuild the request when it actually references fields that need translating.
            if (fieldMappings.isNotEmpty()) {
                rewriteShardSearchForRollupJobs(request, matchingRollupJobs)
            }
        }
    }
    actualHandler.messageReceived(request, channel, task)
}