public void run()

in gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java [277:362]


  /**
   * Executes the compaction work for {@code this.dataset} and records the outcome in {@code this.status}.
   *
   * <p>A Hadoop configuration is derived from the dataset's job properties. Output compression is switched on
   * and delegation token cancellation is switched off, but only when the corresponding keys are not already set.
   *
   * <p>One of two paths is then taken:
   * <ul>
   *   <li>When {@link MRCompactor#COMPACTION_JOB_LATE_DATA_MOVEMENT_TASK} is true, the late data files that carry
   *       an applicable extension are copied to the late output path (if output is deduplicated) or to the regular
   *       output path, and the status becomes {@code COMMITTED}.</li>
   *   <li>Otherwise an MR job is configured, submitted and awaited. Its output is published and the status becomes
   *       {@code COMMITTED} when {@code shouldPublishData} agrees; if not, the status becomes {@code ABORTED} and
   *       the method returns. If the output path already exists and may not be overwritten, no job is submitted,
   *       the status becomes {@code COMMITTED} and the method returns.</li>
   * </ul>
   *
   * <p>Unless the method returned early, the source directory is renamed or the output directory is marked as
   * completed, and a record-counts event is submitted.
   *
   * <p>Every {@link Throwable} raised by the work above is rethrown through {@code Throwables.propagate}.
   */
  public void run() {
    final String compressKey = "mapreduce.output.fileoutputformat.compress";
    final String legacyCompressKey = "mapred.output.compress";
    final String cancelTokensKey = "mapreduce.job.complete.cancel.delegation.tokens";

    Configuration hadoopConf = HadoopUtils.getConfFromState(this.dataset.jobProps());

    // Default: compress mapreduce output, unless either compression key was set explicitly.
    if (hadoopConf.get(compressKey) == null && hadoopConf.get(legacyCompressKey) == null) {
      hadoopConf.setBoolean(compressKey, true);
    }

    // Default: do not cancel delegation tokens when the job completes, unless configured otherwise.
    if (hadoopConf.get(cancelTokensKey) == null) {
      hadoopConf.setBoolean(cancelTokensKey, false);
    }

    try {
      DateTime jobTimestamp = getCompactionTimestamp();
      LOG.info("MR Compaction Job Timestamp " + jobTimestamp.getMillis());

      boolean lateDataMovementOnly =
          this.dataset.jobProps().getPropAsBoolean(MRCompactor.COMPACTION_JOB_LATE_DATA_MOVEMENT_TASK, false);

      if (lateDataMovementOnly) {
        // Keep only the late files whose extension is among the applicable ones.
        List<Path> lateFiles = Lists.newArrayList();
        for (String candidate : this.dataset.jobProps().getPropAsList(MRCompactor.COMPACTION_JOB_LATE_DATA_FILES)) {
          if (!FilenameUtils.isExtension(candidate, getApplicableFileExtensions())) {
            continue;
          }
          lateFiles.add(new Path(candidate));
        }

        // Deduplicated output keeps late data in its own directory; otherwise it goes to the output path.
        final Path lateDestination;
        if (this.outputDeduplicated) {
          lateDestination = this.dataset.outputLatePath();
        } else {
          lateDestination = this.dataset.outputPath();
        }
        LOG.info(String.format("Copying %d late data files to %s", lateFiles.size(), lateDestination));

        // Only the deduplicated case creates the destination directory here, and only when it is missing.
        if (this.outputDeduplicated && !this.fs.exists(lateDestination) && !this.fs.mkdirs(lateDestination)) {
          throw new RuntimeException(
              String.format("Failed to create late data output directory: %s.", lateDestination.toString()));
        }

        this.copyDataFiles(lateDestination, lateFiles);
        if (this.outputDeduplicated) {
          dataset.checkIfNeedToRecompact(datasetHelper);
        }
        this.status = Status.COMMITTED;
      } else {
        // Existing output that may not be overwritten: nothing to do, report the dataset as committed.
        if (this.fs.exists(this.dataset.outputPath()) && !canOverwriteOutputDir()) {
          LOG.warn(String.format("Output paths %s exists. Will not compact %s.", this.dataset.outputPath(),
              this.dataset.inputPaths()));
          this.status = Status.COMMITTED;
          return;
        }

        addJars(hadoopConf);
        Job mrJob = Job.getInstance(hadoopConf);
        this.configureJob(mrJob);
        this.submitAndWait(mrJob);

        if (!shouldPublishData(jobTimestamp)) {
          LOG.info("Data not published for input folder " + this.dataset.inputPaths() + " due to incompleteness");
          this.status = Status.ABORTED;
          return;
        }

        // Filter out the invalid empty files that speculative task execution can leave behind.
        List<Path> validOutputFiles = CompactionJobConfigurator.getGoodFiles(mrJob, this.dataset.outputTmpPath(),
            this.tmpFs, ImmutableList.of("avro"));

        boolean appendToExistingOutput = this.recompactFromDestPaths && !this.recompactAllData;
        if (appendToExistingOutput) {
          // Add the new files while leaving the output directory in place.
          addGoodFilesToOutputPath(validOutputFiles);
          // The late output directory was used as the input path; remove the late data from it.
          deleteFilesByPaths(this.dataset.inputPaths());
        } else {
          moveTmpPathToOutputPath();
          if (this.recompactFromDestPaths) {
            deleteFilesByPaths(this.dataset.additionalInputPaths());
          }
        }

        submitSlaEvent(mrJob);
        LOG.info("Successfully published data for input folder " + this.dataset.inputPaths());
        this.status = Status.COMMITTED;
      }

      // Reached only when neither early return above fired.
      if (!renameSourceDir) {
        this.markOutputDirAsCompleted(jobTimestamp);
      } else {
        MRCompactor.renameSourceDirAsCompactionComplete(this.fs, this.dataset);
      }
      this.submitRecordsCountsEvent();
    } catch (Throwable failure) {
      throw Throwables.propagate(failure);
    }
  }