suspend fun moveMetadata()

in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt [70:148]


    suspend fun moveMetadata() {
        if (runningLock) {
            logger.info("There is a move metadata process running...")
            return
        } else if (finishFlag) {
            logger.info("Move metadata has finished.")
            return
        }
        try {
            runningLock = true

            if (skipExecution.flag) {
                logger.info("Cluster still has nodes running old version of ISM plugin, skip ping execution on new nodes until all nodes upgraded")
                runningLock = false
                return
            }

            if (runTimeCounter >= maxRunTime) {
                updateStatusSetting(-1)
                finishFlag = true; runningLock = false; runTimeCounter = 0
                return
            }
            logger.info("Doing metadata migration ${++runTimeCounter} time.")

            val indicesMetadata = clusterService.state().metadata.indices
            var clusterStateManagedIndexMetadata = indicesMetadata.map {
                it.key to it.value.getManagedIndexMetadata()
            }.filter { it.second != null }.distinct().toMap()
            // filter out previous failedToClean indices which already been indexed
            clusterStateManagedIndexMetadata =
                clusterStateManagedIndexMetadata.filter { it.key !in failedToCleanIndices.map { index -> index.name } }
            val indexUuidMap = clusterStateManagedIndexMetadata.map { indicesMetadata[it.key].indexUUID to it.key }.toMap()

            if (clusterStateManagedIndexMetadata.isEmpty()) {
                if (failedToCleanIndices.isNotEmpty()) {
                    logger.info("Failed to clean indices: $failedToCleanIndices. Only clean cluster state metadata in this run.")
                    cleanMetadatas(failedToCleanIndices.toList())
                    finishFlag = false; runningLock = false
                    return
                }
                if (counter++ > 2) {
                    logger.info("Move Metadata succeed, set finish flag to true. Indices failed to get indexed: $failedToIndexIndices")
                    updateStatusSetting(1)
                    finishFlag = true; runningLock = false
                    return
                }
            } else {
                counter = 0; finishFlag = false // index metadata for indices which metadata hasn't been indexed
                val bulkIndexReq =
                    clusterStateManagedIndexMetadata.mapNotNull { it.value }.map {
                        managedIndexMetadataIndexRequest(
                            it,
                            waitRefresh = false, // should be set at bulk request level
                            create = true // restrict this as create operation
                        )
                    }
                // remove the part which gonna be indexed from last time failedToIndex
                failedToIndexIndices = failedToIndexIndices.filterKeys { it !in indexUuidMap.keys }.toMutableMap()
                successfullyIndexedIndices.clear()
                indexMetadatas(bulkIndexReq)

                logger.info("success indexed: ${successfullyIndexedIndices.map { indexUuidMap[it] }}")
                logger.info(
                    "failed indexed: ${failedToIndexIndices.map { indexUuidMap[it.key] }};" +
                        "failed reason: ${failedToIndexIndices.values.distinct()}"
                )
            }

            // clean metadata for indices which metadata already been indexed
            val indicesToCleanMetadata =
                indexUuidMap.filter { it.key in successfullyIndexedIndices }.map { Index(it.value, it.key) }
                    .toList() + failedToCleanIndices

            cleanMetadatas(indicesToCleanMetadata)
            logger.info("Failed to clean cluster metadata for: ${failedToCleanIndices.map { it.name }}")
        } finally {
            runningLock = false
        }
    }