in gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java [757:806]
private void submitCompactionJobsAndWaitForCompletion() {
LOG.info("Submitting compaction jobs. Number of datasets: " + this.datasets.size());
boolean allDatasetsCompleted = false;
while (!allDatasetsCompleted) {
allDatasetsCompleted = true;
for (Dataset dataset : this.datasets) {
MRCompactorJobRunner jobRunner = MRCompactor.this.jobRunnables.get(dataset);
if (dataset.state() == VERIFIED || dataset.state() == UNVERIFIED) {
allDatasetsCompleted = false;
// Run compaction for a dataset, if it is not already running or completed
if (jobRunner == null || jobRunner.status() == ABORTED) {
runCompactionForDataset(dataset, dataset.state() == VERIFIED);
}
} else if (dataset.state() == GIVEN_UP) {
if (this.shouldPublishDataIfCannotVerifyCompl) {
allDatasetsCompleted = false;
if (jobRunner == null || jobRunner.status() == ABORTED) {
runCompactionForDataset(dataset, true);
} else {
jobRunner.proceed();
}
} else {
if (jobRunner != null) {
jobRunner.abort();
}
}
}
}
if (this.stopwatch.elapsed(TimeUnit.MINUTES) >= this.compactionTimeoutMinutes) {
// Compaction timed out. Killing all compaction jobs running
LOG.error("Compaction timed-out. Killing all running jobs");
for (MRCompactorJobRunner jobRunner : MRCompactor.this.jobRunnables.values()) {
jobRunner.abort();
}
break;
}
// Sleep for a few seconds before another round
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(COMPACTION_JOB_WAIT_INTERVAL_SECONDS));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting", e);
}
}
}