public JobExecutionDriver runAsync()

in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java [419:520]


  public JobExecutionDriver runAsync() throws TimeoutException, InterruptedException {
    // Run function to distribute jars to workers in distributed mode
    this.distributeJarsFunction.run();
    log.debug("BuiltConfigMap: {}", this.builtConfigMap);
    log.debug("DefaultSysConfig: {}", this.defaultSysConfig);

    Config sysProps = ConfigFactory.parseMap(this.builtConfigMap)
        .withFallback(this.defaultSysConfig);
    log.debug("Merged SysProps:{}", sysProps);
    Config userConfig = ConfigFactory.parseMap(this.userConfigMap);
    log.debug("UserConfig: {}", userConfig);

    JobSpec jobSpec;
    if (this.jobFile.isPresent()) {
      try {
        Path jobFilePath = this.jobFile.get();
        PullFileLoader loader =
            new PullFileLoader(jobFilePath.getParent(), jobFilePath.getFileSystem(new Configuration()),
                PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS,
                PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS);
        Config jobConfig = userConfig.withFallback(loader.loadPullFile(jobFilePath, sysProps, false));
        log.debug("JobConfig: {}", jobConfig);

        ImmutableFSJobCatalog.JobSpecConverter converter =
            new ImmutableFSJobCatalog.JobSpecConverter(jobFilePath.getParent(), Optional.<String>absent());
        jobSpec = converter.apply(jobConfig);
      } catch (IOException ioe) {
        throw new RuntimeException("Failed to run embedded Gobblin.", ioe);
      }
    } else {
      Config finalConfig = userConfig.withFallback(sysProps);
      if (this.template != null) {
        this.specBuilder.withTemplate(this.template);
      }
      jobSpec = this.specBuilder.withConfig(finalConfig).build();
    }

    ResolvedJobSpec resolvedJobSpec;
    try {
      JobSpecResolver resolver = JobSpecResolver.builder(sysProps).build();
      resolvedJobSpec = resolver.resolveJobSpec(jobSpec);
    } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException exc) {
      throw new RuntimeException("Failed to resolved template.", exc);
    }
    final JobCatalog jobCatalog = new StaticJobCatalog(Optional.of(this.useLog), Lists.<JobSpec>newArrayList(resolvedJobSpec));

    SimpleGobblinInstanceEnvironment instanceEnvironment =
        new SimpleGobblinInstanceEnvironment("EmbeddedGobblinInstance", this.useLog, getSysConfig());

    StandardGobblinInstanceDriver.Builder builder =
        new StandardGobblinInstanceDriver.Builder(Optional.<GobblinInstanceEnvironment>of(instanceEnvironment)).withLog(this.useLog)
        .withJobCatalog(jobCatalog)
        .withImmediateJobScheduler();

    for (GobblinInstancePluginFactory plugin : this.plugins) {
      builder.addPlugin(plugin);
    }

    final GobblinInstanceDriver driver = builder.build();

    EmbeddedJobLifecycleListener listener = new EmbeddedJobLifecycleListener(this.useLog);
    driver.registerJobLifecycleListener(listener);

    driver.startAsync();

    boolean started = listener.awaitStarted(this.launchTimeout.getTimeout(), this.launchTimeout.getTimeUnit());
    if (!started) {
      dumpJStackOnTimeout("Launch");
      log.warn("Timeout waiting for job to start. Aborting.");
      driver.stopAsync();
      driver.awaitTerminated(this.shutdownTimeout.getTimeout(), this.shutdownTimeout.getTimeUnit());
      throw new TimeoutException("Timeout waiting for job to start.");
    }

    final JobExecutionDriver jobDriver = listener.getJobDriver();
    // Stop the Gobblin instance driver when the job finishes.
    Futures.addCallback(jobDriver, new FutureCallback<JobExecutionResult>() {
      @Override
      public void onSuccess(@Nullable JobExecutionResult result) {

        stopGobblinInstanceDriver();
      }

      @Override
      public void onFailure(Throwable t) {
        stopGobblinInstanceDriver();
      }

      private void stopGobblinInstanceDriver() {
        try {
          driver.stopAsync();
          driver.awaitTerminated(EmbeddedGobblin.this.shutdownTimeout.getTimeout(), EmbeddedGobblin.this.shutdownTimeout
              .getTimeUnit());
        } catch (TimeoutException te) {
          dumpJStackOnTimeout("stop gobblin instance driver");
          log.error("Failed to shutdown Gobblin instance driver.");
        }
      }
    }, MoreExecutors.directExecutor());

    return listener.getJobDriver();
  }