in gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java [301:392]
/**
 * Loads every spec currently in the flow catalog and attempts to schedule it, recording timing
 * metrics along the way.
 *
 * <p>The work is done in these steps:
 * <ol>
 *   <li>Snapshot all spec URIs from the catalog so that specs missed by pagination can be found later.</li>
 *   <li>If this instance is the nominated DR handler, clear the running flow state of the specs tagged
 *       with {@code DR_FILTER_TAG} before they are scheduled.</li>
 *   <li>Read the catalog in pages of {@code loadSpecsBatchSize} and pass each spec to
 *       {@code addSpecHelperMethod}; a failure on one spec is logged and does not stop the others.</li>
 *   <li>Individually fetch and schedule any URI from the snapshot that no page returned (the catalog
 *       ordering may shift when specs are inserted or deleted while loading).</li>
 *   <li>Publish the collected timing and count values to the metric fields of this scheduler.</li>
 * </ol>
 *
 * @throws RuntimeException if the spec URIs (or the DR-tagged spec URIs) cannot be read from the catalog
 */
private void scheduleSpecsFromCatalog() {
  int numSpecs = this.flowCatalog.getSize();
  int actualNumFlowsScheduled = 0;
  _log.info("Scheduling specs from catalog: {} flows in the catalog, will skip scheduling flows with next run after "
      + "{} days", numSpecs, this.skipSchedulingFlowsAfterNumDays);
  long startTime = System.nanoTime();
  long totalGetTime = 0;
  long totalAddSpecTime = 0;

  // Snapshot of every URI known at start; entries are removed as the paginated load handles them.
  HashSet<URI> urisLeftToSchedule = new HashSet<>();
  try {
    Iterator<URI> uriIterator = this.flowCatalog.getSpecURIs();
    while (uriIterator.hasNext()) {
      urisLeftToSchedule.add(uriIterator.next());
    }
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  this.timeToObtainSpecUrisValue = System.nanoTime() - startTime;

  try {
    // If the current instance is nominated as DR handler, it will take additional URIs from the FlowCatalog.
    if (isNominatedDRHandler) {
      // Synchronously clean the execution state of the DR-applicable FlowSpecs
      // before rescheduling them in the nominated DR handler.
      Iterator<URI> drUris = this.flowCatalog.getSpecURISWithTag(DR_FILTER_TAG);
      clearRunningFlowState(drUris);
    }
  } catch (IOException e) {
    throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
  }

  // Paginated load. NOTE(review): this loop only terminates if loadSpecsBatchSize is positive;
  // presumably that is validated where the value is configured - confirm.
  int startOffset = 0;
  while (startOffset < numSpecs) {
    long batchGetStartTime = System.nanoTime();
    Collection<Spec> batchOfSpecs = this.flowCatalog.getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
    long batchGetEndTime = System.nanoTime();
    long batchGetDuration = batchGetEndTime - batchGetStartTime;

    for (Spec spec : batchOfSpecs) {
      try {
        if (addSpecHelperMethod(spec)) {
          totalAddSpecTime += this.eachCompleteAddSpecValue; // this is updated by each call to onAddSpec
          actualNumFlowsScheduled += 1;
        }
      } catch (Exception e) {
        // If there is an uncaught error thrown during compilation, log it and continue adding flows
        _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
      }
      // Handled (successfully or not), so the fallback pass below must not retry it.
      urisLeftToSchedule.remove(spec.getUri());
    }

    // Don't skew the average get spec time value with the last batch that may be very small.
    // The first batch is always used so that a catalog smaller than one page still reports a rate;
    // this must be decided BEFORE startOffset is advanced, otherwise the check can never be true.
    // An empty batch is skipped to avoid dividing by zero.
    boolean isFirstBatch = startOffset == 0;
    int batchCount = batchOfSpecs.size();
    if (batchCount > 0
        && (isFirstBatch || batchCount >= Math.round(0.75 * this.loadSpecsBatchSize))) {
      this.perSpecGetRateValue = batchGetDuration / batchCount;
    }

    totalGetTime += batchGetDuration;
    startOffset += this.loadSpecsBatchSize;
  }

  // Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
  for (URI uri : urisLeftToSchedule) {
    try {
      long individualGetSpecStartTime = System.nanoTime();
      Spec spec = this.flowCatalog.getSpecWrapper(uri);
      this.individualGetSpecSpeedValue = System.nanoTime() - individualGetSpecStartTime;
      totalGetTime += this.individualGetSpecSpeedValue;
      if (addSpecHelperMethod(spec)) {
        totalAddSpecTime += this.eachCompleteAddSpecValue; // this is updated by each call to onAddSpec
        actualNumFlowsScheduled += 1;
      }
    } catch (Exception e) {
      // If there is an uncaught error thrown during compilation, log it and continue adding flows.
      // The exception is passed as the trailing argument without its own placeholder, matching the
      // batch loop above, so that it is logged as a throwable rather than consumed by the format string.
      _log.error("Could not schedule spec uri {} from flowCatalog due to ", uri, e);
    }
  }

  // Reset value after its last value to get an accurate reading
  this.perSpecGetRateValue = -1L;
  this.individualGetSpecSpeedValue = -1L;

  this.totalGetSpecTimeValue = totalGetTime;
  this.totalAddSpecTimeValue = totalAddSpecTime;
  this.numJobsScheduledDuringStartupValue = actualNumFlowsScheduled;
  this.flowCatalog.getMetrics().updateGetSpecTime(startTime);
  this.timeToInitializeSchedulerValue = System.nanoTime() - startTime;
}