in src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java [125:187]
boolean reschedule(ScheduledJobParameter jobParameter, JobSchedulingInfo jobInfo, ScheduledJobRunner jobRunner,
JobDocVersion version, Double jitterLimit) {
if (jobParameter.getEnabledTime() == null) {
log.info("There is no enable time of job {}, this job should never be scheduled.",
jobParameter.getName());
return false;
}
Instant nextExecutionTime = jobParameter.getSchedule().getNextExecutionTime(jobInfo.getExpectedExecutionTime());
if (nextExecutionTime == null) {
log.info("No next execution time for job {}", jobParameter.getName());
return true;
}
Duration duration = Duration.between(this.clock.instant(), nextExecutionTime);
// Too many jobs start at the same time point will bring burst. Add random jitter delay to spread out load.
// Example, if interval is 10 minutes, jitter is 0.6, next job run will be randomly delayed by 0 to 10*0.6 minutes.
Instant secondExecutionTimeFromNow = jobParameter.getSchedule().getNextExecutionTime(nextExecutionTime);
if (secondExecutionTimeFromNow != null) {
Duration interval = Duration.between(nextExecutionTime, secondExecutionTimeFromNow);
if (interval.toMillis() > 0) {
double jitter = jobParameter.getJitter() == null ? 0d : jobParameter.getJitter();
jitter = jitter > jitterLimit ? jitterLimit : jitter;
jitter = jitter < 0 ? 0 : jitter;
long randomLong = Randomness.get().nextLong();
if (randomLong == Long.MIN_VALUE) randomLong += 17; // to ensure the * -1 below doesn't fail to change to positive
long randomPositiveLong = randomLong < 0 ? randomLong * -1 : randomLong;
long jitterMillis = Math.round(randomPositiveLong % interval.toMillis() * jitter);
if (jitter > 0) {
log.info("Will delay {} miliseconds for next execution of job {}", jitterMillis, jobParameter.getName());
}
duration = duration.plusMillis(jitterMillis);
}
}
jobInfo.setExpectedExecutionTime(nextExecutionTime);
Runnable runnable = () -> {
if (jobInfo.isDescheduled()) {
return;
}
jobInfo.setExpectedPreviousExecutionTime(jobInfo.getExpectedExecutionTime());
jobInfo.setActualPreviousExecutionTime(clock.instant());
// schedule next execution
this.reschedule(jobParameter, jobInfo, jobRunner, version, jitterLimit);
// invoke job runner
JobExecutionContext context = new JobExecutionContext(jobInfo.getExpectedPreviousExecutionTime(), version, lockService,
jobInfo.getIndexName(), jobInfo.getJobId());
jobRunner.runJob(jobParameter, context);
};
if (jobInfo.isDescheduled()) {
return false;
}
jobInfo.setScheduledCancellable(this.threadPool.schedule(runnable, new TimeValue(duration.toNanos(),
TimeUnit.NANOSECONDS), JobSchedulerPlugin.OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME));
return true;
}