in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt [70:148]
suspend fun moveMetadata() {
if (runningLock) {
logger.info("There is a move metadata process running...")
return
} else if (finishFlag) {
logger.info("Move metadata has finished.")
return
}
try {
runningLock = true
if (skipExecution.flag) {
logger.info("Cluster still has nodes running old version of ISM plugin, skip ping execution on new nodes until all nodes upgraded")
runningLock = false
return
}
if (runTimeCounter >= maxRunTime) {
updateStatusSetting(-1)
finishFlag = true; runningLock = false; runTimeCounter = 0
return
}
logger.info("Doing metadata migration ${++runTimeCounter} time.")
val indicesMetadata = clusterService.state().metadata.indices
var clusterStateManagedIndexMetadata = indicesMetadata.map {
it.key to it.value.getManagedIndexMetadata()
}.filter { it.second != null }.distinct().toMap()
// filter out previous failedToClean indices which already been indexed
clusterStateManagedIndexMetadata =
clusterStateManagedIndexMetadata.filter { it.key !in failedToCleanIndices.map { index -> index.name } }
val indexUuidMap = clusterStateManagedIndexMetadata.map { indicesMetadata[it.key].indexUUID to it.key }.toMap()
if (clusterStateManagedIndexMetadata.isEmpty()) {
if (failedToCleanIndices.isNotEmpty()) {
logger.info("Failed to clean indices: $failedToCleanIndices. Only clean cluster state metadata in this run.")
cleanMetadatas(failedToCleanIndices.toList())
finishFlag = false; runningLock = false
return
}
if (counter++ > 2) {
logger.info("Move Metadata succeed, set finish flag to true. Indices failed to get indexed: $failedToIndexIndices")
updateStatusSetting(1)
finishFlag = true; runningLock = false
return
}
} else {
counter = 0; finishFlag = false // index metadata for indices which metadata hasn't been indexed
val bulkIndexReq =
clusterStateManagedIndexMetadata.mapNotNull { it.value }.map {
managedIndexMetadataIndexRequest(
it,
waitRefresh = false, // should be set at bulk request level
create = true // restrict this as create operation
)
}
// remove the part which gonna be indexed from last time failedToIndex
failedToIndexIndices = failedToIndexIndices.filterKeys { it !in indexUuidMap.keys }.toMutableMap()
successfullyIndexedIndices.clear()
indexMetadatas(bulkIndexReq)
logger.info("success indexed: ${successfullyIndexedIndices.map { indexUuidMap[it] }}")
logger.info(
"failed indexed: ${failedToIndexIndices.map { indexUuidMap[it.key] }};" +
"failed reason: ${failedToIndexIndices.values.distinct()}"
)
}
// clean metadata for indices which metadata already been indexed
val indicesToCleanMetadata =
indexUuidMap.filter { it.key in successfullyIndexedIndices }.map { Index(it.value, it.key) }
.toList() + failedToCleanIndices
cleanMetadatas(indicesToCleanMetadata)
logger.info("Failed to clean cluster metadata for: ${failedToCleanIndices.map { it.name }}")
} finally {
runningLock = false
}
}