public void run()

in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java [837:932]


    /**
     * Drives the whole mapper: feeds every input record to {@code map}, runs the collected work units
     * through a {@link GobblinMultiTaskAttempt}, and finally runs (or defers) the clean-up step that
     * stops the service manager and job metrics reporting.
     *
     * <p>If the interrupt path configured under {@code GOBBLIN_JOB_INTERRUPT_PATH_KEY} already exists
     * when the mapper starts, the method returns right after {@code setup} without running anything.
     * The same path is re-checked through the predicate handed to
     * {@code GobblinMultiTaskAttempt.runWorkUnits}.
     *
     * <p>When speculative execution is enabled and the multi-task attempt was created, the attempt is
     * registered with the {@link GobblinOutputCommitter} and the clean-up step is attached to the attempt
     * instead of being executed here.
     *
     * @param context the mapper context supplying the configuration, the input records, the task attempt
     *                id and the output committer
     * @throws IOException if an I/O failure occurs while checking the interrupt path, reading the input
     *                     records, or executing the clean-up step
     * @throws InterruptedException if the calling thread is interrupted while the mapper is running
     */
    public void run(Context context) throws IOException, InterruptedException {
      this.setup(context);

      Path interruptPath = new Path(context.getConfiguration().get(GOBBLIN_JOB_INTERRUPT_PATH_KEY));
      if (this.fs.exists(interruptPath)) {
        LOG.info(String.format("Found interrupt path %s indicating the driver has interrupted the job, aborting mapper.", interruptPath));
        // NOTE(review): this early return bypasses the finally block below, so neither the troubleshooter
        // nor the service manager is stopped on this path. Presumably setup() started them - confirm
        // whether the clean-up should also run here.
        return;
      }

      GobblinMultiTaskAttempt gobblinMultiTaskAttempt = null;
      try {
        // De-serialize and collect the list of WorkUnits to run
        while (context.nextKeyValue()) {
          this.map(context.getCurrentKey(), context.getCurrentValue(), context);
        }

        // org.apache.hadoop.util.Progress.complete will set the progress to 1.0f eventually so we don't have to
        // set it in finally block.
        if (customizedProgressEnabled) {
          setProgressInMapper(customizedProgresser.getCustomizedProgress(), context);
        }

        // With speculative execution the commit is not done immediately by the attempt itself.
        GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy =
            isSpeculativeEnabled ? GobblinMultiTaskAttempt.CommitPolicy.CUSTOMIZED
                : GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE;

        SharedResourcesBroker<GobblinScopeTypes> globalBroker =
            SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
                ConfigFactory.parseProperties(this.jobState.getProperties()),
                GobblinScopeTypes.GLOBAL.defaultScopeInstance());
        SharedResourcesBroker<GobblinScopeTypes> jobBroker =
            globalBroker.newSubscopedBuilder(new JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId()))
                .build();

        // Actually run the list of WorkUnits
        gobblinMultiTaskAttempt =
            GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(), context.getTaskAttemptID().toString(),
                this.jobState, this.workUnits, this.taskStateTracker, this.taskExecutor, this.taskStateStore,
                multiTaskAttemptCommitPolicy, jobBroker, troubleshooter.getIssueRepository(), (gmta) -> {
                  try {
                    return this.fs.exists(interruptPath);
                  } catch (IOException ioe) {
                    // Best effort: a failed check is treated as "not interrupted", but the failure is
                    // logged so a broken file system is distinguishable from a missing interrupt file.
                    LOG.warn(String.format("Failed to check interrupt path %s, assuming the job has not been interrupted.",
                        interruptPath), ioe);
                    return false;
                  }
                });

        if (this.isSpeculativeEnabled) {
          LOG.info("will not commit in task attempt");
          // Hand the attempt over to the output committer, keyed by the task attempt id.
          GobblinOutputCommitter gobblinOutputCommitter = (GobblinOutputCommitter) context.getOutputCommitter();
          gobblinOutputCommitter.getAttemptIdToMultiTaskAttempt()
              .put(context.getTaskAttemptID().toString(), gobblinMultiTaskAttempt);
        }
      } finally {
        // Troubleshooter reporting is best effort: a failure here must not prevent the clean-up below.
        try {
          troubleshooter.refineIssues();
          troubleshooter.logIssueSummary();
          troubleshooter.stop();
        } catch (Exception e) {
          LOG.error("Failed to report issues from automatic troubleshooter", e);
        }

        // Clean-up expressed as a CommitStep so it can either run right away or be attached to the attempt.
        CommitStep cleanUpCommitStep = new CommitStep() {

          /** Reports the step as completed once the service manager is no longer healthy. */
          @Override
          public boolean isCompleted() throws IOException {
            return !serviceManager.isHealthy();
          }

          /** Stops the service manager (waiting at most 5 seconds) and then stops and removes the job metrics. */
          @Override
          public void execute() throws IOException {
            LOG.info("Starting the clean-up steps.");
            try {
              serviceManager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
            } catch (TimeoutException te) {
              // Not fatal: the metrics clean-up below still has to run, but leave a trace of the timeout.
              LOG.warn("Timed out waiting for the service manager to stop, continuing with the clean-up.", te);
            } finally {
              if (jobMetrics.isPresent()) {
                try {
                  jobMetrics.get().stopMetricsReporting();
                } catch (Throwable throwable) {
                  LOG.error("Failed to stop job metrics reporting.", throwable);
                } finally {
                  GobblinMetrics.remove(jobMetrics.get().getName());
                }
              }
            }
          }
        };
        // NOTE(review): an exception thrown by execute() here would replace any exception already
        // propagating out of the try block - confirm that this masking is acceptable.
        if (!this.isSpeculativeEnabled || gobblinMultiTaskAttempt == null) {
          cleanUpCommitStep.execute();
        } else {
          // The attempt is kept for a later commit, so the clean-up is attached to it rather than run now.
          LOG.info("Adding additional commit step");
          gobblinMultiTaskAttempt.addCleanupCommitStep(cleanUpCommitStep);
        }
      }
    }