public AzkabanJobLauncher()

in gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java [131:242]


  /**
   * Creates the launcher for one Azkaban job execution.
   *
   * <p>In order, the constructor:
   * <ol>
   *   <li>registers the Gobblin site configuration and initializes the root metric context with the Azkaban tags;</li>
   *   <li>applies the optional log level settings found in {@code props};</li>
   *   <li>copies {@code props} into this launcher's own properties and merges the dynamic configuration into them;</li>
   *   <li>defaults the file system URIs and the job tracking URL from the Hadoop {@link Configuration};</li>
   *   <li>if Hadoop token initialization is enabled, either reuses the token file named by the
   *       {@code HADOOP_TOKEN_FILE_LOCATION} environment variable or negotiates new tokens into a temporary file;</li>
   *   <li>creates the {@code JobLauncher} and the {@code ServiceBasedAppLauncher}, both registered with
   *       {@code this.closer}.</li>
   * </ol>
   *
   * @param jobId the job id, passed to the superclass constructor
   * @param props the job properties supplied by the caller; they are copied into this launcher's own properties
   * @throws Exception if any of the initialization steps fails; when the application launcher cannot be created,
   *     the resources already registered with {@code this.closer} are closed before the exception is rethrown
   */
  public AzkabanJobLauncher(String jobId, Properties props)
      throws Exception {
    super(jobId, LOG);

    HadoopUtils.addGobblinSite();

    // Configure root metric context
    List<Tag<?>> tags = Lists.newArrayList();
    tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
    RootMetricContext.get(tags);

    if (props.containsKey(GOBBLIN_LOG_LEVEL_KEY)) {
      Level logLevel = Level.toLevel(props.getProperty(GOBBLIN_LOG_LEVEL_KEY), Level.INFO);
      Logger.getLogger("org.apache.gobblin").setLevel(logLevel);
    }

    Log4jConfigurationHelper.setLogLevel(PropertiesUtils.getPropAsList(props, Log4jConfigurationHelper.LOG_LEVEL_OVERRIDE_MAP, ""));

    this.props = new Properties();
    this.props.putAll(props);

    // initialize job listeners after the properties have been initialized
    this.jobListener = initJobListener();

    // load dynamic configuration and add them to the job properties
    Config propsAsConfig = ConfigUtils.propertiesToConfig(props);
    DynamicConfigGenerator dynamicConfigGenerator =
        DynamicConfigGeneratorFactory.createDynamicConfigGenerator(propsAsConfig);
    Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(propsAsConfig);

    // add the dynamic config to the job config
    for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) {
      this.props.put(entry.getKey(), entry.getValue().unwrapped().toString());
    }

    Configuration conf = new Configuration();

    // Default the file system URIs from the Hadoop configuration, without overriding explicitly configured values
    String fsUri = conf.get(HADOOP_FS_DEFAULT_NAME);
    if (!Strings.isNullOrEmpty(fsUri)) {
      if (!this.props.containsKey(ConfigurationKeys.FS_URI_KEY)) {
        this.props.setProperty(ConfigurationKeys.FS_URI_KEY, fsUri);
      }
      if (!this.props.containsKey(ConfigurationKeys.STATE_STORE_FS_URI_KEY)) {
        this.props.setProperty(ConfigurationKeys.STATE_STORE_FS_URI_KEY, fsUri);
      }
    }

    // Set the job tracking URL to point to the Azkaban job execution link URL
    this.props
        .setProperty(ConfigurationKeys.JOB_TRACKING_URL_KEY, Strings.nullToEmpty(conf.get(AZKABAN_LINK_JOBEXEC_URL)));

    if (Boolean.parseBoolean(this.props.getProperty(GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS,
        DEFAULT_GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS))) {
      // Read the environment variable once and reuse the value below
      String providedTokenFile = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
      if (providedTokenFile != null) {
        LOG.info("Job type " + props.getProperty(JOB_TYPE) + " provided Hadoop token in the environment variable " + HADOOP_TOKEN_FILE_LOCATION);
        this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, providedTokenFile);
      } else {
        // see javadoc for more information
        LOG.info(
            "Job type " + props.getProperty(JOB_TYPE) + " did not provide Hadoop token in the environment variable " + HADOOP_TOKEN_FILE_LOCATION + ". Negotiating Hadoop tokens.");

        File tokenFile = Files.createTempFile("mr-azkaban", ".token").toFile();
        TokenUtils.getHadoopTokens(new State(props), Optional.of(tokenFile), new Credentials());

        // Publish the token file location both as system properties and as job properties
        System.setProperty(HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath());
        System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFile.getAbsolutePath());
        this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFile.getAbsolutePath());
        this.props.setProperty("env." + HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath());
      }
    }

    // jobProps is an alias of this.props, not a copy: every change below is visible through both references
    Properties jobProps = this.props;
    resolveGobblinJobTemplateIfNecessary(jobProps);
    GobblinMetrics.addCustomTagsToProperties(jobProps, tags);

    // If the job launcher type is not specified in the job configuration,
    // override the default to use the MAPREDUCE launcher.
    if (!jobProps.containsKey(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY)) {
      jobProps.setProperty(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY,
          JobLauncherFactory.JobLauncherType.MAPREDUCE.toString());
    }

    this.ownAzkabanSla = Long.parseLong(
        jobProps.getProperty(AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS, DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS));

    List<? extends Tag<?>> metadataTags = Lists.newArrayList();
    //Is the job triggered using Gobblin-as-a-Service? If so, add additional tags needed for tracking
    //the job execution.
    if (jobProps.containsKey(ConfigurationKeys.FLOW_NAME_KEY)) {
      metadataTags = addAdditionalMetadataTags(jobProps);
    }

    // Create a JobLauncher instance depending on the configuration. The same properties object is
    // used for both system and job configuration properties because Azkaban puts configuration
    // properties in the .job file and in the .properties file into the same Properties object.
    this.jobLauncher = this.closer.register(JobLauncherFactory.newJobLauncher(jobProps, jobProps, null, metadataTags));

    // Since Java classes cannot extend multiple classes and Azkaban jobs must extend AbstractJob, we must use composition
    // versus extending ServiceBasedAppLauncher
    boolean isMetricReportingFailureFatal = PropertiesUtils
        .getPropAsBoolean(jobProps, ConfigurationKeys.GOBBLIN_JOB_METRIC_REPORTING_FAILURE_FATAL,
            Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_JOB_METRIC_REPORTING_FAILURE_FATAL));
    boolean isEventReportingFailureFatal = PropertiesUtils
        .getPropAsBoolean(jobProps, ConfigurationKeys.GOBBLIN_JOB_EVENT_REPORTING_FAILURE_FATAL,
            Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_JOB_EVENT_REPORTING_FAILURE_FATAL));

    jobProps.setProperty(MetricsReportingService.METRICS_REPORTING_FAILURE_FATAL_KEY, Boolean.toString(isMetricReportingFailureFatal));
    jobProps.setProperty(MetricsReportingService.EVENT_REPORTING_FAILURE_FATAL_KEY, Boolean.toString(isEventReportingFailureFatal));

    try {
      this.applicationLauncher =
          this.closer.register(new ServiceBasedAppLauncher(jobProps, "Azkaban-" + UUID.randomUUID()));
    } catch (Exception e) {
      // The constructor is about to fail, so no caller will ever hold an instance to close.
      // Release what has already been registered (the job launcher) instead of leaking it.
      try {
        this.closer.close();
      } catch (Exception closeFailure) {
        // Keep the original failure as the primary exception
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }