in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java [419:520]
public JobExecutionDriver runAsync() throws TimeoutException, InterruptedException {
// Run function to distribute jars to workers in distributed mode
this.distributeJarsFunction.run();
log.debug("BuiltConfigMap: {}", this.builtConfigMap);
log.debug("DefaultSysConfig: {}", this.defaultSysConfig);
Config sysProps = ConfigFactory.parseMap(this.builtConfigMap)
.withFallback(this.defaultSysConfig);
log.debug("Merged SysProps:{}", sysProps);
Config userConfig = ConfigFactory.parseMap(this.userConfigMap);
log.debug("UserConfig: {}", userConfig);
JobSpec jobSpec;
if (this.jobFile.isPresent()) {
try {
Path jobFilePath = this.jobFile.get();
PullFileLoader loader =
new PullFileLoader(jobFilePath.getParent(), jobFilePath.getFileSystem(new Configuration()),
PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS,
PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS);
Config jobConfig = userConfig.withFallback(loader.loadPullFile(jobFilePath, sysProps, false));
log.debug("JobConfig: {}", jobConfig);
ImmutableFSJobCatalog.JobSpecConverter converter =
new ImmutableFSJobCatalog.JobSpecConverter(jobFilePath.getParent(), Optional.<String>absent());
jobSpec = converter.apply(jobConfig);
} catch (IOException ioe) {
throw new RuntimeException("Failed to run embedded Gobblin.", ioe);
}
} else {
Config finalConfig = userConfig.withFallback(sysProps);
if (this.template != null) {
this.specBuilder.withTemplate(this.template);
}
jobSpec = this.specBuilder.withConfig(finalConfig).build();
}
ResolvedJobSpec resolvedJobSpec;
try {
JobSpecResolver resolver = JobSpecResolver.builder(sysProps).build();
resolvedJobSpec = resolver.resolveJobSpec(jobSpec);
} catch (SpecNotFoundException | JobTemplate.TemplateException | IOException exc) {
throw new RuntimeException("Failed to resolved template.", exc);
}
final JobCatalog jobCatalog = new StaticJobCatalog(Optional.of(this.useLog), Lists.<JobSpec>newArrayList(resolvedJobSpec));
SimpleGobblinInstanceEnvironment instanceEnvironment =
new SimpleGobblinInstanceEnvironment("EmbeddedGobblinInstance", this.useLog, getSysConfig());
StandardGobblinInstanceDriver.Builder builder =
new StandardGobblinInstanceDriver.Builder(Optional.<GobblinInstanceEnvironment>of(instanceEnvironment)).withLog(this.useLog)
.withJobCatalog(jobCatalog)
.withImmediateJobScheduler();
for (GobblinInstancePluginFactory plugin : this.plugins) {
builder.addPlugin(plugin);
}
final GobblinInstanceDriver driver = builder.build();
EmbeddedJobLifecycleListener listener = new EmbeddedJobLifecycleListener(this.useLog);
driver.registerJobLifecycleListener(listener);
driver.startAsync();
boolean started = listener.awaitStarted(this.launchTimeout.getTimeout(), this.launchTimeout.getTimeUnit());
if (!started) {
dumpJStackOnTimeout("Launch");
log.warn("Timeout waiting for job to start. Aborting.");
driver.stopAsync();
driver.awaitTerminated(this.shutdownTimeout.getTimeout(), this.shutdownTimeout.getTimeUnit());
throw new TimeoutException("Timeout waiting for job to start.");
}
final JobExecutionDriver jobDriver = listener.getJobDriver();
// Stop the Gobblin instance driver when the job finishes.
Futures.addCallback(jobDriver, new FutureCallback<JobExecutionResult>() {
@Override
public void onSuccess(@Nullable JobExecutionResult result) {
stopGobblinInstanceDriver();
}
@Override
public void onFailure(Throwable t) {
stopGobblinInstanceDriver();
}
private void stopGobblinInstanceDriver() {
try {
driver.stopAsync();
driver.awaitTerminated(EmbeddedGobblin.this.shutdownTimeout.getTimeout(), EmbeddedGobblin.this.shutdownTimeout
.getTimeUnit());
} catch (TimeoutException te) {
dumpJStackOnTimeout("stop gobblin instance driver");
log.error("Failed to shutdown Gobblin instance driver.");
}
}
}, MoreExecutors.directExecutor());
return listener.getJobDriver();
}