public void scheduleTasks()

in src/main/java/com/google/cloud/run/kafkascaler/SelfScheduler.java [76:113]


  /**
   * Schedules follow-up scaler tasks at evenly spaced, whole-second offsets within the 60 seconds
   * after {@code now}.
   *
   * <p>No tasks are scheduled when any of the following holds:
   *
   * <ul>
   *   <li>the configured cycle duration is greater than or equal to {@code
   *       ConfigurationProvider.MAX_CYCLE_DURATION};
   *   <li>the scaler URL is not cached yet and resolving it fails (the failure is logged and the
   *       lookup is attempted again on the next call);
   *   <li>{@code requestHeaders} contains {@code SELF_SCHEDULED_HEADER_KEY}, compared ignoring
   *       ASCII case;
   *   <li>the configured cycle duration is zero, negative, or shorter than one second.
   * </ul>
   *
   * @param now instant the follow-up task offsets are added to
   * @param requestHeaders headers of the triggering request; may be {@code null}
   */
  public void scheduleTasks(Instant now, Map<String, String> requestHeaders) {
    if (config.cycleDuration().compareTo(ConfigurationProvider.MAX_CYCLE_DURATION) >= 0) {
      logger.atInfo().log(
          "Cycle duration is greater than or equal to max cycle duration; skipping additional task"
              + " scheduling.");
      return;
    }

    if (scalerUrl == null) {
      try {
        scalerUrl = configProvider.scalerUrl(cloudRunMetadataClient.projectNumberRegion());
      } catch (RuntimeException | IOException e) {
        logger.atSevere().withCause(e).log("[SCHEDULER] Failed to schedule additional tasks");
        // It's safe to continue here, we just can't schedule additional tasks. scalerUrl is still
        // null, so the lookup is attempted again on the next call.
        return;
      }
    }

    // Compare ignoring case to make this resilient to header formatting.
    if (requestHeaders != null
        && requestHeaders.keySet().stream()
            .anyMatch(s -> Ascii.equalsIgnoreCase(s, SELF_SCHEDULED_HEADER_KEY))) {
      // This request originated from a Kafka Scaler. Skip scheduling additional tasks.
      return;
    }

    Duration cycleDuration = config.cycleDuration();
    if (cycleDuration.compareTo(Duration.ZERO) <= 0) {
      return;
    }

    // toSeconds() truncates, so a positive sub-second duration yields 0. Dividing by it would throw
    // ArithmeticException; treat it like a zero duration and schedule nothing.
    long cycleSeconds = cycleDuration.toSeconds();
    if (cycleSeconds <= 0) {
      logger.atWarning().log(
          "[SCHEDULER] Cycle duration %s is shorter than one second; skipping additional task"
              + " scheduling.",
          cycleDuration);
      return;
    }

    // Do integer division because we want to floor the result.
    int followUpTasks = (int) (60 / cycleSeconds);
    if (followUpTasks <= 0) {
      return;
    }

    // Start at the first interval (not 0) and stop before 60, so no task is scheduled at `now`
    // itself or at the one-minute mark.
    int taskIntervalSeconds = 60 / followUpTasks;
    for (int i = taskIntervalSeconds; i < 60; i += taskIntervalSeconds) {
      scheduleTask(now.plus(Duration.ofSeconds(i)), scalerUrl);
    }
  }