in src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt [95:135]
suspend fun executeCompositeSearch(job: Rollup, metadata: RollupMetadata): RollupSearchResult {
return try {
var retryCount = 0
RollupSearchResult.Success(
retrySearchPolicy.retry(logger) {
val decay = 2f.pow(retryCount++)
client.suspendUntil { listener: ActionListener<SearchResponse> ->
val pageSize = max(1, job.pageSize.div(decay.toInt()))
if (decay > 1) logger.warn(
"Composite search failed for rollup, retrying [#${retryCount - 1}] -" +
" reducing page size of composite aggregation from ${job.pageSize} to $pageSize"
)
search(job.copy(pageSize = pageSize).getRollupSearchRequest(metadata), listener)
}
}
)
} catch (e: SearchPhaseExecutionException) {
logger.error(e.message, e.cause)
if (e.shardFailures().isEmpty()) {
RollupSearchResult.Failure(cause = ExceptionsHelper.unwrapCause(e) as Exception)
} else {
val shardFailure = e.shardFailures().reduce { s1, s2 -> if (s1.status().status > s2.status().status) s1 else s2 }
RollupSearchResult.Failure(cause = ExceptionsHelper.unwrapCause(shardFailure.cause) as Exception)
}
} catch (e: RemoteTransportException) {
logger.error(e.message, e.cause)
RollupSearchResult.Failure(cause = ExceptionsHelper.unwrapCause(e) as Exception)
} catch (e: CircuitBreakingException) {
logger.error(e.message, e.cause)
RollupSearchResult.Failure(cause = e)
} catch (e: MultiBucketConsumerService.TooManyBucketsException) {
logger.error(e.message, e.cause)
RollupSearchResult.Failure(cause = e)
} catch (e: OpenSearchSecurityException) {
logger.error(e.message, e.cause)
RollupSearchResult.Failure("Cannot search data in source index/s - missing required index permissions: ${e.localizedMessage}", e)
} catch (e: Exception) {
logger.error(e.message, e.cause)
RollupSearchResult.Failure(cause = e)
}
}