private void scheduleSpecsFromCatalog()

in gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java [301:392]


  /**
   * Loads every flow spec from the flow catalog and schedules it, recording startup timing metrics.
   *
   * <p>Work happens in three phases:
   * <ol>
   *   <li>Snapshot all spec URIs currently in the catalog, so that specs missed by pagination can be
   *       picked up afterwards.</li>
   *   <li>If this instance is the nominated DR handler, clear the running flow state of the specs tagged
   *       with {@code DR_FILTER_TAG} before they are rescheduled.</li>
   *   <li>Page through the catalog in batches of {@code loadSpecsBatchSize}, scheduling each spec. Then
   *       individually fetch and schedule any snapshotted URI that pagination did not return (ordering can
   *       shift when specs are inserted or deleted while loading).</li>
   * </ol>
   *
   * <p>A failure to schedule a single spec is logged and skipped, so one bad spec does not block the rest.
   * On completion the timing and count values ({@code timeToObtainSpecUrisValue},
   * {@code totalGetSpecTimeValue}, {@code totalAddSpecTimeValue}, {@code numJobsScheduledDuringStartupValue},
   * {@code timeToInitializeSchedulerValue}) are updated.
   *
   * @throws RuntimeException if the spec URIs, or the DR-tagged spec URIs, cannot be read from the catalog
   */
  private void scheduleSpecsFromCatalog() {
    int numSpecs = this.flowCatalog.getSize();
    int actualNumFlowsScheduled = 0;
    _log.info("Scheduling specs from catalog: {} flows in the catalog, will skip scheduling flows with next run after "
        + "{} days", numSpecs, this.skipSchedulingFlowsAfterNumDays);
    long startTime = System.nanoTime();
    long totalGetTime = 0;
    long totalAddSpecTime = 0;

    // Snapshot of every URI in the catalog; entries are removed as pagination visits them, and whatever is
    // left afterwards is fetched individually.
    HashSet<URI> urisLeftToSchedule = new HashSet<>();
    try {
      Iterator<URI> uriIterator = this.flowCatalog.getSpecURIs();
      while (uriIterator.hasNext()) {
        urisLeftToSchedule.add(uriIterator.next());
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    this.timeToObtainSpecUrisValue = System.nanoTime() - startTime;

    try {
      // If the current instance is nominated as DR handler, synchronously clean the execution state of
      // DR-applicable FlowSpecs before rescheduling them here.
      if (isNominatedDRHandler) {
        Iterator<URI> drUris = this.flowCatalog.getSpecURISWithTag(DR_FILTER_TAG);
        clearRunningFlowState(drUris);
      }
    } catch (IOException e) {
      throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
    }

    int startOffset = 0;
    while (startOffset < numSpecs) {
      // Evaluate before startOffset is advanced below; the original check ran after the increment and was
      // therefore never true.
      boolean isFirstBatch = startOffset == 0;

      long batchGetStartTime = System.nanoTime();
      Collection<Spec> batchOfSpecs = this.flowCatalog.getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
      long batchGetTime = System.nanoTime() - batchGetStartTime;

      for (Spec spec : batchOfSpecs) {
        try {
          if (addSpecHelperMethod(spec)) {
            totalAddSpecTime += this.eachCompleteAddSpecValue; // this is updated by each call to onAddSpec
            actualNumFlowsScheduled += 1;
          }
        } catch (Exception e) {
          // If there is an uncaught error thrown during compilation, log it and continue adding flows
          _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
        }
        urisLeftToSchedule.remove(spec.getUri());
      }

      startOffset += this.loadSpecsBatchSize;
      totalGetTime += batchGetTime;
      // Don't skew the average get spec time value with the last batch that may be very small (the first
      // batch is always sampled). Skip empty batches, which can occur if specs were deleted while loading,
      // to avoid dividing by zero.
      if (!batchOfSpecs.isEmpty()
          && (isFirstBatch || batchOfSpecs.size() >= Math.round(0.75 * this.loadSpecsBatchSize))) {
        this.perSpecGetRateValue = batchGetTime / batchOfSpecs.size();
      }
    }

    // Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
    for (URI uri : urisLeftToSchedule) {
      try {
        long individualGetSpecStartTime = System.nanoTime();
        Spec spec = this.flowCatalog.getSpecWrapper(uri);
        this.individualGetSpecSpeedValue = System.nanoTime() - individualGetSpecStartTime;
        totalGetTime += this.individualGetSpecSpeedValue;
        if (addSpecHelperMethod(spec)) {
          totalAddSpecTime += this.eachCompleteAddSpecValue; // this is updated by each call to onAddSpec
          actualNumFlowsScheduled += 1;
        }
      } catch (Exception e) {
        // If there is an uncaught error thrown during compilation, log it and continue adding flows.
        // The Throwable is passed without a matching placeholder so SLF4J logs its stack trace.
        _log.error("Could not schedule spec uri {} from flowCatalog due to ", uri, e);
      }
    }
    // Reset value after its last value to get an accurate reading
    this.perSpecGetRateValue = -1L;
    this.individualGetSpecSpeedValue = -1L;

    this.totalGetSpecTimeValue = totalGetTime;
    this.totalAddSpecTimeValue = totalAddSpecTime;
    this.numJobsScheduledDuringStartupValue = actualNumFlowsScheduled;
    this.flowCatalog.getMetrics().updateGetSpecTime(startTime);
    this.timeToInitializeSchedulerValue = System.nanoTime() - startTime;
  }