in gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java [131:242]
/**
 * Builds the launcher for a Gobblin job that runs inside an Azkaban job.
 *
 * <p>In order, the constructor:
 * <ol>
 *   <li>calls {@code HadoopUtils.addGobblinSite()} and initializes the root metric context with the
 *       Azkaban tags;</li>
 *   <li>applies the log-level settings found in {@code props};</li>
 *   <li>copies the entries of {@code props} into this launcher's own {@link Properties} and
 *       initializes the job listener;</li>
 *   <li>generates the dynamic configuration and writes it over the copied properties;</li>
 *   <li>fills in the file system URI and state store file system URI from the Hadoop
 *       {@code Configuration} when the job did not set them, and sets the job tracking URL;</li>
 *   <li>if enabled, points the job at the Hadoop token file named by the environment, or negotiates
 *       tokens into a new temporary file when the environment names none;</li>
 *   <li>resolves the job template, adds the metric tags to the properties, defaults the launcher
 *       type to MAPREDUCE, and reads this job's SLA;</li>
 *   <li>creates the {@code JobLauncher} and the {@code ServiceBasedAppLauncher}, registering both
 *       with {@code this.closer}.</li>
 * </ol>
 *
 * @param jobId the job id handed to the superclass constructor
 * @param props the job properties; the same object serves as system and job configuration
 * @throws Exception if any step fails. When the failure happens after a resource has been
 *     registered with {@code this.closer}, the registered resources are closed before the
 *     exception propagates.
 */
