in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java [433:678]
/**
 * Launches the job described by {@code this.jobContext} and drives it through preparation,
 * work unit creation, execution, commit and cleanup, notifying {@code jobListener} at each stage.
 *
 * <p>Control flow visible in this method:
 * <ol>
 *   <li>Job name/key are put into the logging {@link MDC} and removed again in the outermost
 *       {@code finally}.</li>
 *   <li>Inside a try-with-resources {@code Closer} (which owns the job context, the destination
 *       dataset handler service and the writer/converter initializers) the listener is notified of
 *       {@code JOB_PREPARE}, work units are obtained from the source, validated, pre-processed,
 *       run, and finally committed.</li>
 *   <li>Any {@link Throwable} raised in that body - including the {@link JobException} thrown when
 *       work units are missing - is caught, logged, and recorded on the job state as a failure; it
 *       is not rethrown directly.</li>
 *   <li>The inner {@code finally} reports troubleshooter issues, cleans up staging data, stores
 *       the job execution info, and then sends the completion notifications. A failed or cancelled
 *       job is reported to the caller from there by throwing a new {@link JobException}.</li>
 * </ol>
 *
 * <p>When the source produces an empty work unit iterator the job state is set to
 * {@code COMMITTED}, the method returns early, and the {@code finally} blocks emit the
 * {@code JOB_COMPLETE} and {@code JOB_SUCCEEDED} notifications.
 *
 * @param jobListener listener handed to {@code notifyListeners} for every lifecycle notification
 *                    issued by this method
 * @throws JobException if, after cleanup, the job state is {@code FAILED} or {@code CANCELLED}
 *                      (message {@code "Job <jobId> failed"}); failures from the main body surface
 *                      through this path rather than with their original exception
 */
