in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java [743:834]
/**
 * Prepares this mapper task to run Gobblin work units.
 *
 * <p>Setup proceeds in this order:
 * <ol>
 *   <li>Create and start the automatic troubleshooter from the job properties carried in the
 *       Hadoop configuration.</li>
 *   <li>Read the customized-progress and speculative-execution settings, then instantiate the
 *       configured {@code CustomizedProgresser.Factory} (default
 *       {@code DEFAULT_CUSTOMIZED_PROGRESSER_FACTORY_CLASS}).</li>
 *   <li>Open the task state store under the job output path. Deserialize {@code this.jobState} from
 *       the job state file named by {@code ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME}.</li>
 *   <li>Generate dynamic config from the job state. Copy every entry into {@code this.jobState},
 *       the Hadoop configuration, and the configuration-derived state.</li>
 *   <li>Record the MR task type. When the attempt id splits into six {@code _}-separated tokens,
 *       also record the map/reduce task number and attempt number.</li>
 *   <li>Start the task executor and MR task state tracker. Wait up to 5 seconds for both to become
 *       healthy.</li>
 *   <li>If metrics are enabled, start metric reporting. Then set the default authenticator.</li>
 * </ol>
 *
 * @param context the Hadoop task context for this task attempt
 * @throws RuntimeException if the job state file is missing or unreadable, if the progresser
 *     factory cannot be loaded or instantiated (including failures thrown by its constructor), if
 *     the services do not become healthy within 5 seconds, or if metric/event reporting fails and
 *     such failures are configured to be fatal
 */
protected void setup(Context context) {
  // State rebuilt from the Hadoop configuration. This is distinct from this.jobState, which is
  // deserialized from the job state file below.
  final State gobblinJobState = HadoopUtils.getStateFromConf(context.getConfiguration());
  TaskAttemptID taskAttemptID = context.getTaskAttemptID();
  String taskAttemptIdString = taskAttemptID.toString();
  troubleshooter = AutomaticTroubleshooterFactory.createForJob(gobblinJobState.getProperties());
  troubleshooter.start();
  try (Closer closer = Closer.create()) {
    // Default for customizedProgressEnabled is false.
    this.customizedProgressEnabled = isCustomizedProgressReportEnabled(gobblinJobState.getProperties());
    this.isSpeculativeEnabled = isSpeculativeExecutionEnabled(gobblinJobState.getProperties());
    String factoryClassName = gobblinJobState.getProperties().getProperty(
        CUSTOMIZED_PROGRESSER_FACTORY_CLASS, DEFAULT_CUSTOMIZED_PROGRESSER_FACTORY_CLASS);
    // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance(), which
    // rethrows constructor exceptions unwrapped. Here they surface as InvocationTargetException
    // and are reported through the catch below.
    this.customizedProgresser = Class.forName(factoryClassName)
        .asSubclass(CustomizedProgresser.Factory.class)
        .getDeclaredConstructor()
        .newInstance()
        .createCustomizedProgresser(context);
    this.fs = FileSystem.get(context.getConfiguration());
    this.taskStateStore =
        new FsStateStore<>(this.fs, FileOutputFormat.getOutputPath(context).toUri().getPath(), TaskState.class);
    String jobStateFileName = context.getConfiguration().get(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME);
    Optional<URI> jobStateFileUri = getStateFileUriForJob(context.getConfiguration(), jobStateFileName);
    if (!jobStateFileUri.isPresent()) {
      throw new IOException("Job state file not found: '" + jobStateFileName + "'.");
    }
    // The stream is registered with the Closer, so it is closed when the try block exits.
    SerializationUtils.deserializeStateFromInputStream(
        closer.register(new FileInputStream(jobStateFileUri.get().getPath())), this.jobState);
  } catch (IOException | ReflectiveOperationException e) {
    throw new RuntimeException("Failed to setup the mapper task", e);
  }

  // Load dynamic configuration and add it to the job state, the Hadoop configuration,
  // and the configuration-derived state.
  Configuration configuration = context.getConfiguration();
  Config jobStateAsConfig = ConfigUtils.propertiesToConfig(this.jobState.getProperties());
  DynamicConfigGenerator dynamicConfigGenerator =
      DynamicConfigGeneratorFactory.createDynamicConfigGenerator(jobStateAsConfig);
  Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(jobStateAsConfig);
  for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) {
    String key = entry.getKey();
    String value = entry.getValue().unwrapped().toString();
    this.jobState.setProp(key, value);
    configuration.set(key, value);
    gobblinJobState.setProp(key, value);
  }

  // Add MR task related configs.
  String[] tokens = taskAttemptIdString.split("_");
  TaskType taskType = taskAttemptID.getTaskType();
  gobblinJobState.setProp(MR_TYPE_KEY, taskType.name());
  // A task attempt id should look like 'attempt_1592863931636_2371636_m_000003_4':
  // the last two tokens are the task number and the attempt number.
  if (tokens.length == 6) {
    String taskNum = tokens[tokens.length - 2];
    String attemptNum = tokens[tokens.length - 1];
    if (taskType.equals(TaskType.MAP)) {
      gobblinJobState.setProp(MAPPER_TASK_NUM_KEY, taskNum);
      gobblinJobState.setProp(MAPPER_TASK_ATTEMPT_NUM_KEY, attemptNum);
    } else if (taskType.equals(TaskType.REDUCE)) {
      gobblinJobState.setProp(REDUCER_TASK_NUM_KEY, taskNum);
      gobblinJobState.setProp(REDUCER_TASK_ATTEMPT_NUM_KEY, attemptNum);
    }
  }

  this.taskExecutor = new TaskExecutor(configuration);
  this.taskStateTracker = new MRTaskStateTracker(context);
  this.serviceManager = new ServiceManager(Lists.newArrayList(this.taskExecutor, this.taskStateTracker));
  try {
    this.serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS);
  } catch (TimeoutException te) {
    LOG.error("Timed out while waiting for the service manager to start up", te);
    throw new RuntimeException(te);
  }

  // Set up and start metrics reporting if metric reporting is enabled.
  if (Boolean.parseBoolean(configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
    this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
    try {
      this.jobMetrics.get().startMetricReportingWithFileSuffix(gobblinJobState, taskAttemptIdString);
    } catch (MultiReporterException ex) {
      // Fail the task only if metric/event reporting failure is configured to be fatal.
      boolean isMetricReportingFailureFatal = configuration.getBoolean(ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
          ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL);
      boolean isEventReportingFailureFatal = configuration.getBoolean(ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
          ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
      if (MetricReportUtils.shouldThrowException(LOG, ex, isMetricReportingFailureFatal, isEventReportingFailureFatal)) {
        throw new RuntimeException(ex);
      }
    }
  }
  AbstractJobLauncher.setDefaultAuthenticator(this.jobState.getProperties());
}