suspend fun executeCompositeSearch()

in src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt [95:135]


    suspend fun executeCompositeSearch(job: Rollup, metadata: RollupMetadata): RollupSearchResult {
        return try {
            var retryCount = 0
            RollupSearchResult.Success(
                retrySearchPolicy.retry(logger) {
                    val decay = 2f.pow(retryCount++)
                    client.suspendUntil { listener: ActionListener<SearchResponse> ->
                        val pageSize = max(1, job.pageSize.div(decay.toInt()))
                        if (decay > 1) logger.warn(
                            "Composite search failed for rollup, retrying [#${retryCount - 1}] -" +
                                " reducing page size of composite aggregation from ${job.pageSize} to $pageSize"
                        )
                        search(job.copy(pageSize = pageSize).getRollupSearchRequest(metadata), listener)
                    }
                }
            )
        } catch (e: SearchPhaseExecutionException) {
            logger.error(e.message, e.cause)
            if (e.shardFailures().isEmpty()) {
                RollupSearchResult.Failure(cause = ExceptionsHelper.unwrapCause(e) as Exception)
            } else {
                val shardFailure = e.shardFailures().reduce { s1, s2 -> if (s1.status().status > s2.status().status) s1 else s2 }
                RollupSearchResult.Failure(cause = ExceptionsHelper.unwrapCause(shardFailure.cause) as Exception)
            }
        } catch (e: RemoteTransportException) {
            logger.error(e.message, e.cause)
            RollupSearchResult.Failure(cause = ExceptionsHelper.unwrapCause(e) as Exception)
        } catch (e: CircuitBreakingException) {
            logger.error(e.message, e.cause)
            RollupSearchResult.Failure(cause = e)
        } catch (e: MultiBucketConsumerService.TooManyBucketsException) {
            logger.error(e.message, e.cause)
            RollupSearchResult.Failure(cause = e)
        } catch (e: OpenSearchSecurityException) {
            logger.error(e.message, e.cause)
            RollupSearchResult.Failure("Cannot search data in source index/s - missing required index permissions: ${e.localizedMessage}", e)
        } catch (e: Exception) {
            logger.error(e.message, e.cause)
            RollupSearchResult.Failure(cause = e)
        }
    }