private void runJobExecutionLauncher()

in gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java [262:367]


  /**
   * Launches a planning job for {@code jobUri} through a
   * {@link GobblinHelixDistributeJobExecutionLauncher} and waits on the monitor it returns.
   *
   * <p>The lookup of a previously recorded planning job id, the {@code canRun} check, the launch and
   * the wait for Helix job initialization are all performed while holding the lock that {@code locks}
   * hands out for {@code jobUri}. If a planning job id is recorded and {@code canRun} rejects it, a
   * {@code JOB_SKIPPED_TIME} timing event is emitted, the skipped-planning-jobs meter is marked and the
   * method returns without launching anything.
   *
   * <p>The Helix manager is connected inside the critical section and is always disconnected before
   * the lock is released; the lock is released even if disconnecting fails.
   *
   * @throws JobException if any step of launching or waiting for the planning job fails, or if the
   *     launcher registered with the {@code Closer} cannot be closed
   */
  private void runJobExecutionLauncher() throws JobException {
    long startTime = 0;
    String newPlanningId;
    Closer closer = Closer.create();
    try {
      HelixManager planningJobHelixManager = this.taskDriverHelixManager.orElse(this.jobHelixManager);

      String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER,
          GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());

      boolean nonblocking = false;
      // start of critical section to check if a job with same job name is running
      Lock jobLock = locks.get(this.jobUri);
      jobLock.lock();

      try {
        // Connect only once the lock is held, and pair the connect with its own finally so that the
        // manager is disconnected on every exit path that follows a successful connect.
        planningJobHelixManager.connect();
        try {
          // Read the recorded planning job id inside the critical section: reading it before taking the
          // lock would let two callers both observe "absent" and both launch a planning job.
          Optional<String> planningJobIdFromStore = jobsMapping.getPlanningJobId(this.jobUri);

          if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobHelixManager)) {
            TimingEvent timer = new TimingEvent(eventSubmitter, TimingEvent.JOB_SKIPPED_TIME);
            HashMap<String, String> metadata = new HashMap<>(Tag.toMap(Tag.tagValuesToString(
                HelixUtils.initBaseEventTags(jobProps, Lists.newArrayList()))));
            timer.stop(metadata);
            planningJobLauncherMetrics.skippedPlanningJobs.mark();
            return;
          }

          if (planningJobIdFromStore.isPresent()) {
            // A planning job id is recorded but canRun() allowed the launch, so this is not a first run.
            log.info("Planning job {} recorded for {} does not block a new run.",
                planningJobIdFromStore.get(), this.jobUri);
          } else {
            log.info("Planning job for {} does not exist. First time run.", this.jobUri);
          }

          GobblinHelixDistributeJobExecutionLauncher.Builder builder = GobblinConstructorUtils.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(
              new ClassAliasResolver(GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr));

          // Make a separate copy because we could update some of attributes in job properties (like adding planning id).
          Properties jobPlanningProps = new Properties();
          jobPlanningProps.putAll(this.jobProps);

          // Inject planning id and start time
          newPlanningId = HelixJobsMapping.createPlanningJobId(jobPlanningProps);
          jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, newPlanningId);
          jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME, String.valueOf(System.currentTimeMillis()));

          builder.setSysProps(this.sysProps);
          builder.setJobPlanningProps(jobPlanningProps);
          builder.setPlanningJobHelixManager(planningJobHelixManager);
          builder.setAppWorkDir(this.appWorkDir);
          builder.setJobsMapping(this.jobsMapping);
          builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics);
          builder.setHelixMetrics(this.helixMetrics);

          // Decide whether the distributed job launcher should wait for planning job completion;
          // job-level properties take precedence over system properties.
          Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps)
              .withFallback(ConfigUtils.propertiesToConfig(sysProps));

          nonblocking = ConfigUtils
              .getBoolean(combined, GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
                  GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);

          log.info("Planning job {} started.", newPlanningId);
          GobblinHelixDistributeJobExecutionLauncher launcher = builder.build();
          closer.register(launcher);
          this.jobsMapping.setPlanningJobId(this.jobUri, newPlanningId);
          startTime = System.currentTimeMillis();
          this.currentJobMonitor = launcher.launchJob(null);

          // make sure the planning job is initialized (or visible) to other parallel running threads,
          // so that the same critical section check (querying Helix for job completeness)
          // can be applied.
          Duration submissionTimeout = Duration.ofSeconds(PropertiesUtils
              .getPropAsLong(sysProps, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS,
                  GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS));
          HelixUtils.waitJobInitialization(planningJobHelixManager, newPlanningId, newPlanningId, submissionTimeout);
        } finally {
          planningJobHelixManager.disconnect();
        }
      } finally {
        // end of the critical section to check if a job with same job name is running.
        // Released in an outer finally so a failing disconnect() cannot leave the lock held.
        jobLock.unlock();
      }

      // we can remove the job spec from the catalog because Helix will drive this job to the end.
      this.deleteJobSpec();

      // If we are using non-blocking mode, this get() only guarantees the planning job is submitted.
      // It doesn't guarantee the job will finish because internally we won't wait for Helix completion.
      this.currentJobMonitor.get();
      this.currentJobMonitor = null;
      if (nonblocking) {
        log.info("Planning job {} submitted successfully.", newPlanningId);
      } else {
        log.info("Planning job {} finished.", newPlanningId);
        this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(startTime);
      }
    } catch (Exception e) {
      // startTime is only non-zero once the launch was actually attempted, so failures that happen
      // before that point are not counted as failed planning jobs.
      if (startTime != 0) {
        this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime);
      }
      log.error("Failed to run planning job for {}", this.jobUri, e);
      throw new JobException("Failed to run planning job for " + this.jobUri, e);
    } finally {
      try {
        closer.close();
      } catch (IOException e) {
        throw new JobException("Cannot properly close planning job for " + this.jobUri, e);
      }
    }
  }