public AddSpecResponse onAddSpec()

in gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java [482:563]


  /**
   * Handles a newly added {@link Spec}: compiles it and, when appropriate, schedules and/or runs it.
   *
   * <p>Processing steps:
   * <ol>
   *   <li>Specs that are not {@link FlowSpec}s are ignored.</li>
   *   <li>The flow is always compiled; the response value is the compiled DAG's string form, or
   *       {@code null} when the compiler returned a {@code null} or empty DAG.</li>
   *   <li>Nothing is scheduled for explain requests, for flows that
   *       {@code FlowCatalog.isCompileSuccessful} rejects, or while this scheduler is not active.</li>
   *   <li>An update is dropped when this scheduler already tracks the flow with a newer modification
   *       time, or with the same modification time unless the flow is marked run-immediately.</li>
   *   <li>Otherwise the flow is recorded, then scheduled if it has a job schedule (and additionally run
   *       once if run-immediately is set), or submitted for a single run if it has no schedule.</li>
   * </ol>
   *
   * <p>Side effects: updates {@code eachSpecCompilationValue}, {@code eachScheduleJobValue} and
   * {@code eachCompleteAddSpecValue} with elapsed nanoseconds for the phases that were reached.
   * NOTE(review): the early return for explain / failed compilation / inactive scheduler does not update
   * {@code eachCompleteAddSpecValue} -- confirm this is intentional.
   *
   * @param addedSpec the spec that was added
   * @return {@code null} if {@code addedSpec} is not a {@link FlowSpec} or if scheduling failed with a
   *         {@link JobException}; otherwise an {@link AddSpecResponse} wrapping the compilation result
   */
  public AddSpecResponse onAddSpec(Spec addedSpec) {
    long startTime = System.nanoTime();

    _log.info("New Flow Spec detected: {}", addedSpec);

    if (!(addedSpec instanceof FlowSpec)) {
      return null;
    }

    FlowSpec flowSpec = (FlowSpec) addedSpec;
    URI flowSpecUri = flowSpec.getUri();
    Properties jobConfig = createJobConfig(flowSpec);
    boolean isExplain = flowSpec.isExplain();
    String response = null;

    long compileStartTime = System.nanoTime();
    // Always try to compile the flow to verify that it is compilable.
    Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
    this.eachSpecCompilationValue = System.nanoTime() - compileStartTime;
    // A null (or empty) dag means compilation did not produce an executable plan; response stays null.
    if (dag != null && !dag.isEmpty()) {
      response = dag.toString();
    }

    boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);

    if (isExplain || !compileSuccess || !this.isActive) {
      // todo: in case of a scheduled job, we should also check if the job schedule is a valid cron schedule
      //  so it can be scheduled
      _log.info("Ignoring the spec {}. isExplain: {}, compileSuccess: {}, master: {}",
          addedSpec, isExplain, compileSuccess, this.isActive);
      return new AddSpecResponse<>(response);
    }

    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.MODIFICATION_TIME_KEY, "0"));
    // Single key used for every lookup/insert/removal below so the two tracking maps cannot drift apart.
    String uriString = flowSpecUri.toString();
    boolean isRunImmediately = PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");

    // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is
    // not enabled), or the spec is not in the scheduler, or no modification time is recorded for it, assume the
    // incoming spec is the most recent. The recorded time is fetched once so it is null-checked and compared
    // against a single consistent value.
    Long knownModificationTime = this.lastUpdatedTimeForFlowSpec.get(uriString);
    if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString) && knownModificationTime != null) {
      int comparison = knownModificationTime.compareTo(modificationTime);
      // For run-immediately flows with a schedule the modified_time would remain the same
      if (comparison > 0 || (comparison == 0 && !isRunImmediately)) {
        _log.warn("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
            addedSpec, modificationTime, knownModificationTime);
        this.eachCompleteAddSpecValue = System.nanoTime() - startTime;
        return new AddSpecResponse<>(response);
      }
    }

    // todo : we should probably not schedule a flow if it is a runOnce flow
    this.scheduledFlowSpecs.put(uriString, flowSpec);
    this.lastUpdatedTimeForFlowSpec.put(uriString, modificationTime);

    if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
      _log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec);
      try {
        long scheduleStartTime = System.nanoTime();
        scheduleJob(jobConfig, null);
        this.eachScheduleJobValue = System.nanoTime() - scheduleStartTime;
      } catch (JobException je) {
        _log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je);
        // Roll back the bookkeeping done above so a failed flow is not treated as scheduled.
        this.scheduledFlowSpecs.remove(uriString);
        this.lastUpdatedTimeForFlowSpec.remove(uriString);
        this.eachCompleteAddSpecValue = System.nanoTime() - startTime;
        return null;
      }
      if (isRunImmediately) {
        _log.info("RunImmediately requested, hence executing FlowSpec: {}", addedSpec);
        this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, false, jobConfig, null));
      }
    } else {
      _log.info("No FlowSpec schedule found, so running FlowSpec: {}", addedSpec);
      this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null));
    }

    this.eachCompleteAddSpecValue = System.nanoTime() - startTime;
    return new AddSpecResponse<>(response);
  }