in src/main/java/org/opensearch/jobscheduler/sweeper/JobSweeper.java [225:265]
/**
 * Reconciles a single job document with the scheduler and records the version that was processed.
 *
 * <p>The per-shard version map is looked up (or created) for {@code shardId}, and the entry for
 * {@code docId} is then updated inside {@code compute}, so the version comparison and the
 * schedule/deschedule calls for one document run as part of a single map update.
 *
 * <p>Outcomes for the {@code docId} entry:
 * <ul>
 *   <li>incoming version not newer than the recorded one: nothing is changed;</li>
 *   <li>{@code jobSource} is {@code null}: any scheduled job is descheduled and the entry is removed
 *       (presumably the document was deleted; confirm against callers);</li>
 *   <li>the provider's parser returns {@code null}: any scheduled job is descheduled and the entry is
 *       removed;</li>
 *   <li>parsing succeeds: the job is (re)scheduled if it is enabled and {@code jobDocVersion} is
 *       recorded;</li>
 *   <li>any exception while parsing or scheduling: a warning is logged and the previously recorded
 *       version is kept. Note that the job has already been descheduled at that point.</li>
 * </ul>
 *
 * @param shardId       shard that holds the job document; its index name selects the provider
 * @param docId         id of the job document
 * @param jobSource     raw JSON source of the job document, or {@code null}
 * @param jobDocVersion version of the incoming document
 */
void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersion jobDocVersion) {
    // computeIfAbsent performs the lookup-or-create as one step. The previous
    // containsKey/get/put sequence could let two callers install different maps for the
    // same shard (losing recorded versions) or read null if the entry vanished in between.
    ConcurrentHashMap<String, JobDocVersion> jobVersionMap =
        this.sweptJobs.computeIfAbsent(shardId, key -> new ConcurrentHashMap<>());

    jobVersionMap.compute(docId, (id, currentJobDocVersion) -> {
        // compute() passes null when there is no entry yet; a document that has never been
        // recorded is always treated as newer, without relying on compareTo accepting null.
        if (currentJobDocVersion != null && jobDocVersion.compareTo(currentJobDocVersion) <= 0) {
            log.debug("Skipping job {}, new version {} <= current version {}", docId, jobDocVersion, currentJobDocVersion);
            return currentJobDocVersion;
        }

        final String indexName = shardId.getIndexName();

        // Drop the stale schedule first; it is re-created below if the new document is an enabled job.
        if (this.scheduler.getScheduledJobIds(indexName).contains(docId)) {
            this.scheduler.deschedule(indexName, docId);
        }

        if (jobSource == null) {
            // Returning null from compute() removes the entry for this document.
            return null;
        }

        // try-with-resources closes the parser on every path; a failure in close() is handled
        // by the same catch block as a parse failure.
        try (XContentParser parser = XContentHelper.createParser(this.xContentRegistry, LoggingDeprecationHandler.INSTANCE,
                jobSource, XContentType.JSON)) {
            // A missing provider surfaces as a NullPointerException below and is reported by the catch block.
            ScheduledJobProvider provider = this.indexToProviders.get(indexName);
            ScheduledJobParameter jobParameter = provider.getJobParser().parse(parser, docId, jobDocVersion);
            if (jobParameter == null) {
                // allow parser to return null, which means this is not a scheduled job document.
                return null;
            }
            ScheduledJobRunner jobRunner = provider.getJobRunner();
            if (jobParameter.isEnabled()) {
                this.scheduler.schedule(indexName, docId, jobParameter, jobRunner, jobDocVersion, jitterLimit);
            }
            return jobDocVersion;
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the stack trace is logged, not only the message.
            log.warn("Unable to parse job {}, error message: {}", docId, e.getMessage(), e);
            // Keep the previously recorded version so a later sweep of this document is not skipped.
            return currentJobDocVersion;
        }
    });
}