override fun runJob()

in src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt [135:185]


    override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
        if (job !is Rollup) {
            throw IllegalArgumentException("Invalid job type, found ${job.javaClass.simpleName} with id: ${context.jobId}")
        }

        launch {
            var metadata: RollupMetadata? = null
            try {
                // Get Metadata does a get request to the config index which the role will not have access to. This is an internal
                // call used by the plugin to populate the metadata itself so do not run this with role's context
                if (job.metadataID != null) {
                    metadata = when (val getMetadataResult = rollupMetadataService.getExistingMetadata(job)) {
                        is MetadataResult.Success -> getMetadataResult.metadata
                        is MetadataResult.NoMetadata -> null
                        is MetadataResult.Failure ->
                            throw RollupMetadataException("Failed to get existing rollup metadata [${job.metadataID}]", getMetadataResult.cause)
                    }
                }
            } catch (e: RollupMetadataException) {
                // If the metadata was not able to be retrieved, the exception will be logged and the job run will be a no-op
                logger.error(e.message, e.cause)
                return@launch
            }

            // Check if rollup should be processed before acquiring the lock
            // If metadata does not exist, it will either be initialized for the first time or it will be recreated to communicate the failed state
            if (rollupSearchService.shouldProcessRollup(job, metadata)) {
                val lock = acquireLockForScheduledJob(job, context, backoffPolicy)
                if (lock == null) {
                    logger.debug("Could not acquire lock for ${job.id}")
                } else {
                    runRollupJob(job, context, lock)
                    releaseLockForScheduledJob(context, lock)
                }
            } else if (job.isEnabled) {
                // We are doing this outside of ShouldProcess as schedule job interval can be more frequent than rollup and we want to fail
                // validation as soon as possible
                when (val jobValidity = isJobValid(job)) {
                    is RollupJobValidationResult.Invalid -> {
                        val lock = acquireLockForScheduledJob(job, context, backoffPolicy)
                        if (lock != null) {
                            setFailedMetadataAndDisableJob(job, jobValidity.reason)
                            logger.info("updating metadata service to disable the job [${job.id}]")
                            releaseLockForScheduledJob(context, lock)
                        }
                    }
                    else -> {}
                }
            }
        }
    }