in gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java [277:362]
/**
 * Executes this compaction task. Depending on job configuration this either
 * (a) moves late-arriving data files into the dataset's late-data location, or
 * (b) runs a full MapReduce compaction job and publishes its output.
 * On successful completion the source dir is renamed (or the output dir marked
 * completed) and a record-counts event is emitted.
 *
 * <p>Any failure is rethrown as an unchecked exception via
 * {@link Throwables#propagate(Throwable)} (unchecked/Error rethrown as-is,
 * checked wrapped in RuntimeException), preserving the original contract.
 */
public void run() {
  Configuration conf = HadoopUtils.getConfFromState(this.dataset.jobProps());
  // Turn on mapreduce output compression by default, but respect an explicit user
  // setting under either the current or the legacy ("mapred.*") property name.
  if (conf.get("mapreduce.output.fileoutputformat.compress") == null && conf.get("mapred.output.compress") == null) {
    conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
  }
  // Disable delegation token cancellation by default — presumably so the tokens
  // remain usable after this job completes. NOTE(review): confirm intent.
  if (conf.get("mapreduce.job.complete.cancel.delegation.tokens") == null) {
    conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
  }
  try {
    DateTime compactionTimestamp = getCompactionTimestamp();
    LOG.info("MR Compaction Job Timestamp " + compactionTimestamp.getMillis());
    boolean completed;
    if (this.dataset.jobProps().getPropAsBoolean(MRCompactor.COMPACTION_JOB_LATE_DATA_MOVEMENT_TASK, false)) {
      handleLateDataMovement();
      completed = true;
    } else {
      completed = runAndPublishCompactionJob(conf, compactionTimestamp);
    }
    if (!completed) {
      // Ended early (output not overwritable, or data not publishable yet);
      // status has already been set by the helper. Skip completion marking.
      return;
    }
    if (renameSourceDir) {
      MRCompactor.renameSourceDirAsCompactionComplete(this.fs, this.dataset);
    } else {
      this.markOutputDirAsCompleted(compactionTimestamp);
    }
    this.submitRecordsCountsEvent();
  } catch (Throwable t) {
    throw Throwables.propagate(t);
  }
}

/**
 * Copies late-arriving data files (those with an applicable file extension) into the
 * dataset's late-data destination. When output is deduplicated the dedicated late path
 * is used (created if missing) and a recompaction check is triggered after the copy.
 * Sets {@code status} to COMMITTED on success.
 */
private void handleLateDataMovement() throws Exception {
  List<Path> newLateFilePaths = Lists.newArrayList();
  for (String filePathString : this.dataset.jobProps()
      .getPropAsList(MRCompactor.COMPACTION_JOB_LATE_DATA_FILES)) {
    if (FilenameUtils.isExtension(filePathString, getApplicableFileExtensions())) {
      newLateFilePaths.add(new Path(filePathString));
    }
  }
  Path lateDataOutputPath = this.outputDeduplicated ? this.dataset.outputLatePath() : this.dataset.outputPath();
  LOG.info(String.format("Copying %d late data files to %s", newLateFilePaths.size(), lateDataOutputPath));
  // Short-circuit keeps the original semantics: mkdirs is attempted only when the
  // output is deduplicated and the late path does not already exist.
  if (this.outputDeduplicated && !this.fs.exists(lateDataOutputPath) && !this.fs.mkdirs(lateDataOutputPath)) {
    throw new RuntimeException(
        String.format("Failed to create late data output directory: %s.", lateDataOutputPath.toString()));
  }
  this.copyDataFiles(lateDataOutputPath, newLateFilePaths);
  if (this.outputDeduplicated) {
    dataset.checkIfNeedToRecompact(datasetHelper);
  }
  this.status = Status.COMMITTED;
}

/**
 * Configures, submits, and waits for the compaction MR job, then publishes its output
 * if the input data is complete enough.
 *
 * @return true if the run finished normally and completion marking should proceed;
 *         false if it ended early (existing output that may not be overwritten →
 *         status COMMITTED, or data deemed incomplete → status ABORTED).
 */
private boolean runAndPublishCompactionJob(Configuration conf, DateTime compactionTimestamp) throws Exception {
  if (this.fs.exists(this.dataset.outputPath()) && !canOverwriteOutputDir()) {
    LOG.warn(String.format("Output paths %s exists. Will not compact %s.", this.dataset.outputPath(),
        this.dataset.inputPaths()));
    this.status = Status.COMMITTED;
    return false;
  }
  addJars(conf);
  Job job = Job.getInstance(conf);
  this.configureJob(job);
  this.submitAndWait(job);
  if (!shouldPublishData(compactionTimestamp)) {
    LOG.info("Data not published for input folder " + this.dataset.inputPaths() + " due to incompleteness");
    this.status = Status.ABORTED;
    return false;
  }
  // remove all invalid empty files due to speculative task execution
  List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job, this.dataset.outputTmpPath(), this.tmpFs,
      ImmutableList.of("avro"));
  if (!this.recompactAllData && this.recompactFromDestPaths) {
    // append new files without deleting output directory
    addGoodFilesToOutputPath(goodPaths);
    // clean up late data from outputLateDirectory, which has been set to inputPath
    deleteFilesByPaths(this.dataset.inputPaths());
  } else {
    moveTmpPathToOutputPath();
    if (this.recompactFromDestPaths) {
      deleteFilesByPaths(this.dataset.additionalInputPaths());
    }
  }
  submitSlaEvent(job);
  LOG.info("Successfully published data for input folder " + this.dataset.inputPaths());
  this.status = Status.COMMITTED;
  return true;
}