public void launchJob(JobListener jobListener)
throws JobException {
String jobId = this.jobContext.getJobId();
final JobState jobState = this.jobContext.getJobState();
// Set only on the "no work units" early return; selects the notification path in the finally block below.
boolean isWorkUnitsEmpty = false;
try {
// Tag log output with the job name/key for the duration of the launch; removed in the outermost finally.
MDC.put(ConfigurationKeys.JOB_NAME_KEY, this.jobContext.getJobName());
MDC.put(ConfigurationKeys.JOB_KEY_KEY, this.jobContext.getJobKey());
// Timer spanning the whole launch; stopped in the inner finally after cleanup.
TimingEvent launchJobTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.FULL_JOB_EXECUTION);
try (Closer closer = Closer.create()) {
// Everything registered with the closer is closed when this try-with-resources block exits.
closer.register(this.jobContext);
notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_PREPARE, new JobListenerAction() {
@Override
public void apply(JobListener jobListener, JobContext jobContext)
throws Exception {
jobListener.onJobPrepare(jobContext);
}
});
if (this.jobContext.getSemantics() == DeliverySemantics.EXACTLY_ONCE) {
// If exactly-once is used, commit sequences of the previous run must be successfully completed
// before this run can make progress.
executeUnfinishedCommitSequences(jobState.getJobName());
}
// Register this launcher on the source's event bus when the source exposes one
// (the SourceDecorator bus may be null, hence the extra check).
Source<?, ?> source = this.jobContext.getSource();
if (source instanceof InfiniteSource) {
((InfiniteSource) source).getEventBus().register(this);
} else if (source instanceof SourceDecorator) {
if (((SourceDecorator<?, ?>) source).getEventBus() != null) {
((SourceDecorator<?, ?>) source).getEventBus().register(this);
}
}
TimingEvent workUnitsCreationTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
// Stream-capable sources supply the stream themselves; otherwise the work unit list is wrapped in a basic stream.
WorkUnitStream workUnitStream;
if (source instanceof WorkUnitStreamSource) {
workUnitStream = ((WorkUnitStreamSource) source).getWorkunitStream(jobState);
} else {
workUnitStream = new BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();
}
workUnitsCreationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.WORK_UNITS_CREATION));
this.gobblinJobMetricsReporter.reportWorkUnitCreationTimerMetrics(workUnitsCreationTimer, jobState);
// The absence means there is something wrong getting the work units
if (workUnitStream == null || workUnitStream.getWorkUnits() == null) {
this.eventSubmitter.submit(JobEvent.WORK_UNITS_MISSING);
jobState.setState(JobState.RunningState.FAILED);
String errMsg = "Failed to get work units for job " + jobId;
this.jobContext.getJobState().setJobFailureMessage(errMsg);
this.jobContext.getJobState().setProp(NUM_WORKUNITS, 0);
this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(0, jobState);
// This exception is caught by the catch (Throwable) below; callers see the
// "Job %s failed" JobException thrown from the finally block instead.
throw new JobException(errMsg);
}
// No work unit to run
if (!workUnitStream.getWorkUnits().hasNext()) {
this.eventSubmitter.submit(JobEvent.WORK_UNITS_EMPTY);
LOG.warn("No work units have been created for job " + jobId);
jobState.setState(JobState.RunningState.COMMITTED);
isWorkUnitsEmpty = true;
this.jobContext.getJobState().setProp(NUM_WORKUNITS, 0);
this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(0, jobState);
// Early exit: the finally blocks still run and send the completion notifications.
return;
}
// calculation of total bytes to copy in a job used to track a job's copy progress
if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, ConfigurationKeys.DEFAULT_REPORT_JOB_PROGRESS)) {
LOG.info("Report job progress config is turned on");
// Sizes are only summed when the stream reports it is safe to materialize.
if (workUnitStream.isSafeToMaterialize()) {
long totalSizeInBytes = sumWorkUnitsSizes(workUnitStream);
this.jobContext.getJobState().setProp(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE, totalSizeInBytes);
} else {
LOG.warn("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "
+ "progress cannot be reported for infinite work unit streams. Turn off property "
+ ConfigurationKeys.REPORT_JOB_PROGRESS);
}
}
// Perform work needed before writing is done
this.canCleanUpStagingData = this.canCleanStagingData(this.jobContext.getJobState());
this.destDatasetHandlerService = new DestinationDatasetHandlerService(jobState, canCleanUpStagingData, this.eventSubmitter);
closer.register(this.destDatasetHandlerService);
// The handlers return the stream to use from here on.
workUnitStream = this.destDatasetHandlerService.executeHandlers(workUnitStream);
//Initialize writer and converter(s)
closer.register(WriterInitializerFactory.newInstace(jobState, workUnitStream)).initialize();
closer.register(ConverterInitializerFactory.newInstance(jobState, workUnitStream)).initialize();
TimingEvent stagingDataCleanTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_STAGING_DATA_CLEAN);
// Cleanup left-over staging data possibly from the previous run. This is particularly
// important if the current batch of WorkUnits include failed WorkUnits from the previous
// run which may still have left-over staging data not cleaned up yet.
cleanLeftoverStagingData(workUnitStream, jobState);
stagingDataCleanTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.MR_STAGING_DATA_CLEAN));
// From this point the job counts as started; the start time is also the base for the duration computed below.
long startTime = System.currentTimeMillis();
jobState.setStartTime(startTime);
jobState.setState(JobState.RunningState.RUNNING);
try {
LOG.info("Starting job " + jobId);
notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_START, new JobListenerAction() {
@Override
public void apply(JobListener jobListener, JobContext jobContext)
throws Exception {
jobListener.onJobStart(jobContext);
}
});
TimingEvent workUnitsPreparationTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
workUnitStream = processWorkUnitStream(workUnitStream, jobState);
// If it is a streaming source, workunits cannot be counted
this.jobContext.getJobState().setProp(NUM_WORKUNITS,
workUnitStream.isSafeToMaterialize() ? workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(this.jobContext.getJobState().getPropAsInt(NUM_WORKUNITS), jobState);
// dump the work unit if tracking logs are enabled (*AFTER* any materialization done for counting)
workUnitStream = addWorkUnitTrackingPerConfig(workUnitStream, jobState, LOG);
workUnitsPreparationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.WORK_UNITS_PREPARATION));
// Write job execution info to the job history store before the job starts to run
this.jobContext.storeJobExecutionInfo();
TimingEvent jobRunTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_RUN);
// Start the job and wait for it to finish
runWorkUnitStream(workUnitStream);
jobRunTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,EventName.JOB_RUN));
// Submit an event named after the post-run state: "JOB_" + state is converted from
// UPPER_UNDERSCORE to UpperCamel form (e.g. JOB_COMMITTED becomes JobCommitted).
this.eventSubmitter
.submit(CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, "JOB_" + jobState.getState()));
// Check and set final job jobPropsState upon job completion
if (jobState.getState() == JobState.RunningState.CANCELLED) {
LOG.info(String.format("Job %s has been cancelled, aborting now", jobId));
// Skip the commit; the finally blocks treat CANCELLED like FAILED and throw JobException.
return;
}
TimingEvent jobCommitTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_COMMIT);
this.jobContext.finalizeJobStateBeforeCommit();
this.jobContext.commit();
postProcessJobState(jobState);
jobCommitTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext, EventName.JOB_COMMIT));
} finally {
// End time and duration are recorded whether the run/commit succeeded, failed, or was cancelled.
long endTime = System.currentTimeMillis();
jobState.setEndTime(endTime);
jobState.setDuration(endTime - jobState.getStartTime());
}
} catch (Throwable t) {
// Every failure from the body above is converted into a FAILED job state here and is not
// rethrown; the finally block below is what reports the failure to the caller.
jobState.setState(JobState.RunningState.FAILED);
String errMsg = "Failed to launch and run job " + jobId + " due to " + t.getMessage();
LOG.error(errMsg + ": " + t, t);
this.jobContext.getJobState().setJobFailureException(t);
// Increment the failure counter kept as a job property (treated as 0 when absent).
jobState.setProp(ConfigurationKeys.JOB_FAILURES_KEY,
Integer.parseInt(jobState.getProp(ConfigurationKeys.JOB_FAILURES_KEY, "0")) + 1);
} finally {
// Issue reporting is best-effort: an Exception here is logged and does not affect the job outcome.
try {
troubleshooter.refineIssues();
troubleshooter.logIssueSummary();
troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
} catch (Exception e) {
LOG.error("Failed to report issues", e);
}
// No catch on this try: the nested finally still runs the notifications if cleanup or the store call throws.
try {
TimingEvent jobCleanupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CLEANUP);
cleanupStagingData(jobState);
jobCleanupTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext, EventName.JOB_CLEANUP));
// Write job execution info to the job history store upon job termination
this.jobContext.storeJobExecutionInfo();
} finally {
launchJobTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext, EventName.FULL_JOB_EXECUTION));
// NOTE(review): this branch does not re-check the job state before sending JOB_SUCCEEDED,
// unlike the else branch below - confirm that is intended.
if (isWorkUnitsEmpty) {
//If no WorkUnits are created, first send the JobCompleteTimer event.
notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
@Override
public void apply(JobListener jobListener, JobContext jobContext)
throws Exception {
jobListener.onJobCompletion(jobContext);
}
});
//Next, send the JobSucceededTimer event.
notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
@Override
public void apply(JobListener jobListener, JobContext jobContext)
throws Exception {
// NOTE(review): the JOB_SUCCEEDED notification invokes onJobFailure on the listener, which
// looks unintended for a success path - confirm against the JobListener contract before changing.
jobListener.onJobFailure(jobContext);
}
});
} else {
for (JobState.DatasetState datasetState : this.jobContext.getDatasetStatesByUrns().values()) {
// Set the overall job state to FAILED if the job failed to process any dataset
if (datasetState.getState() == JobState.RunningState.FAILED) {
jobState.setState(JobState.RunningState.FAILED);
LOG.warn("At least one dataset state is FAILED. Setting job state to FAILED.");
break;
}
}
// JOB_COMPLETE is sent regardless of outcome, followed by either JOB_FAILED or JOB_SUCCEEDED.
notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
@Override
public void apply(JobListener jobListener, JobContext jobContext)
throws Exception {
jobListener.onJobCompletion(jobContext);
}
});
if (jobState.getState() == JobState.RunningState.FAILED || jobState.getState() == JobState.RunningState.CANCELLED) {
notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_FAILED, new JobListenerAction() {
@Override
public void apply(JobListener jobListener, JobContext jobContext)
throws Exception {
jobListener.onJobFailure(jobContext);
}
});
// This is the exception callers of launchJob observe for failed or cancelled jobs.
throw new JobException(String.format("Job %s failed", jobId));
} else {
notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
@Override
public void apply(JobListener jobListener, JobContext jobContext)
throws Exception {
// NOTE(review): as in the empty-work-units branch, the success notification calls
// onJobFailure - looks unintended; confirm against the JobListener contract.
jobListener.onJobFailure(jobContext);
}
});
}
}
}
}
} finally {
// Stop metrics reporting
if (this.jobContext.getJobMetricsOptional().isPresent()) {
JobMetrics.remove(jobState);
}
// Clear the MDC entries set at the start of the launch.
MDC.remove(ConfigurationKeys.JOB_NAME_KEY);
MDC.remove(ConfigurationKeys.JOB_KEY_KEY);
}
}