void sweep()

in src/main/java/org/opensearch/jobscheduler/sweeper/JobSweeper.java [225:265]


    /**
     * Reconciles the scheduler with one job document found on a shard.
     *
     * <p>The per-shard map in {@code sweptJobs} records the last {@link JobDocVersion} processed for each
     * document id. When {@code jobDocVersion} is not newer than the recorded one the document is skipped.
     * Otherwise any existing schedule for the document is removed and, if {@code jobSource} is present,
     * the document is parsed through the provider registered for the shard's index and scheduled again
     * when the parsed job is enabled.
     *
     * @param shardId       shard the document belongs to; its index name selects the provider and scheduler scope
     * @param docId         id of the job document
     * @param jobSource     raw document source, or {@code null} when there is nothing to schedule (the job is
     *                      only descheduled and its version entry is removed)
     * @param jobDocVersion version of the document being swept
     */
    void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersion jobDocVersion) {
        // A single computeIfAbsent call replaces a containsKey/get/put sequence, which could let two
        // concurrent callers each install their own map for the same shard and lose one set of versions.
        ConcurrentHashMap<String, JobDocVersion> jobVersionMap =
                this.sweptJobs.computeIfAbsent(shardId, key -> new ConcurrentHashMap<>());
        String indexName = shardId.getIndexName();

        jobVersionMap.compute(docId, (id, currentJobDocVersion) -> {
            if (jobDocVersion.compareTo(currentJobDocVersion) <= 0) {
                log.debug("Skipping job {}, new version {} <= current version {}", docId, jobDocVersion, currentJobDocVersion);
                return currentJobDocVersion;
            }

            // Drop the schedule of the older version before (possibly) scheduling the new one.
            if (this.scheduler.getScheduledJobIds(indexName).contains(docId)) {
                this.scheduler.deschedule(indexName, docId);
            }
            if (jobSource == null) {
                // Nothing to schedule; returning null removes the version entry for this document.
                return null;
            }

            try {
                ScheduledJobProvider provider = this.indexToProviders.get(indexName);
                ScheduledJobParameter jobParameter;
                // The parser is closed as soon as parsing is done, also when parsing throws.
                try (XContentParser parser = XContentHelper.createParser(this.xContentRegistry, LoggingDeprecationHandler.INSTANCE,
                        jobSource, XContentType.JSON)) {
                    jobParameter = provider.getJobParser().parse(parser, docId, jobDocVersion);
                }
                if (jobParameter == null) {
                    // allow parser to return null, which means this is not a scheduled job document.
                    return null;
                }
                if (jobParameter.isEnabled()) {
                    this.scheduler.schedule(indexName, docId, jobParameter, provider.getJobRunner(), jobDocVersion, jitterLimit);
                }
                return jobDocVersion;
            } catch (Exception e) {
                // Keep the previously recorded version so the new version is not marked as processed.
                log.warn("Unable to parse job {}, error message: {}", docId, e.getMessage());
                return currentJobDocVersion;
            }
        });
    }