in src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt [135:185]
/**
 * Runs one scheduled execution of a rollup job.
 *
 * Rejects any [job] that is not a [Rollup], then launches a coroutine that:
 * 1. Loads the job's existing metadata when the job has a metadata ID. If the lookup fails, the error is
 *    logged and this run becomes a no-op.
 * 2. If `rollupSearchService.shouldProcessRollup` returns true, tries to acquire the job lock and, when
 *    acquired, calls `runRollupJob`.
 * 3. Otherwise, if the job is enabled, validates it; for an invalid job it tries to acquire the lock and
 *    calls `setFailedMetadataAndDisableJob` with the validation reason.
 *
 * Whenever a lock was acquired it is released in a `finally` block, so an exception thrown by the guarded
 * work does not leave the lock held.
 *
 * @param job the scheduled job to run; must be a [Rollup]
 * @param context execution context for this run; supplies the job ID and is passed to the lock helpers
 * @throws IllegalArgumentException if [job] is not a [Rollup]
 */
override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
    if (job !is Rollup) {
        throw IllegalArgumentException("Invalid job type, found ${job.javaClass.simpleName} with id: ${context.jobId}")
    }
    launch {
        val metadata: RollupMetadata? = try {
            // Get Metadata does a get request to the config index which the role will not have access to. This is an
            // internal call used by the plugin to populate the metadata itself, so do not run this with the role's context.
            if (job.metadataID != null) {
                when (val getMetadataResult = rollupMetadataService.getExistingMetadata(job)) {
                    is MetadataResult.Success -> getMetadataResult.metadata
                    is MetadataResult.NoMetadata -> null
                    is MetadataResult.Failure ->
                        throw RollupMetadataException(
                            "Failed to get existing rollup metadata [${job.metadataID}]",
                            getMetadataResult.cause,
                        )
                }
            } else {
                null
            }
        } catch (e: RollupMetadataException) {
            // If the metadata could not be retrieved, log the exception and make this job run a no-op.
            logger.error(e.message, e.cause)
            return@launch
        }

        // Check if the rollup should be processed before acquiring the lock.
        // If metadata does not exist, it will either be initialized for the first time or it will be recreated to
        // communicate the failed state.
        if (rollupSearchService.shouldProcessRollup(job, metadata)) {
            val lock = acquireLockForScheduledJob(job, context, backoffPolicy)
            if (lock == null) {
                logger.debug("Could not acquire lock for ${job.id}")
            } else {
                try {
                    runRollupJob(job, context, lock)
                } finally {
                    // Release even when the rollup run throws, so the lock is not left held.
                    releaseLockForScheduledJob(context, lock)
                }
            }
        } else if (job.isEnabled) {
            // We are doing this outside of ShouldProcess as the schedule job interval can be more frequent than the
            // rollup and we want to fail validation as soon as possible.
            val jobValidity = isJobValid(job)
            if (jobValidity is RollupJobValidationResult.Invalid) {
                val lock = acquireLockForScheduledJob(job, context, backoffPolicy)
                if (lock != null) {
                    try {
                        setFailedMetadataAndDisableJob(job, jobValidity.reason)
                        logger.info("updating metadata service to disable the job [${job.id}]")
                    } finally {
                        // Release even when the metadata update throws, so the lock is not left held.
                        releaseLockForScheduledJob(context, lock)
                    }
                }
            }
        }
    }
}