in src/main/kotlin/org/opensearch/indexmanagement/rollup/actionfilter/FieldCapsFilter.kt [50:122]
override fun <Request : ActionRequest?, Response : ActionResponse?> apply(
task: Task,
action: String,
request: Request,
listener: ActionListener<Response>,
chain: ActionFilterChain<Request, Response>
) {
if (request is FieldCapabilitiesRequest && shouldIntercept) {
val indices = request.indices().map { it.toString() }.toTypedArray()
val rollupIndices = mutableSetOf<String>()
val nonRollupIndices = mutableSetOf<String>()
val remoteClusterIndices = GuiceHolder.remoteClusterService.groupIndices(request.indicesOptions(), indices) {
idx: String? ->
indexNameExpressionResolver.hasIndexAbstraction(idx, clusterService.state())
}
val localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)
localIndices?.let {
val concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request.indicesOptions(), it)
for (index in concreteIndices) {
val isRollupIndex = isRollupIndex(index, clusterService.state())
if (isRollupIndex) {
rollupIndices.add(index)
} else {
nonRollupIndices.add(index)
}
}
}
remoteClusterIndices.entries.forEach {
val cluster = it.key
val clusterIndices = it.value
clusterIndices.indices().forEach { index ->
nonRollupIndices.add("$cluster${RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR}$index")
}
}
logger.debug("Resolved into rollup $rollupIndices and non rollup $nonRollupIndices indices")
if (rollupIndices.isEmpty()) {
return chain.proceed(task, action, request, listener)
}
/**
* The request can be one of two cases:
* 1 Just rollup indices
* 2 Rollup + NonRollup indices
* If 1 we forward the request to chain and discard the whole response from chain when rewriting.
* If 2 we forward the request to chain with only non rollup indices and append rollup data to response when rewriting.
* We are calling with rollup indices in 1 instead of an empty request since empty is defaulted to returning all indices in cluster.
**/
if (nonRollupIndices.isNotEmpty()) {
request.indices(*nonRollupIndices.toTypedArray())
}
chain.proceed(
task, action, request,
object : ActionListener<Response> {
override fun onResponse(response: Response) {
logger.info("Has rollup indices will rewrite field caps response")
response as FieldCapabilitiesResponse
val rewrittenResponse = rewriteResponse(response, rollupIndices, nonRollupIndices.isEmpty())
listener.onResponse(rewrittenResponse as Response)
}
override fun onFailure(e: Exception) {
listener.onFailure(e)
}
}
)
} else {
chain.proceed(task, action, request, listener)
}
}