public void launchJob()

in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java [433:678]


  /**
   * Launches the job held by this launcher's {@code jobContext} and drives it through its lifecycle: prepare
   * notification, (for exactly-once semantics) completion of unfinished commit sequences, work unit creation,
   * destination dataset handlers, writer/converter initialization, left-over staging data cleanup, execution,
   * commit, staging cleanup, and the final completion / success / failure listener notifications.
   *
   * <p>Any {@link Throwable} raised while preparing, running or committing the job is caught, logged, recorded on
   * the job state via {@code setJobFailureException}, and counted in {@link ConfigurationKeys#JOB_FAILURES_KEY};
   * the job state is then set to {@code FAILED}. When the source produces no work units (and nothing else fails),
   * the job is marked {@code COMMITTED} and reported as succeeded.
   *
   * @param jobListener listener passed to {@code notifyListeners} for each lifecycle event
   * @throws JobException if the job ends in the {@code FAILED} or {@code CANCELLED} state (including when any
   *                      dataset state is {@code FAILED}); its cause is the throwable that made the launch fail,
   *                      if one was caught
   */
  public void launchJob(JobListener jobListener)
      throws JobException {
    String jobId = this.jobContext.getJobId();
    final JobState jobState = this.jobContext.getJobState();
    boolean isWorkUnitsEmpty = false;
    // Remembered so the JobException thrown at the end carries the real root cause.
    Throwable launchFailure = null;
    try {
      MDC.put(ConfigurationKeys.JOB_NAME_KEY, this.jobContext.getJobName());
      MDC.put(ConfigurationKeys.JOB_KEY_KEY, this.jobContext.getJobKey());
      TimingEvent launchJobTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.FULL_JOB_EXECUTION);

      try (Closer closer = Closer.create()) {
        closer.register(this.jobContext);
        notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_PREPARE, new JobListenerAction() {
          @Override
          public void apply(JobListener jobListener, JobContext jobContext)
              throws Exception {
            jobListener.onJobPrepare(jobContext);
          }
        });

        if (this.jobContext.getSemantics() == DeliverySemantics.EXACTLY_ONCE) {

          // If exactly-once is used, commit sequences of the previous run must be successfully completed
          // before this run can make progress.
          executeUnfinishedCommitSequences(jobState.getJobName());
        }

        // Subscribe this launcher to the source's event bus, if the source exposes one.
        Source<?, ?> source = this.jobContext.getSource();
        if (source instanceof InfiniteSource) {
          ((InfiniteSource) source).getEventBus().register(this);
        } else if (source instanceof SourceDecorator) {
          if (((SourceDecorator<?, ?>) source).getEventBus() != null) {
            ((SourceDecorator<?, ?>) source).getEventBus().register(this);
          }
        }
        TimingEvent workUnitsCreationTimer =
            this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
        WorkUnitStream workUnitStream;
        if (source instanceof WorkUnitStreamSource) {
          workUnitStream = ((WorkUnitStreamSource) source).getWorkunitStream(jobState);
        } else {
          workUnitStream = new BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();
        }
        workUnitsCreationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
            EventName.WORK_UNITS_CREATION));

        this.gobblinJobMetricsReporter.reportWorkUnitCreationTimerMetrics(workUnitsCreationTimer, jobState);
        // The absence means there is something wrong getting the work units
        if (workUnitStream == null || workUnitStream.getWorkUnits() == null) {
          this.eventSubmitter.submit(JobEvent.WORK_UNITS_MISSING);
          jobState.setState(JobState.RunningState.FAILED);
          String errMsg = "Failed to get work units for job " + jobId;
          this.jobContext.getJobState().setJobFailureMessage(errMsg);
          this.jobContext.getJobState().setProp(NUM_WORKUNITS, 0);
          this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(0, jobState);
          throw new JobException(errMsg);
        }

        // No work unit to run
        if (!workUnitStream.getWorkUnits().hasNext()) {
          this.eventSubmitter.submit(JobEvent.WORK_UNITS_EMPTY);
          LOG.warn("No work units have been created for job " + jobId);
          jobState.setState(JobState.RunningState.COMMITTED);
          isWorkUnitsEmpty = true;
          this.jobContext.getJobState().setProp(NUM_WORKUNITS, 0);
          this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(0, jobState);
          return;
        }

        // calculation of total bytes to copy in a job used to track a job's copy progress
        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, ConfigurationKeys.DEFAULT_REPORT_JOB_PROGRESS)) {
          LOG.info("Report job progress config is turned on");
          if (workUnitStream.isSafeToMaterialize()) {
            long totalSizeInBytes = sumWorkUnitsSizes(workUnitStream);
            this.jobContext.getJobState().setProp(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE, totalSizeInBytes);
          } else {
            LOG.warn("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "
                + "progress cannot be reported for infinite work unit streams. Turn off property "
                + ConfigurationKeys.REPORT_JOB_PROGRESS);
          }
        }

        // Perform work needed before writing is done
        this.canCleanUpStagingData = this.canCleanStagingData(this.jobContext.getJobState());
        this.destDatasetHandlerService = new DestinationDatasetHandlerService(jobState, canCleanUpStagingData, this.eventSubmitter);
        closer.register(this.destDatasetHandlerService);
        workUnitStream = this.destDatasetHandlerService.executeHandlers(workUnitStream);

        //Initialize writer and converter(s)
        closer.register(WriterInitializerFactory.newInstace(jobState, workUnitStream)).initialize();
        closer.register(ConverterInitializerFactory.newInstance(jobState, workUnitStream)).initialize();

        TimingEvent stagingDataCleanTimer =
            this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_STAGING_DATA_CLEAN);
        // Cleanup left-over staging data possibly from the previous run. This is particularly
        // important if the current batch of WorkUnits include failed WorkUnits from the previous
        // run which may still have left-over staging data not cleaned up yet.
        cleanLeftoverStagingData(workUnitStream, jobState);
        stagingDataCleanTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
            EventName.MR_STAGING_DATA_CLEAN));

        long startTime = System.currentTimeMillis();
        jobState.setStartTime(startTime);
        jobState.setState(JobState.RunningState.RUNNING);

        try {
          LOG.info("Starting job " + jobId);

          notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_START, new JobListenerAction() {
            @Override
            public void apply(JobListener jobListener, JobContext jobContext)
                throws Exception {
              jobListener.onJobStart(jobContext);
            }
          });

          TimingEvent workUnitsPreparationTimer =
              this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
          workUnitStream = processWorkUnitStream(workUnitStream, jobState);

          // If it is a streaming source, workunits cannot be counted
          this.jobContext.getJobState().setProp(NUM_WORKUNITS,
              workUnitStream.isSafeToMaterialize() ? workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
          this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(this.jobContext.getJobState().getPropAsInt(NUM_WORKUNITS), jobState);
          // dump the work unit if tracking logs are enabled (*AFTER* any materialization done for counting)
          workUnitStream = addWorkUnitTrackingPerConfig(workUnitStream, jobState, LOG);

          workUnitsPreparationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
              EventName.WORK_UNITS_PREPARATION));

          // Write job execution info to the job history store before the job starts to run
          this.jobContext.storeJobExecutionInfo();

          TimingEvent jobRunTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_RUN);
          // Start the job and wait for it to finish
          runWorkUnitStream(workUnitStream);
          jobRunTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,EventName.JOB_RUN));

          this.eventSubmitter
              .submit(CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, "JOB_" + jobState.getState()));

          // Check and set final job jobPropsState upon job completion
          if (jobState.getState() == JobState.RunningState.CANCELLED) {
            LOG.info(String.format("Job %s has been cancelled, aborting now", jobId));
            return;
          }

          TimingEvent jobCommitTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_COMMIT);
          this.jobContext.finalizeJobStateBeforeCommit();
          this.jobContext.commit();
          postProcessJobState(jobState);
          jobCommitTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext, EventName.JOB_COMMIT));
        } finally {
          // Always record end time/duration once the job has been started, even on failure or cancellation.
          long endTime = System.currentTimeMillis();
          jobState.setEndTime(endTime);
          jobState.setDuration(endTime - jobState.getStartTime());
        }
      } catch (Throwable t) {
        // Also catches failures from closing the resources registered with the Closer above.
        launchFailure = t;
        jobState.setState(JobState.RunningState.FAILED);
        String errMsg = "Failed to launch and run job " + jobId + " due to " + t.getMessage();
        LOG.error(errMsg + ": " + t, t);
        this.jobContext.getJobState().setJobFailureException(t);
        jobState.setProp(ConfigurationKeys.JOB_FAILURES_KEY,
            jobState.getPropAsInt(ConfigurationKeys.JOB_FAILURES_KEY, 0) + 1);
      } finally {
        // Issue reporting is best-effort and must not prevent cleanup or listener notification.
        try {
          troubleshooter.refineIssues();
          troubleshooter.logIssueSummary();
          troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
        } catch (Exception e) {
          LOG.error("Failed to report issues", e);
        }

        try {
          TimingEvent jobCleanupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CLEANUP);
          cleanupStagingData(jobState);
          jobCleanupTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext, EventName.JOB_CLEANUP));
          // Write job execution info to the job history store upon job termination
          this.jobContext.storeJobExecutionInfo();
        } finally {
          launchJobTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext, EventName.FULL_JOB_EXECUTION));
          // The "no work units" shortcut only counts as success if nothing failed afterwards (e.g. metric
          // reporting or closing the job context); otherwise fall through to the regular failure handling.
          if (isWorkUnitsEmpty && jobState.getState() != JobState.RunningState.FAILED) {
            //If no WorkUnits are created, first send the JobCompleteTimer event.
            notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
              @Override
              public void apply(JobListener jobListener, JobContext jobContext)
                  throws Exception {
                jobListener.onJobCompletion(jobContext);
              }
            });
            //Next, send the JobSucceededTimer event.
            notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
              @Override
              public void apply(JobListener jobListener, JobContext jobContext)
                  throws Exception {
                // Intentionally does NOT call onJobFailure(): the job succeeded, and listeners were already
                // told about completion via onJobCompletion() above.
                // NOTE(review): invoke a success-specific callback here if JobListener provides one.
              }
            });
          } else {
            for (JobState.DatasetState datasetState : this.jobContext.getDatasetStatesByUrns().values()) {
              // Set the overall job state to FAILED if the job failed to process any dataset
              if (datasetState.getState() == JobState.RunningState.FAILED) {
                jobState.setState(JobState.RunningState.FAILED);
                LOG.warn("At least one dataset state is FAILED. Setting job state to FAILED.");
                break;
              }
            }

            notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
              @Override
              public void apply(JobListener jobListener, JobContext jobContext)
                  throws Exception {
                jobListener.onJobCompletion(jobContext);
              }
            });

            if (jobState.getState() == JobState.RunningState.FAILED || jobState.getState() == JobState.RunningState.CANCELLED) {
              notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_FAILED, new JobListenerAction() {
                @Override
                public void apply(JobListener jobListener, JobContext jobContext)
                    throws Exception {
                  jobListener.onJobFailure(jobContext);
                }
              });
              // Attach the caught throwable (null if the failure came only from dataset/cancel state).
              throw new JobException(String.format("Job %s failed", jobId), launchFailure);
            } else {
              notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
                @Override
                public void apply(JobListener jobListener, JobContext jobContext)
                    throws Exception {
                  // Intentionally does NOT call onJobFailure(): the job succeeded, and listeners were already
                  // told about completion via onJobCompletion() above.
                  // NOTE(review): invoke a success-specific callback here if JobListener provides one.
                }
              });
            }
          }
        }
      }
    } finally {
      // Stop metrics reporting
      if (this.jobContext.getJobMetricsOptional().isPresent()) {
        JobMetrics.remove(jobState);
      }
      MDC.remove(ConfigurationKeys.JOB_NAME_KEY);
      MDC.remove(ConfigurationKeys.JOB_KEY_KEY);
    }
  }