in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt [46:127]
override suspend fun execute(): AttemptRolloverStep {
val skipRollover = clusterService.state().metadata.index(indexName).getRolloverSkip()
if (skipRollover) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSkipRolloverMessage(indexName))
return this
}
val (rolloverTarget, isDataStream) = getRolloverTargetOrUpdateInfo()
// If the rolloverTarget is null, we would've already updated the failed info from getRolloverTargetOrUpdateInfo and can return early
rolloverTarget ?: return this
if (clusterService.state().metadata.index(indexName).rolloverInfos.containsKey(rolloverTarget)) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getAlreadyRolledOverMessage(indexName, rolloverTarget))
return this
}
if (!isDataStream && !preCheckIndexAlias(rolloverTarget)) {
stepStatus = StepStatus.FAILED
info = mapOf("message" to getFailedPreCheckMessage(indexName))
return this
}
val statsResponse = getIndexStatsOrUpdateInfo()
// If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early
statsResponse ?: return this
val indexCreationDate = clusterService.state().metadata().index(indexName).creationDate
val indexAgeTimeValue = if (indexCreationDate == -1L) {
logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison")
// since we cannot use for comparison, we can set it to 0 as minAge will never be <= 0
TimeValue.timeValueMillis(0)
} else {
TimeValue.timeValueMillis(Instant.now().toEpochMilli() - indexCreationDate)
}
val numDocs = statsResponse.primaries.docs?.count ?: 0
val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0)
val largestPrimaryShard = statsResponse.shards.maxByOrNull { it.stats.docs?.totalSizeInBytes ?: 0 }
val largestPrimaryShardSize = ByteSizeValue(largestPrimaryShard?.stats?.docs?.totalSizeInBytes ?: 0)
val conditions = listOfNotNull(
config.minAge?.let {
RolloverActionConfig.MIN_INDEX_AGE_FIELD to mapOf(
"condition" to it.toString(),
"current" to indexAgeTimeValue.toString(),
"creationDate" to indexCreationDate
)
},
config.minDocs?.let {
RolloverActionConfig.MIN_DOC_COUNT_FIELD to mapOf(
"condition" to it,
"current" to numDocs
)
},
config.minSize?.let {
RolloverActionConfig.MIN_SIZE_FIELD to mapOf(
"condition" to it.toString(),
"current" to indexSize.toString()
)
},
config.minPrimaryShardSize?.let {
RolloverActionConfig.MIN_PRIMARY_SHARD_SIZE_FIELD to mapOf(
"condition" to it.toString(),
"current" to largestPrimaryShardSize.toString(),
"shard" to largestPrimaryShard?.shardRouting?.id()
)
}
).toMap()
if (config.evaluateConditions(indexAgeTimeValue, numDocs, indexSize, largestPrimaryShardSize)) {
logger.info(
"$indexName rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," +
" numDocs=$numDocs, indexSize=${indexSize.bytes}, primaryShardSize=${largestPrimaryShardSize.bytes}]"
)
executeRollover(rolloverTarget, isDataStream, conditions)
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to getPendingMessage(indexName), "conditions" to conditions)
}
return this
}