protected void setup()

in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java [743:834]


    /**
     * Prepares this task for execution: starts the troubleshooter, restores the job state from the job state
     * file, merges dynamically generated configuration into the job state and Hadoop configuration, records
     * the MR task identifiers, starts the task services and, when metrics are enabled, metric reporting.
     *
     * @param context the Hadoop task context supplying the configuration, output path and task attempt ID
     * @throws RuntimeException if the customized progresser factory cannot be instantiated, the job state file
     *         is missing or cannot be read, the service manager does not become healthy within 5 seconds, or
     *         a metric/event reporting failure must be propagated according to
     *         {@code MetricReportUtils.shouldThrowException}
     */
    protected void setup(Context context) {
      final State gobblinJobState = HadoopUtils.getStateFromConf(context.getConfiguration());
      TaskAttemptID taskAttemptID = context.getTaskAttemptID();

      troubleshooter = AutomaticTroubleshooterFactory.createForJob(gobblinJobState.getProperties());
      troubleshooter.start();

      try (Closer closer = Closer.create()) {
        // Default for customizedProgressEnabled is false.
        this.customizedProgressEnabled = isCustomizedProgressReportEnabled(gobblinJobState.getProperties());
        this.isSpeculativeEnabled = isSpeculativeExecutionEnabled(gobblinJobState.getProperties());

        String factoryClassName = gobblinJobState.getProperties().getProperty(
            CUSTOMIZED_PROGRESSER_FACTORY_CLASS, DEFAULT_CUSTOMIZED_PROGRESSER_FACTORY_CLASS);
        // Go through the no-arg Constructor rather than the deprecated Class.newInstance(): exceptions thrown
        // by the factory's constructor are then wrapped in InvocationTargetException (a
        // ReflectiveOperationException) and handled by the catch below instead of escaping unchecked.
        this.customizedProgresser = Class.forName(factoryClassName).asSubclass(CustomizedProgresser.Factory.class)
            .getDeclaredConstructor().newInstance().createCustomizedProgresser(context);

        this.fs = FileSystem.get(context.getConfiguration());
        this.taskStateStore =
            new FsStateStore<>(this.fs, FileOutputFormat.getOutputPath(context).toUri().getPath(), TaskState.class);
        String jobStateFileName = context.getConfiguration().get(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME);
        Optional<URI> jobStateFileUri = getStateFileUriForJob(context.getConfiguration(), jobStateFileName);
        if (jobStateFileUri.isPresent()) {
          // The stream is registered with the closer so it is closed when the try block exits.
          SerializationUtils.deserializeStateFromInputStream(
                closer.register(new FileInputStream(jobStateFileUri.get().getPath())), this.jobState);
        } else {
          throw new IOException("Job state file not found: '" + jobStateFileName + "'.");
        }
      } catch (IOException | ReflectiveOperationException e) {
        throw new RuntimeException("Failed to setup the mapper task", e);
      }

      // load dynamic configuration to add to the job configuration
      Configuration configuration = context.getConfiguration();
      Config jobStateAsConfig = ConfigUtils.propertiesToConfig(this.jobState.getProperties());
      DynamicConfigGenerator dynamicConfigGenerator = DynamicConfigGeneratorFactory.createDynamicConfigGenerator(
          jobStateAsConfig);
      Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(jobStateAsConfig);

      // add the dynamic config to the job config; each value is stringified once and written to the
      // deserialized job state, the Hadoop configuration and the configuration-derived state alike
      for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) {
        String key = entry.getKey();
        String value = entry.getValue().unwrapped().toString();
        this.jobState.setProp(key, value);
        configuration.set(key, value);
        gobblinJobState.setProp(key, value);
      }

      // add some more MR task related configs

      String taskAttemptIdString = taskAttemptID.toString();
      String[] tokens = taskAttemptIdString.split("_");
      TaskType taskType = taskAttemptID.getTaskType();
      gobblinJobState.setProp(MR_TYPE_KEY, taskType.name());

      // a task attempt id should be like 'attempt_1592863931636_2371636_m_000003_4'
      if (tokens.length == 6) {
        // second-to-last token is the task number, last token is the attempt number
        if (taskType.equals(TaskType.MAP)) {
          gobblinJobState.setProp(MAPPER_TASK_NUM_KEY, tokens[tokens.length - 2]);
          gobblinJobState.setProp(MAPPER_TASK_ATTEMPT_NUM_KEY, tokens[tokens.length - 1]);
        } else if (taskType.equals(TaskType.REDUCE)) {
          gobblinJobState.setProp(REDUCER_TASK_NUM_KEY, tokens[tokens.length - 2]);
          gobblinJobState.setProp(REDUCER_TASK_ATTEMPT_NUM_KEY, tokens[tokens.length - 1]);
        }
      }

      this.taskExecutor = new TaskExecutor(configuration);
      this.taskStateTracker = new MRTaskStateTracker(context);
      this.serviceManager = new ServiceManager(Lists.newArrayList(this.taskExecutor, this.taskStateTracker));
      try {
        this.serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS);
      } catch (TimeoutException te) {
        LOG.error("Timed out while waiting for the service manager to start up", te);
        throw new RuntimeException(te);
      }

      // Setup and start metrics reporting if metric reporting is enabled
      if (Boolean.parseBoolean(configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
        this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
        try {
          this.jobMetrics.get().startMetricReportingWithFileSuffix(gobblinJobState, taskAttemptIdString);
        } catch (MultiReporterException ex) {
          //Fail the task if metric/event reporting failure is configured to be fatal.
          boolean isMetricReportingFailureFatal = configuration.getBoolean(ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
              ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL);
          boolean isEventReportingFailureFatal = configuration.getBoolean(ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
                  ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
          if (MetricReportUtils.shouldThrowException(LOG, ex, isMetricReportingFailureFatal, isEventReportingFailureFatal)) {
            throw new RuntimeException(ex);
          }
        }
      }

      AbstractJobLauncher.setDefaultAuthenticator(this.jobState.getProperties());
    }