override fun masterOperation()

in src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt [58:130]


    /**
     * Registers the rollup job carried by [request] under `_meta.rollups` in the mapping of the
     * rollup's target index.
     *
     * The target index, its mapping and the mapping source are read from the supplied cluster
     * [state]. If any of them is missing, or if `_meta.rollups` already contains an entry for the
     * rollup id, the [listener] is failed with an [IllegalStateException] and no mapping update is
     * sent. Otherwise a put-mapping request is issued and its outcome is forwarded to [listener]
     * unchanged.
     *
     * @param request the update request; only its rollup (id, target index, serialized form) is used
     * @param state the cluster state used to look up the target index metadata
     * @param listener receives the put-mapping response, or the failure
     */
    override fun masterOperation(
        request: UpdateRollupMappingRequest,
        state: ClusterState,
        listener: ActionListener<AcknowledgedResponse>
    ) {
        val targetIndex = request.rollup.targetIndex
        val rollupId = request.rollup.id

        // Validation failures are logged at debug level and reported to the caller with the same text.
        fun fail(message: String) {
            log.debug(message)
            listener.onFailure(IllegalStateException(message))
        }

        // Messages name the requested index rather than the lookup result, which is null in the
        // first failure case and the full metadata object in the others.
        val indexMetadata = state.metadata.index(targetIndex)
            ?: return fail("Could not find index [$targetIndex]")
        val mappings = indexMetadata.mapping()
            ?: return fail("Could not find mapping for index [$targetIndex]")
        val source = mappings.sourceAsMap
            ?: return fail("Could not find source for index mapping [$targetIndex]")

        // Serialize the rollup job to a plain map so it can be embedded in the mapping's _meta section.
        val rollup = XContentHelper.convertToMap(
            BytesReference.bytes(request.rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE)),
            false,
            XContentType.JSON
        ).v2()

        val existingMeta: Map<*, *>? = when (val meta = source[_META]) {
            null -> {
                log.debug("Could not find meta mappings for index [$targetIndex], creating meta mappings")
                null
            }
            is Map<*, *> -> meta
            else -> return fail("Meta mappings for index [$targetIndex] are not a map")
        }

        val existingRollups: Map<*, *>? = when (val rollups = existingMeta?.get("rollups")) {
            null -> {
                if (existingMeta != null) {
                    log.debug("Could not find meta rollup mappings for index [$targetIndex], creating meta rollup mappings")
                }
                null
            }
            is Map<*, *> -> rollups
            else -> return fail("Meta rollup mappings for index [$targetIndex] are not a map")
        }

        if (existingRollups != null && existingRollups.containsKey(rollupId)) {
            return fail("Meta rollup mappings already contain rollup $rollupId for index [$targetIndex]")
        }

        // Add this job next to any rollup jobs that are already registered on the index.
        val rollupJobEntries = existingRollups?.toMutableMap() ?: mutableMapOf<Any?, Any?>()
        rollupJobEntries[rollupId] = rollup

        // Carry over every other existing _meta entry (e.g. schema_version) so that the update does
        // not drop it. NOTE(review): assumes a mapping update replaces _meta as a whole - confirm.
        val updatedMeta = existingMeta?.toMutableMap() ?: mutableMapOf<Any?, Any?>()
        updatedMeta["rollups"] = rollupJobEntries

        val metaMappings = mapOf<String, Any>(_META to updatedMeta)
        val putMappingRequest = PutMappingRequest(targetIndex).type(_DOC).source(metaMappings)
        // The caller's listener is passed straight through; no translation of the outcome is needed.
        client.admin().indices().putMapping(putMappingRequest, listener)
    }