in core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt [161:180]
override fun postIndex(shardId: ShardId, index: Engine.Index, result: Engine.IndexResult) {
if (!isSweepingEnabled()) return
if (result.resultType != Engine.Result.Type.SUCCESS) {
val shardJobs = sweptJobs[shardId] ?: emptyMap<JobId, JobVersion>()
val currentVersion = shardJobs[index.id()] ?: Versions.NOT_FOUND
logger.debug("Indexing failed for ScheduledJob: ${index.id()}. Continuing with current version $currentVersion")
return
}
if (isOwningNode(shardId, index.id())) {
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, index.source(), XContentType.JSON)
if (isSweepableJobType(xcp)) {
val job = parseAndSweepJob(xcp, shardId, index.id(), result.version, index.source(), true)
if (job != null) scheduler.postIndex(job)
} else {
logger.debug("Not a valid job type in document ${index.id()} to sweep.")
}
}
}