in server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTaskExecutor.java [81:129]
private void schedule(PeriodicTaskKey key, long priorExecDurationMillis, long delayMillis, long execCount)
{
long actualDelayMillis = delayMillis - priorExecDurationMillis;
AtomicBoolean runNow = new AtomicBoolean(actualDelayMillis <= 0);
timerIds.compute(key, (k, tid) -> {
// The periodic task has been scheduled already. Exit early and avoid scheduling the duplication
if (tid != null && execCount == 0)
{
LOGGER.debug("Task is already scheduled. task='{}'", key);
runNow.set(false);
return tid;
}
// Cleanup the unscheduled task from map
if (tid != null && tid == UNSCHEDULED_STATE_TIMER_ID) // at this step, execCount != 0
{
LOGGER.debug("Task is now unscheduled. task='{}' execCount={}", key, execCount);
runNow.set(false);
return null; // remove the entry from the map, since it is unscheduled
}
if (tid == null && execCount != 0)
{
LOGGER.info("The executor is closed or the task is already unscheduled. " +
"Avoid scheduling more runs." +
"tid=null task='{}' execCount={}", key, execCount);
runNow.set(false);
return null;
}
LOGGER.debug("Scheduling task {}. task='{}' execCount={}",
runNow.get() ? "immediately" : "in " + actualDelayMillis + " milliseconds",
key, execCount);
// If run immediately, do not execute within the compute block.
// Return the placeholder timer ID, and execute after exiting the compute block.
if (runNow.get())
{
return RUN_NOW_TIMER_ID; // use the placeholder ID, since this run is not scheduled as a timer
}
// Schedule and update the timer id
return internalPool.setTimer(delayMillis, timerId -> executeAndScheduleNext(key, execCount));
});
if (runNow.get())
{
executeAndScheduleNext(key, execCount);
}
}