in gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java [482:563]
/**
 * Handles a spec added to (or re-added to) the flow catalog.
 *
 * <p>Specs that are not {@link FlowSpec}s are ignored and {@code null} is returned. A flow spec is always compiled
 * first, so the caller receives the compiled dag's string form as the response. The spec is then not scheduled (but
 * the compilation result is still returned) if any of these hold:
 * <ul>
 *   <li>it is an explain request;</li>
 *   <li>compilation failed;</li>
 *   <li>this scheduler is not active.</li>
 * </ul>
 *
 * <p>Otherwise the spec is recorded in {@code scheduledFlowSpecs} and {@code lastUpdatedTimeForFlowSpec}. The one
 * exception is when a copy with a newer modification time is already recorded, or an equal modification time for a
 * flow that is not run-immediately. In that case the update is ignored.
 *
 * <p>Specs carrying {@link ConfigurationKeys#JOB_SCHEDULE_KEY} are scheduled via {@code scheduleJob}. They are also
 * handed to {@code jobExecutor} when run-immediately is requested. Specs without a schedule are handed to
 * {@code jobExecutor} directly.
 *
 * <p>Side effects: updates the {@code eachSpecCompilationValue}, {@code eachScheduleJobValue} and
 * {@code eachCompleteAddSpecValue} timing fields (nanoseconds).
 *
 * @param addedSpec the spec that was added
 * @return a response wrapping the compiled dag's string form (a {@code null} payload if compilation produced no dag),
 *         or {@code null} if {@code addedSpec} is not a {@link FlowSpec} or scheduling threw a {@link JobException}
 */
public AddSpecResponse onAddSpec(Spec addedSpec) {
  long startTime = System.nanoTime();
  _log.info("New Flow Spec detected: {}", addedSpec);

  if (!(addedSpec instanceof FlowSpec)) {
    return null;
  }

  FlowSpec flowSpec = (FlowSpec) addedSpec;
  URI flowSpecUri = flowSpec.getUri();
  // Single key used for every scheduledFlowSpecs / lastUpdatedTimeForFlowSpec access below.
  String uriString = flowSpecUri.toString();
  Properties jobConfig = createJobConfig(flowSpec);
  boolean isExplain = flowSpec.isExplain();
  String response = null;

  long compileStartTime = System.nanoTime();
  // Always try to compile the flow to verify that it is compilable, even if it will not be scheduled.
  Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
  this.eachSpecCompilationValue = System.nanoTime() - compileStartTime;
  // A null or empty dag means a compilation error occurred; response then stays null.
  if (dag != null && !dag.isEmpty()) {
    response = dag.toString();
  }

  boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);

  if (isExplain || !compileSuccess || !this.isActive) {
    // todo: in case of a scheduled job, we should also check if the job schedule is a valid cron schedule
    //  so it can be scheduled
    _log.info("Ignoring the spec {}. isExplain: {}, compileSuccess: {}, master: {}",
        addedSpec, isExplain, compileSuccess, this.isActive);
    // Record total time on this exit as well, consistent with every other post-compilation exit below.
    this.eachCompleteAddSpecValue = System.nanoTime() - startTime;
    return new AddSpecResponse<>(response);
  }

  // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
  // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
  long modificationTime =
      Long.parseLong(flowSpec.getConfigAsProperties().getProperty(FlowSpec.MODIFICATION_TIME_KEY, "0"));
  boolean isRunImmediately =
      PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");

  // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
  // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
  if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)) {
    // Read the recorded timestamp once (instead of containsKey + repeated get) so the value compared is the value
    // logged, and a missing or null entry cannot cause an NPE when unboxed.
    Long lastUpdatedTime = this.lastUpdatedTimeForFlowSpec.get(uriString);
    if (lastUpdatedTime != null) {
      int comparison = Long.compare(lastUpdatedTime, modificationTime);
      // For run-immediately flows with a schedule the modified_time would remain the same, so an equal timestamp
      // is only treated as a duplicate update when run-immediately is not requested.
      if (comparison > 0 || (comparison == 0 && !isRunImmediately)) {
        _log.warn("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
            addedSpec, modificationTime, lastUpdatedTime);
        this.eachCompleteAddSpecValue = System.nanoTime() - startTime;
        return new AddSpecResponse<>(response);
      }
    }
  }

  // todo : we should probably not schedule a flow if it is a runOnce flow
  this.scheduledFlowSpecs.put(uriString, flowSpec);
  this.lastUpdatedTimeForFlowSpec.put(uriString, modificationTime);

  if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
    _log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec);
    try {
      long scheduleStartTime = System.nanoTime();
      scheduleJob(jobConfig, null);
      this.eachScheduleJobValue = System.nanoTime() - scheduleStartTime;
    } catch (JobException je) {
      _log.error("{} Failed to schedule or run FlowSpec {}", this.serviceName, addedSpec, je);
      // Undo the bookkeeping done above so the spec is not left recorded as scheduled.
      this.scheduledFlowSpecs.remove(uriString);
      this.lastUpdatedTimeForFlowSpec.remove(uriString);
      this.eachCompleteAddSpecValue = System.nanoTime() - startTime;
      return null;
    }
    // Reuse the flag read above so the staleness check and this decision agree.
    if (isRunImmediately) {
      _log.info("RunImmediately requested, hence executing FlowSpec: {}", addedSpec);
      this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, false, jobConfig, null));
    }
  } else {
    _log.info("No FlowSpec schedule found, so running FlowSpec: {}", addedSpec);
    this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null));
  }

  this.eachCompleteAddSpecValue = System.nanoTime() - startTime;
  return new AddSpecResponse<>(response);
}