public AzkabanJobLauncher(String jobId, Properties props)
    throws Exception {
  super(jobId, LOG);

  HadoopUtils.addGobblinSite();

  // Configure root metric context
  List<Tag<?>> tags = Lists.newArrayList();
  tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
  RootMetricContext.get(tags);

  if (props.containsKey(GOBBLIN_LOG_LEVEL_KEY)) {
    // Level.INFO is passed as the fallback level for the conversion.
    Level logLevel = Level.toLevel(props.getProperty(GOBBLIN_LOG_LEVEL_KEY), Level.INFO);
    Logger.getLogger("org.apache.gobblin").setLevel(logLevel);
  }

  Log4jConfigurationHelper.setLogLevel(
      PropertiesUtils.getPropAsList(props, Log4jConfigurationHelper.LOG_LEVEL_OVERRIDE_MAP, ""));

  this.props = new Properties();
  this.props.putAll(props);

  // initialize job listeners after properties has been initialized
  this.jobListener = initJobListener();

  // load dynamic configuration and add them to the job properties
  Config propsAsConfig = ConfigUtils.propertiesToConfig(props);
  DynamicConfigGenerator dynamicConfigGenerator =
      DynamicConfigGeneratorFactory.createDynamicConfigGenerator(propsAsConfig);
  Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(propsAsConfig);

  // add the dynamic config to the job config; a dynamic entry replaces a job entry with the same key
  for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) {
    this.props.setProperty(entry.getKey(), entry.getValue().unwrapped().toString());
  }

  // Fall back to the Hadoop configuration's file system URI for any URI the job did not set itself.
  Configuration conf = new Configuration();
  String fsUri = conf.get(HADOOP_FS_DEFAULT_NAME);
  if (!Strings.isNullOrEmpty(fsUri)) {
    if (!this.props.containsKey(ConfigurationKeys.FS_URI_KEY)) {
      this.props.setProperty(ConfigurationKeys.FS_URI_KEY, fsUri);
    }
    if (!this.props.containsKey(ConfigurationKeys.STATE_STORE_FS_URI_KEY)) {
      this.props.setProperty(ConfigurationKeys.STATE_STORE_FS_URI_KEY, fsUri);
    }
  }

  // Set the job tracking URL to point to the Azkaban job execution link URL
  this.props.setProperty(ConfigurationKeys.JOB_TRACKING_URL_KEY,
      Strings.nullToEmpty(conf.get(AZKABAN_LINK_JOBEXEC_URL)));

  boolean initializeHadoopTokens = Boolean.parseBoolean(this.props.getProperty(
      GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS, DEFAULT_GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS));
  if (initializeHadoopTokens) {
    String tokenFileFromEnv = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
    if (tokenFileFromEnv != null) {
      LOG.info("Job type " + props.getProperty(JOB_TYPE) + " provided Hadoop token in the environment variable "
          + HADOOP_TOKEN_FILE_LOCATION);
      this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFileFromEnv);
    } else {
      // see javadoc for more information
      LOG.info("Job type " + props.getProperty(JOB_TYPE)
          + " did not provide Hadoop token in the environment variable " + HADOOP_TOKEN_FILE_LOCATION
          + ". Negotiating Hadoop tokens.");

      File tokenFile = Files.createTempFile("mr-azkaban", ".token").toFile();
      // Token negotiation is driven by the caller-supplied props, not by the augmented copy.
      TokenUtils.getHadoopTokens(new State(props), Optional.of(tokenFile), new Credentials());

      // Publish the token file location both as JVM system properties and as job properties.
      String tokenFilePath = tokenFile.getAbsolutePath();
      System.setProperty(HADOOP_TOKEN_FILE_LOCATION, tokenFilePath);
      System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFilePath);
      this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFilePath);
      this.props.setProperty("env." + HADOOP_TOKEN_FILE_LOCATION, tokenFilePath);
    }
  }

  // jobProps is the same object as this.props, so the changes below are visible through both.
  Properties jobProps = this.props;
  resolveGobblinJobTemplateIfNecessary(jobProps);
  GobblinMetrics.addCustomTagsToProperties(jobProps, tags);

  // If the job launcher type is not specified in the job configuration,
  // override the default to use the MAPREDUCE launcher.
  if (!jobProps.containsKey(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY)) {
    jobProps.setProperty(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY,
        JobLauncherFactory.JobLauncherType.MAPREDUCE.toString());
  }

  this.ownAzkabanSla = Long.parseLong(
      jobProps.getProperty(AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS, DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS));

  List<? extends Tag<?>> metadataTags = Lists.newArrayList();
  // Is the job triggered using Gobblin-as-a-Service? If so, add additional tags needed for tracking
  // the job execution.
  if (jobProps.containsKey(ConfigurationKeys.FLOW_NAME_KEY)) {
    metadataTags = addAdditionalMetadataTags(jobProps);
  }

  try {
    // Create a JobLauncher instance depending on the configuration. The same properties object is
    // used for both system and job configuration properties because Azkaban puts configuration
    // properties in the .job file and in the .properties file into the same Properties object.
    this.jobLauncher =
        this.closer.register(JobLauncherFactory.newJobLauncher(jobProps, jobProps, null, metadataTags));

    // Since Java classes cannot extend multiple classes and Azkaban jobs must extend AbstractJob, we must use
    // composition versus extending ServiceBasedAppLauncher
    boolean isMetricReportingFailureFatal = PropertiesUtils
        .getPropAsBoolean(jobProps, ConfigurationKeys.GOBBLIN_JOB_METRIC_REPORTING_FAILURE_FATAL,
            Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_JOB_METRIC_REPORTING_FAILURE_FATAL));
    boolean isEventReportingFailureFatal = PropertiesUtils
        .getPropAsBoolean(jobProps, ConfigurationKeys.GOBBLIN_JOB_EVENT_REPORTING_FAILURE_FATAL,
            Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_JOB_EVENT_REPORTING_FAILURE_FATAL));

    jobProps.setProperty(MetricsReportingService.METRICS_REPORTING_FAILURE_FATAL_KEY,
        Boolean.toString(isMetricReportingFailureFatal));
    jobProps.setProperty(MetricsReportingService.EVENT_REPORTING_FAILURE_FATAL_KEY,
        Boolean.toString(isEventReportingFailureFatal));

    this.applicationLauncher =
        this.closer.register(new ServiceBasedAppLauncher(jobProps, "Azkaban-" + UUID.randomUUID()));
  } catch (Throwable t) {
    // A failed constructor yields no instance for the caller to close, so anything already
    // registered with the closer has to be released here. The original failure is rethrown.
    try {
      this.closer.close();
    } catch (Throwable closeFailure) {
      t.addSuppressed(closeFailure);
    }
    throw t;
  }
}