in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java [837:932]
/**
 * Drives this mapper end to end: sets it up, feeds every input record to {@code map}, runs the
 * collected work units as a single {@link GobblinMultiTaskAttempt}, and finally reports
 * troubleshooter issues and performs (or defers) clean-up.
 *
 * <p>If the interrupt path named by {@code GOBBLIN_JOB_INTERRUPT_PATH_KEY} already exists when the
 * mapper starts, the method logs that fact and returns without running any work unit. The same
 * path is polled through the interruption predicate handed to {@code runWorkUnits}.
 *
 * <p>When speculative execution is enabled the attempt is registered with the
 * {@link GobblinOutputCommitter} instead of being committed here, and the clean-up is attached to
 * the attempt as an additional commit step rather than executed immediately.
 *
 * @param context the task context supplying the configuration, the input records, the task attempt
 *     id and the output committer
 * @throws IOException if one of the underlying file-system, map or clean-up calls fails
 * @throws InterruptedException propagated from the underlying calls
 */
public void run(Context context) throws IOException, InterruptedException {
  this.setup(context);

  Path interruptPath = new Path(context.getConfiguration().get(GOBBLIN_JOB_INTERRUPT_PATH_KEY));
  if (this.fs.exists(interruptPath)) {
    LOG.info(String.format("Found interrupt path %s indicating the driver has interrupted the job, aborting mapper.", interruptPath));
    // NOTE(review): this returns before the try/finally below, so none of the clean-up there runs.
    // Confirm that setup() leaves nothing behind that needs to be stopped on this path.
    return;
  }

  // Stays null if anything fails before runWorkUnits returns; the finally block relies on that.
  GobblinMultiTaskAttempt gobblinMultiTaskAttempt = null;
  try {
    // De-serialize and collect the list of WorkUnits to run
    while (context.nextKeyValue()) {
      this.map(context.getCurrentKey(), context.getCurrentValue(), context);
    }

    // org.apache.hadoop.util.Progress.complete will set the progress to 1.0f eventually so we don't have to
    // set it in finally block.
    if (customizedProgressEnabled) {
      setProgressInMapper(customizedProgresser.getCustomizedProgress(), context);
    }

    // With speculative execution the commit is customized (see the registration below);
    // otherwise the attempt commits immediately.
    GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy =
        isSpeculativeEnabled ? GobblinMultiTaskAttempt.CommitPolicy.CUSTOMIZED
            : GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE;

    // Build a job-scoped broker underneath a fresh top-level broker configured from the job properties.
    SharedResourcesBroker<GobblinScopeTypes> globalBroker =
        SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
            ConfigFactory.parseProperties(this.jobState.getProperties()),
            GobblinScopeTypes.GLOBAL.defaultScopeInstance());
    SharedResourcesBroker<GobblinScopeTypes> jobBroker =
        globalBroker.newSubscopedBuilder(new JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId()))
            .build();

    // Actually run the list of WorkUnits
    gobblinMultiTaskAttempt =
        GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(), context.getTaskAttemptID().toString(),
            this.jobState, this.workUnits, this.taskStateTracker, this.taskExecutor, this.taskStateStore,
            multiTaskAttemptCommitPolicy, jobBroker, troubleshooter.getIssueRepository(), (gmta) -> {
              try {
                return this.fs.exists(interruptPath);
              } catch (IOException ioe) {
                // Best-effort check: a failed lookup is treated as "not interrupted", but it is
                // logged so that a persistently unreadable interrupt path does not go unnoticed.
                LOG.warn(String.format(
                    "Failed to check interrupt path %s, assuming the job has not been interrupted.", interruptPath),
                    ioe);
                return false;
              }
            });

    if (this.isSpeculativeEnabled) {
      LOG.info("will not commit in task attempt");
      // Hand the attempt to the output committer, keyed by the task attempt id.
      GobblinOutputCommitter gobblinOutputCommitter = (GobblinOutputCommitter) context.getOutputCommitter();
      gobblinOutputCommitter.getAttemptIdToMultiTaskAttempt()
          .put(context.getTaskAttemptID().toString(), gobblinMultiTaskAttempt);
    }
  } finally {
    // Issue reporting is best-effort: a failure here is logged and must not prevent the clean-up below.
    try {
      troubleshooter.refineIssues();
      troubleshooter.logIssueSummary();
      troubleshooter.stop();
    } catch (Exception e) {
      LOG.error("Failed to report issues from automatic troubleshooter", e);
    }

    // Clean-up is packaged as a CommitStep so that it can either run right here or be attached
    // to the multi task attempt, depending on the branch at the end of this block.
    CommitStep cleanUpCommitStep = new CommitStep() {

      /** Reports the step as completed once the service manager is no longer healthy. */
      @Override
      public boolean isCompleted() throws IOException {
        return !serviceManager.isHealthy();
      }

      /** Stops the service manager, then stops metrics reporting and unregisters the job metrics. */
      @Override
      public void execute() throws IOException {
        LOG.info("Starting the clean-up steps.");
        try {
          serviceManager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
        } catch (TimeoutException te) {
          // Best-effort shutdown: do not fail the mapper, but record that the services did not
          // stop within the allotted time.
          LOG.warn("Timed out after 5 seconds waiting for the service manager to stop, continuing with clean-up.");
        } finally {
          if (jobMetrics.isPresent()) {
            try {
              jobMetrics.get().stopMetricsReporting();
            } catch (Throwable throwable) {
              LOG.error("Failed to stop job metrics reporting.", throwable);
            } finally {
              // Always unregister the metrics, even if stopping the reporting failed.
              GobblinMetrics.remove(jobMetrics.get().getName());
            }
          }
        }
      }
    };

    if (!this.isSpeculativeEnabled || gobblinMultiTaskAttempt == null) {
      // No speculative execution, or no attempt was created to attach the step to: clean up now.
      cleanUpCommitStep.execute();
    } else {
      // Speculative execution: the attempt was not committed here, so the clean-up is attached to it.
      LOG.info("Adding additional commit step");
      gobblinMultiTaskAttempt.addCleanupCommitStep(cleanUpCommitStep);
    }
  }
}