in gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java [87:207]
/**
 * Publishes the output of a finished compaction MR job to the dataset's destination
 * directory and persists bookkeeping state (record counts, execution count, MR job id)
 * so subsequent runs can compute record deltas.
 *
 * <p>Two publish modes:
 * <ul>
 *   <li>append-delta (implied by the "rename source dir" option): each good output file
 *       is renamed individually into the existing destination dir; the new record total
 *       comes from input file names.</li>
 *   <li>replace (default): the destination dir is deleted and the whole tmp output dir
 *       is renamed over it; the new record total comes from the MR job counter.</li>
 * </ul>
 *
 * @param dataset the dataset whose compaction job just completed; virtual datasets are skipped
 * @throws IOException if a filesystem move fails or job counters cannot be read
 */
public void onCompactionJobComplete(FileSystemDataset dataset) throws IOException {
  if (dataset.isVirtual()) {
    return;
  }
  // Nothing to publish unless a real MR job was actually created for this dataset.
  if (configurator == null || !configurator.isJobCreated()) {
    return;
  }
  CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
  Path tmpPath = configurator.getMrOutputPath();
  // Dataset destination dir, also where the compaction bookkeeping state lives.
  // Built once instead of re-creating the same Path for every helper call.
  final Path datasetDstDir = new Path(result.getDstAbsoluteDir());
  Path dstPath = datasetDstDir;
  // Append-delta mode is implied by the compaction "rename source dir" option being enabled.
  boolean appendDeltaOutput = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
      MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
  Job job = this.configurator.getConfiguredJob();

  long newTotalRecords = 0;
  long oldTotalRecords = helper.readRecordCount(datasetDstDir);
  long executionCount = helper.readExecutionCount(datasetDstDir);

  List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job, tmpPath, this.fs,
      ImmutableList.of(configurator.getFileExtension()));

  if (appendDeltaOutput) {
    FsPermission permission =
        HadoopUtils.deserializeFsPermission(this.state, MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
            FsPermission.getDefault());
    WriterUtils.mkdirsWithRecursivePermission(this.fs, dstPath, permission);
    // Append each good file under the MR output to the existing destination dir.
    for (Path filePath : goodPaths) {
      Path outPath = new Path(dstPath, filePath.getName());
      log.info("Adding {} to {}", filePath, dstPath);
      if (!this.fs.rename(filePath, outPath)) {
        throw new IOException(String.format("Unable to move %s to %s", filePath.toString(), outPath.toString()));
      }
    }
    // Obtain record count from input file names.
    // We don't get record count from map-reduce counter because in the next run, the threshold (delta record)
    // calculation is based on the input file names. By pre-defining which input folders are involved in the
    // MR execution, it is easy to track how many files are involved in MR so far, thus calculating the number
    // of total records (all previous run + current run) is possible.
    newTotalRecords = this.configurator.getFileNameRecordCount();
  } else {
    if (state.getPropAsBoolean(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, false)) {
      // Each recompaction execution gets its own numbered sub-folder; the previous
      // execution's folder is recorded as an "old file" eligible for cleanup.
      Path oldFilePath =
          PathUtils.mergePaths(dstPath, new Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount)));
      dstPath =
          PathUtils.mergePaths(dstPath, new Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount + 1)));
      this.configurator.getOldFiles().add(this.fs.makeQualified(oldFilePath).toString());
    } else {
      this.configurator.getOldFiles().add(this.fs.makeQualified(dstPath).toString());
    }
    // It is possible that the destination path is a non-empty directory if the previous run failed.
    // Hence, always delete the destination path before moving the tmp path to the destination path.
    this.fs.delete(dstPath, true);
    FsPermission permission =
        HadoopUtils.deserializeFsPermission(this.state, MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
            FsPermission.getDefault());
    WriterUtils.mkdirsWithRecursivePermission(this.fs, dstPath.getParent(), permission);
    if (!this.fs.rename(tmpPath, dstPath)) {
      throw new IOException(String.format("Unable to move %s to %s", tmpPath, dstPath));
    }
    // Obtain record count from map reduce job counter.
    // We don't get record count from file name because tracking which files are actually involved in the MR
    // execution can be hard. This is due to new minutely data is rolled up to hourly folder but from daily
    // compaction perspective we are not able to tell which file are newly added (because we simply pass all
    // hourly folders to MR job instead of individual files).
    Counter counter = job.getCounters().findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
    newTotalRecords = counter.getValue();
  }

  // Record the final locations of the newly published output files.
  final Path finalDstPath = dstPath;
  HashSet<Path> outputFiles = new HashSet<>();
  for (Path goodPath : goodPaths) {
    outputFiles.add(new Path(finalDstPath, goodPath.getName()));
  }
  this.configurator.setDstNewFiles(outputFiles);

  State compactionState = helper.loadState(datasetDstDir);
  if (executionCount != 0) {
    // Snapshot the previous execution's totals under execution-count-suffixed keys before
    // the un-suffixed keys are overwritten below, preserving per-execution history.
    // Reuses oldTotalRecords read above — no state write happens between that read and here,
    // so re-reading from the filesystem would return the same value.
    compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL + Long.toString(executionCount),
        Long.toString(oldTotalRecords));
    compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL + Long.toString(executionCount),
        Long.toString(executionCount));
    compactionState.setProp(DUPLICATE_COUNT_TOTAL + Long.toString(executionCount),
        compactionState.getProp(DUPLICATE_COUNT_TOTAL, "null"));
  }
  if (state.getPropAsBoolean(ConfigurationKeys.GOBBLIN_METADATA_CHANGE_EVENT_ENABLED, false)) {
    // GMCE enabled: set the key to false to indicate that GMCE has not been sent yet.
    compactionState.setProp(CompactionGMCEPublishingAction.GMCE_EMITTED_KEY, false);
  }
  compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
  compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executionCount + 1));
  compactionState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
      this.configurator.getConfiguredJob().getJobID().toString());
  compactionState.setProp(DUPLICATE_COUNT_TOTAL,
      job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
  compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
      this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
  helper.saveState(datasetDstDir, compactionState);

  log.info("duplicated records count for " + dstPath + " : " + compactionState.getProp(DUPLICATE_COUNT_TOTAL));
  log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath,
      executionCount + 1);

  // Submit a record-count SLA event for downstream monitoring, if an emitter is configured.
  if (eventSubmitter != null) {
    Map<String, String> eventMetadataMap =
        ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
            CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords),
            CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords),
            CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executionCount + 1),
            CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
    this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap);
  }
}