in src/main/java/com/google/cloud/run/kafkascaler/SelfScheduler.java [76:113]
/**
 * Schedules follow-up tasks against the scaler URL at evenly spaced offsets within the 60 seconds
 * following {@code now}.
 *
 * <p>Nothing is scheduled when any of the following holds:
 *
 * <ul>
 *   <li>the configured cycle duration is greater than or equal to {@code
 *       ConfigurationProvider.MAX_CYCLE_DURATION};
 *   <li>the scaler URL is not yet cached and resolving it fails (the failure is logged, not
 *       rethrown);
 *   <li>{@code requestHeaders} contains the self-scheduled header key (compared ignoring case);
 *   <li>the configured cycle duration is zero, negative, or shorter than one whole second.
 * </ul>
 *
 * @param now the instant the follow-up task offsets are added to
 * @param requestHeaders headers of the incoming request; may be {@code null}, which is treated
 *     the same as the self-scheduled header being absent
 */
public void scheduleTasks(Instant now, Map<String, String> requestHeaders) {
  if (config.cycleDuration().compareTo(ConfigurationProvider.MAX_CYCLE_DURATION) >= 0) {
    logger.atInfo().log(
        "Cycle duration is greater than or equal to max cycle duration; skipping additional task"
            + " scheduling.");
    return;
  }
  if (scalerUrl == null) {
    // Resolve the URL once and keep it in the field so later calls skip this lookup.
    try {
      scalerUrl = configProvider.scalerUrl(cloudRunMetadataClient.projectNumberRegion());
    } catch (RuntimeException | IOException e) {
      logger.atSevere().withCause(e).log("[SCHEDULER] Failed to schedule additional tasks");
      // It's safe to continue here, we just can't schedule additional tasks. We'll try again on
      // the next non-self-scheduled request.
      return;
    }
  }
  // Compare ignoring case to make this resilient to header formatting.
  if (requestHeaders != null
      && requestHeaders.keySet().stream()
          .anyMatch(s -> Ascii.equalsIgnoreCase(s, SELF_SCHEDULED_HEADER_KEY))) {
    // This request originated from a Kafka Scaler. Skip scheduling additional tasks.
    return;
  }
  if (config.cycleDuration().compareTo(Duration.ZERO) <= 0) {
    // A zero or negative cycle duration means there is nothing to schedule.
    return;
  }
  // Duration#toSeconds truncates to whole seconds, so a positive sub-second duration yields 0.
  // Guard against it explicitly; dividing by it below would throw ArithmeticException.
  final long cycleSeconds = config.cycleDuration().toSeconds();
  if (cycleSeconds < 1) {
    logger.atWarning().log(
        "[SCHEDULER] Cycle duration %s is shorter than one second; skipping additional task"
            + " scheduling.",
        config.cycleDuration());
    return;
  }
  final int windowSeconds = 60;
  // Do integer division because we want to floor the result. cycleSeconds >= 1 here, so the
  // quotient is at most 60 and the narrowing cast cannot overflow.
  int followUpTasks = (int) (windowSeconds / cycleSeconds);
  if (followUpTasks > 0) {
    int taskIntervalSeconds = windowSeconds / followUpTasks;
    // Start at the first interval (not 0) and stop before the end of the window: offsets 0 and
    // 60 are never scheduled by this loop.
    for (int i = taskIntervalSeconds; i < windowSeconds; i += taskIntervalSeconds) {
      scheduleTask(now.plus(Duration.ofSeconds(i)), scalerUrl);
    }
  }
}