boolean reschedule()

in src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java [125:187]


    /**
     * Schedules the next execution of a job on the job scheduler thread pool.
     *
     * <p>The next execution time is derived from the job's schedule, starting from the expected
     * execution time currently recorded in {@code jobInfo}. A random jitter delay, proportional to
     * the interval between the next two executions, may be added to spread out load. The scheduled
     * task re-invokes this method before calling the job runner, so the job keeps rescheduling
     * itself until it is descheduled or the schedule yields no further execution time.
     *
     * @param jobParameter the job definition; supplies the enabled time, schedule, jitter and name
     * @param jobInfo      mutable scheduling state of the job; updated with the expected execution
     *                     time and the cancellable handle of the scheduled task
     * @param jobRunner    the runner invoked when the scheduled time arrives
     * @param version      the job document version passed into the execution context
     * @param jitterLimit  upper bound applied to the job's jitter ratio; {@code null} is treated as
     *                     "no upper bound" instead of failing on auto-unboxing
     * @return {@code false} if the job has no enabled time or has been descheduled; {@code true} if
     *         the next run was scheduled, or if the schedule has no next execution time
     */
    boolean reschedule(ScheduledJobParameter jobParameter, JobSchedulingInfo jobInfo, ScheduledJobRunner jobRunner,
                       JobDocVersion version, Double jitterLimit) {
        if (jobParameter.getEnabledTime() == null) {
            log.info("There is no enable time of job {}, this job should never be scheduled.",
                    jobParameter.getName());
            return false;
        }

        Instant nextExecutionTime = jobParameter.getSchedule().getNextExecutionTime(jobInfo.getExpectedExecutionTime());
        if (nextExecutionTime == null) {
            log.info("No next execution time for job {}", jobParameter.getName());
            return true;
        }
        Duration duration = Duration.between(this.clock.instant(), nextExecutionTime);

        // Too many jobs start at the same time point will bring burst. Add random jitter delay to spread out load.
        // Example, if interval is 10 minutes, jitter is 0.6, next job run will be randomly delayed by 0 to 10*0.6 minutes.
        Instant secondExecutionTimeFromNow = jobParameter.getSchedule().getNextExecutionTime(nextExecutionTime);
        if (secondExecutionTimeFromNow != null) {
            long intervalMillis = Duration.between(nextExecutionTime, secondExecutionTimeFromNow).toMillis();
            if (intervalMillis > 0) {
                double jitter = jobParameter.getJitter() == null ? 0d : jobParameter.getJitter();
                // Guard against a null limit: unboxing it in the comparison would throw NullPointerException.
                if (jitterLimit != null && jitter > jitterLimit) {
                    jitter = jitterLimit;
                }
                if (jitter < 0) {
                    jitter = 0;
                }
                // floorMod always yields a value in [0, intervalMillis) for a positive modulus, so no
                // sign flipping (and no Long.MIN_VALUE special case) is needed.
                long randomOffsetMillis = Math.floorMod(Randomness.get().nextLong(), intervalMillis);
                long jitterMillis = Math.round(randomOffsetMillis * jitter);
                if (jitter > 0) {
                    log.info("Will delay {} miliseconds for next execution of job {}", jitterMillis, jobParameter.getName());
                }
                duration = duration.plusMillis(jitterMillis);
            }
        }

        // A next execution time that already lies in the past produces a negative delay. Clamp it to
        // zero so the job runs as soon as possible instead of passing a negative duration on.
        // NOTE(review): TimeValue is believed to reject negative durations - confirm against the
        // OpenSearch version in use.
        if (duration.isNegative()) {
            duration = Duration.ZERO;
        }

        jobInfo.setExpectedExecutionTime(nextExecutionTime);

        Runnable runnable = () -> {
            // The job may have been descheduled between scheduling and firing.
            if (jobInfo.isDescheduled()) {
                return;
            }

            jobInfo.setExpectedPreviousExecutionTime(jobInfo.getExpectedExecutionTime());
            jobInfo.setActualPreviousExecutionTime(clock.instant());
            // schedule next execution
            this.reschedule(jobParameter, jobInfo, jobRunner, version, jitterLimit);

            // invoke job runner
            JobExecutionContext context = new JobExecutionContext(jobInfo.getExpectedPreviousExecutionTime(), version, lockService,
                jobInfo.getIndexName(), jobInfo.getJobId());

            jobRunner.runJob(jobParameter, context);
        };

        // Do not hand a new task to the thread pool for a job that has already been descheduled.
        if (jobInfo.isDescheduled()) {
            return false;
        }

        jobInfo.setScheduledCancellable(this.threadPool.schedule(runnable, new TimeValue(duration.toNanos(),
                        TimeUnit.NANOSECONDS), JobSchedulerPlugin.OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME));

        return true;
    }