private void submitCompactionJobsAndWaitForCompletion()

in gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java [757:806]


  /**
   * Submits compaction jobs for the datasets that still need one and polls until no dataset is
   * pending or the compaction timeout elapses.
   *
   * <p>On every pass each dataset is handled according to its state, which is read exactly once
   * per dataset per pass so that the decision is based on a single consistent value even if the
   * state is updated while this loop is running:
   * <ul>
   *   <li>{@code VERIFIED} / {@code UNVERIFIED}: the dataset is pending. A job is (re)submitted
   *       through {@code runCompactionForDataset} when the dataset has no job runner or its
   *       runner's status is {@code ABORTED}; the boolean argument is {@code true} only for
   *       {@code VERIFIED}.</li>
   *   <li>{@code GIVEN_UP}: when {@code shouldPublishDataIfCannotVerifyCompl} is set the dataset
   *       is pending; a job is (re)submitted as above (with {@code true}), or the existing runner
   *       is told to {@code proceed()}. Otherwise any existing runner is aborted and the dataset
   *       is not considered pending.</li>
   *   <li>Any other state: nothing is done and the dataset is not considered pending.</li>
   * </ul>
   *
   * <p>The method returns as soon as a pass finds no pending dataset. If datasets are still
   * pending once the elapsed time reaches {@code compactionTimeoutMinutes}, every job runner is
   * aborted and the method returns.
   *
   * @throws RuntimeException if the calling thread is interrupted while waiting between passes;
   *         the thread's interrupt flag is restored before throwing
   */
  private void submitCompactionJobsAndWaitForCompletion() {
    LOG.info("Submitting compaction jobs. Number of datasets: " + this.datasets.size());

    while (true) {
      boolean allDatasetsCompleted = true;

      for (Dataset dataset : this.datasets) {
        MRCompactorJobRunner jobRunner = MRCompactor.this.jobRunnables.get(dataset);

        // Dispatch on a single read of the state. Comparing dataset.state() several times could
        // observe different values within one pass and let a pending dataset match no branch.
        switch (dataset.state()) {
          case VERIFIED:
            allDatasetsCompleted = false;
            // Run compaction for a dataset, if it is not already running or completed
            if (jobRunner == null || jobRunner.status() == ABORTED) {
              runCompactionForDataset(dataset, true);
            }
            break;
          case UNVERIFIED:
            allDatasetsCompleted = false;
            // Run compaction for a dataset, if it is not already running or completed
            if (jobRunner == null || jobRunner.status() == ABORTED) {
              runCompactionForDataset(dataset, false);
            }
            break;
          case GIVEN_UP:
            if (this.shouldPublishDataIfCannotVerifyCompl) {
              allDatasetsCompleted = false;
              if (jobRunner == null || jobRunner.status() == ABORTED) {
                runCompactionForDataset(dataset, true);
              } else {
                jobRunner.proceed();
              }
            } else if (jobRunner != null) {
              jobRunner.abort();
            }
            break;
          default:
            // Any other state needs no further action from this loop.
            break;
        }
      }

      if (allDatasetsCompleted) {
        // Nothing is pending: return right away instead of sleeping one more interval or
        // reporting a timeout for work that has already finished.
        return;
      }

      if (this.stopwatch.elapsed(TimeUnit.MINUTES) >= this.compactionTimeoutMinutes) {
        // Compaction timed out. Killing all compaction jobs running
        LOG.error("Compaction timed-out. Killing all running jobs");
        for (MRCompactorJobRunner runner : MRCompactor.this.jobRunnables.values()) {
          runner.abort();
        }
        return;
      }

      // Sleep for a few seconds before another round
      try {
        Thread.sleep(TimeUnit.SECONDS.toMillis(COMPACTION_JOB_WAIT_INTERVAL_SECONDS));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while waiting", e);
      }
    }
  }