in src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt [58:130]
override fun masterOperation(
request: UpdateRollupMappingRequest,
state: ClusterState,
listener: ActionListener<AcknowledgedResponse>
) {
val index = state.metadata.index(request.rollup.targetIndex)
if (index == null) {
log.debug("Could not find index [$index]")
return listener.onFailure(IllegalStateException("Could not find index [$index]"))
}
val mappings = index.mapping()
if (mappings == null) {
log.debug("Could not find mapping for index [$index]")
return listener.onFailure(IllegalStateException("Could not find mapping for index [$index]"))
}
val source = mappings.sourceAsMap
if (source == null) {
log.debug("Could not find source for index mapping [$index]")
return listener.onFailure(IllegalStateException("Could not find source for index mapping [$index]"))
}
val rollup = XContentHelper.convertToMap(
BytesReference.bytes(request.rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE)),
false,
XContentType.JSON
).v2()
val metaMappings = mutableMapOf<String, Any>()
// TODO: Clean this up
val meta = source[_META]
if (meta == null) {
// TODO: Is schema_version always present?
log.debug("Could not find meta mappings for index [$index], creating meta mappings")
val rollupJobEntries = mapOf<String, Any>(request.rollup.id to rollup)
val rollups = mapOf<String, Any>("rollups" to rollupJobEntries)
metaMappings[_META] = rollups
} else {
val rollups = (meta as Map<*, *>)["rollups"]
if (rollups == null) {
log.debug("Could not find meta rollup mappings for index [$index], creating meta rollup mappings")
val rollupJobEntries = mapOf<String, Any>(request.rollup.id to rollup)
val updatedRollups = mapOf<String, Any>("rollups" to rollupJobEntries)
metaMappings[_META] = updatedRollups
} else {
if ((rollups as Map<*, *>).containsKey(request.rollup.id)) {
log.debug("Meta rollup mappings already contain rollup ${request.rollup.id} for index [$index]")
return listener.onFailure(
IllegalStateException("Meta rollup mappings already contain rollup ${request.rollup.id} for index [$index]")
)
}
// In this case rollup mappings exists and there is no entry for request.rollup.id
val rollupJobEntries = rollups.toMutableMap()
rollupJobEntries[request.rollup.id] = rollup
val updatedRollups = mapOf<String, Any>("rollups" to rollupJobEntries)
metaMappings[_META] = updatedRollups
}
}
// TODO: Does schema_version get overwritten?
val putMappingRequest = PutMappingRequest(request.rollup.targetIndex).type(_DOC).source(metaMappings)
client.admin().indices().putMapping(
putMappingRequest,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(response: AcknowledgedResponse) {
listener.onResponse(response)
}
override fun onFailure(e: Exception) {
listener.onFailure(e)
}
}
)
}