in gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java [262:367]
/**
 * Launches, or skips, a distributed planning job for {@code this.jobUri} through a
 * {@link GobblinHelixDistributeJobExecutionLauncher} built from the configured builder class.
 *
 * <p>A per-job-URI lock guards the check-then-launch sequence. While it is held:
 * <ul>
 *   <li>the previously recorded planning job id is read from {@code jobsMapping};</li>
 *   <li>if {@code canRun} rejects that id, the run is skipped and counted in
 *       {@code skippedPlanningJobs};</li>
 *   <li>otherwise a new planning id is generated and recorded in {@code jobsMapping}, the job is
 *       launched, and the method waits (up to the configured submission timeout) for Helix to
 *       show the job as initialized. This makes the new job visible to concurrent runs for the
 *       same URI before the lock is released.</li>
 * </ul>
 *
 * <p>After the lock is released, the job spec is deleted and the method blocks on the
 * launcher's monitor. In non-blocking mode this only guarantees the planning job was submitted.
 * Otherwise it waits for completion and records the planning job's duration.
 *
 * @throws JobException if launching or waiting for the planning job fails, or if the launcher
 *     cannot be closed after an otherwise successful run
 */
private void runJobExecutionLauncher() throws JobException {
  long startTime = 0;
  String newPlanningId;
  // The failure being propagated, if any, so that a close() failure cannot mask it.
  JobException failure = null;
  Closer closer = Closer.create();
  try {
    HelixManager planningJobHelixManager = this.taskDriverHelixManager.orElse(this.jobHelixManager);
    String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER,
        GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
    boolean nonblocking = false;
    Lock jobLock = locks.get(this.jobUri);

    planningJobHelixManager.connect();
    // start of critical section to check if a job with same job name is running
    jobLock.lock();
    try {
      // Check if any existing planning job is running. This must be read while holding the lock:
      // reading it earlier lets two concurrent runs for the same URI both observe a stale value
      // and both launch a planning job.
      Optional<String> planningJobIdFromStore = jobsMapping.getPlanningJobId(this.jobUri);
      if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobHelixManager)) {
        TimingEvent timer = new TimingEvent(eventSubmitter, TimingEvent.JOB_SKIPPED_TIME);
        HashMap<String, String> metadata = new HashMap<>(Tag.toMap(Tag.tagValuesToString(
            HelixUtils.initBaseEventTags(jobProps, Lists.newArrayList()))));
        timer.stop(metadata);
        planningJobLauncherMetrics.skippedPlanningJobs.mark();
        return;
      }
      if (planningJobIdFromStore.isPresent()) {
        log.info("Previous planning job {} for {} does not block a new run. Launching a new planning job.",
            planningJobIdFromStore.get(), this.jobUri);
      } else {
        log.info("Planning job for {} does not exist. First time run.", this.jobUri);
      }
      GobblinHelixDistributeJobExecutionLauncher.Builder builder = GobblinConstructorUtils.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(
          new ClassAliasResolver(GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr));
      // Make a separate copy because we could update some of attributes in job properties (like adding planning id).
      Properties jobPlanningProps = new Properties();
      jobPlanningProps.putAll(this.jobProps);
      // Inject planning id and start time
      newPlanningId = HelixJobsMapping.createPlanningJobId(jobPlanningProps);
      jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, newPlanningId);
      jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME, String.valueOf(System.currentTimeMillis()));
      builder.setSysProps(this.sysProps);
      builder.setJobPlanningProps(jobPlanningProps);
      builder.setPlanningJobHelixManager(planningJobHelixManager);
      builder.setAppWorkDir(this.appWorkDir);
      builder.setJobsMapping(this.jobsMapping);
      builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics);
      builder.setHelixMetrics(this.helixMetrics);
      // if the distributed job launcher should wait for planning job completion
      Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps)
          .withFallback(ConfigUtils.propertiesToConfig(sysProps));
      nonblocking = ConfigUtils
          .getBoolean(combined, GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
              GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);
      log.info("Planning job {} started.", newPlanningId);
      GobblinHelixDistributeJobExecutionLauncher launcher = builder.build();
      closer.register(launcher);
      this.jobsMapping.setPlanningJobId(this.jobUri, newPlanningId);
      startTime = System.currentTimeMillis();
      this.currentJobMonitor = launcher.launchJob(null);
      // make sure the planning job is initialized (or visible) to other parallel running threads,
      // so that the same critical section check (querying Helix for job completeness)
      // can be applied.
      Duration submissionTimeout = Duration.ofSeconds(PropertiesUtils
          .getPropAsLong(sysProps, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS,
              GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS));
      HelixUtils.waitJobInitialization(planningJobHelixManager, newPlanningId, newPlanningId, submissionTimeout);
    } finally {
      try {
        planningJobHelixManager.disconnect();
      } finally {
        // end of the critical section to check if a job with same job name is running.
        // Released even if disconnect() throws, so later runs of this job are not blocked forever.
        jobLock.unlock();
      }
    }
    // we can remove the job spec from the catalog because Helix will drive this job to the end.
    this.deleteJobSpec();
    // If we are using non-blocking mode, this get() only guarantees the planning job is submitted.
    // It doesn't guarantee the job will finish because internally we won't wait for Helix completion.
    this.currentJobMonitor.get();
    this.currentJobMonitor = null;
    if (nonblocking) {
      log.info("Planning job {} submitted successfully.", newPlanningId);
    } else {
      log.info("Planning job {} finished.", newPlanningId);
      this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(startTime);
    }
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Restore the interrupt status (e.g. a blocking wait above was interrupted) so that
      // callers can still observe the interruption after it is wrapped in a JobException.
      Thread.currentThread().interrupt();
    }
    if (startTime != 0) {
      this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime);
    }
    log.error("Failed to run planning job for {}", this.jobUri, e);
    failure = new JobException("Failed to run planning job for " + this.jobUri, e);
    throw failure;
  } finally {
    try {
      closer.close();
    } catch (IOException e) {
      if (failure != null) {
        // Keep the original failure as the primary cause; record the close failure alongside it.
        failure.addSuppressed(e);
      } else {
        throw new JobException("Cannot properly close planning job for " + this.jobUri, e);
      }
    }
  }
}