in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java [306:411]
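  /**
   * Publish data for a single fileSet: run pre-publish steps, move staged files from the writer
   * output directory to their final destinations, commit the corresponding work unit states, run
   * post-publish steps, and emit file- and dataset-level publish events with SLA metadata.
   */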
  private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
      Collection<WorkUnitState> datasetWorkUnitStates) throws IOException {
    Map<String, String> additionalMetadata = Maps.newHashMap();

    Preconditions.checkArgument(!datasetWorkUnitStates.isEmpty(),
        "[%s] publishFileSet got empty work unit states. This is an error in the code.",
        datasetAndPartition.identifier());

    WorkUnitStatesHelper statesHelper = new WorkUnitStatesHelper(datasetWorkUnitStates);
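    // All work units in a fileSet carry the same serialized dataset metadata, so sampling any
    // single state suffices (an illustrative sketch of this helper's surface appears after the method).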
    WorkUnitState sampledWorkUnitState = statesHelper.getAny();
    CopyableDatasetMetadata metadata = CopyableDatasetMetadata.deserialize(
        sampledWorkUnitState.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));

    // If not already done, ensure that the writer outputs have the job ID appended to avoid corruption from previous runs
    FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(sampledWorkUnitState);

    Path writerOutputDir = new Path(sampledWorkUnitState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
    Path datasetWriterOutputPath = new Path(writerOutputDir, datasetAndPartition.identifier());
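    // Large files may have been split into block-level work units for parallel copying;
    // reassemble any such splits before moving files to their final destinations.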
log.info("Merging all split work units.");
DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, statesHelper.getAll());
log.info("[{}] Publishing fileSet from {} for dataset {}", datasetAndPartition.identifier(),
datasetWriterOutputPath, metadata.getDatasetURN());
List<CommitStep> prePublishSteps = statesHelper.getPrePublishSteps();
List<CommitStep> postPublishSteps = statesHelper.getPostPublishSteps();
log.info("[{}] Found {} pre-publish steps and {} post-publish steps.", datasetAndPartition.identifier(),
prePublishSteps.size(), postPublishSteps.size());
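    // Pre-publish steps (e.g. deletes required at the destination) must complete before any staged data is moved.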
    executeCommitSequence(prePublishSteps);

    if (statesHelper.hasAnyCopyableFile()) {
      // Targets are always absolute, so we start moving from root (will skip any existing directories).
      HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new Path("/"));
    } else {
      log.info("[{}] No copyable files in dataset. Proceeding to post-publish steps.",
          datasetAndPartition.identifier());
    }
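    // Clean up the per-dataset staging directory now that its contents have been moved into place.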
    this.fs.delete(datasetWriterOutputPath, true);
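    // Track the earliest origin/upstream timestamps across all copied files; reported in the SLA event below.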
    long datasetOriginTimestamp = Long.MAX_VALUE;
    long datasetUpstreamTimestamp = Long.MAX_VALUE;
    Optional<String> fileSetRoot = Optional.absent();

    // ensure every successful state is committed
    // WARNING: this MUST NOT run before the WU is actually executed--hence NOT YET for post-publish steps!
    // (that's because `WorkUnitState::getWorkingState()` returns `WorkingState.SUCCESSFUL` merely when the
    // overall job succeeded--even for WUs yet to execute)
    for (WorkUnitState wus : statesHelper.getNonPostPublishStates()) {
      if (wus.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL) {
        wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
      }
      CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
      if (copyEntity instanceof CopyableFile) {
        CopyableFile copyableFile = (CopyableFile) copyEntity;
        if (wus.getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
          // Committed files should exist in the destination; otherwise a FileNotFoundException will be thrown
          preserveFileAttrInPublisher(copyableFile);
          CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, copyableFile, wus);
          // The dataset output path is injected into each copyableFile.
          // This could be optimized by having a dataset-level equivalent class for copyable entities
          // and storing dataset-related information, e.g. the dataset output path, there.
          // Currently datasetOutputPath is only present for hive datasets.
          if (!fileSetRoot.isPresent() && copyableFile.getDatasetOutputPath() != null) {
            fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath());
          }
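          // Register the file's destination with the lineage framework, if one is configured.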
          if (lineageInfo.isPresent()) {
            lineageInfo.get().putDestination(copyableFile.getDestinationData(), 0, wus);
          }
        }
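        // Origin/upstream timestamps are aggregated for every copyable file, whether or not it was committed.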
        if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) {
          datasetOriginTimestamp = copyableFile.getOriginTimestamp();
        }
        if (datasetUpstreamTimestamp > copyableFile.getUpstreamTimestamp()) {
          datasetUpstreamTimestamp = copyableFile.getUpstreamTimestamp();
        }
      }
    }
    // execute `postPublishSteps` after preserving file attributes, as some, like `SetPermissionCommitStep`,
    // will themselves set permissions
    executeCommitSequence(postPublishSteps);

    // since `postPublishSteps` have now executed, finally ready to ensure every successful WU state of those gets committed
    for (WorkUnitState wus : statesHelper.getPostPublishStates()) {
      if (wus.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL) {
        wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
      }
      // NOTE: no need for `CopyableFile`-specific custom handling, as above, because `PostPublishStep extends
      // CommitStepCopyEntity` and so could not be one
    }
    // if there are no valid values for datasetOriginTimestamp and datasetUpstreamTimestamp, use
    // something more readable
    if (Long.MAX_VALUE == datasetOriginTimestamp) {
      datasetOriginTimestamp = 0;
    }
    if (Long.MAX_VALUE == datasetUpstreamTimestamp) {
      datasetUpstreamTimestamp = 0;
    }
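    // Emit the dataset-level publish event, annotated with SLA metadata for downstream monitoring.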
    additionalMetadata.put(SlaEventKeys.SOURCE_URI, this.state.getProp(SlaEventKeys.SOURCE_URI));
    additionalMetadata.put(SlaEventKeys.DESTINATION_URI, this.state.getProp(SlaEventKeys.DESTINATION_URI));
    additionalMetadata.put(SlaEventKeys.DATASET_OUTPUT_PATH, fileSetRoot.or("Unknown"));
    CopyEventSubmitterHelper.submitSuccessfulDatasetPublish(this.eventSubmitter, datasetAndPartition,
        Long.toString(datasetOriginTimestamp), Long.toString(datasetUpstreamTimestamp), additionalMetadata);
  }
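
  // NOTE: `WorkUnitStatesHelper` is used above but not shown here. What follows is a minimal,
  // illustrative sketch of the surface `publishFileSet` relies on, reconstructed from the calls
  // above -- not the actual implementation. It assumes work units are classified by deserializing
  // their copy entities, that `PrePublishStep`/`PostPublishStep` extend `CommitStepCopyEntity`
  // (as the NOTE in the loop above states), that `CommitStepCopyEntity` exposes its wrapped
  // `CommitStep` via a hypothetical `getStep()` accessor, and that `java.util.stream.Collectors`
  // is imported. The real helper may also order steps (e.g. by priority) before returning them.
  private static class WorkUnitStatesHelper {
    private final Collection<WorkUnitState> states;

    WorkUnitStatesHelper(Collection<WorkUnitState> states) {
      this.states = states;
    }

    // Any state will do for properties that are identical across the fileSet, e.g. the serialized dataset.
    WorkUnitState getAny() {
      return this.states.iterator().next();
    }

    Collection<WorkUnitState> getAll() {
      return this.states;
    }

    boolean hasAnyCopyableFile() {
      return this.states.stream()
          .anyMatch(wus -> CopySource.deserializeCopyEntity(wus) instanceof CopyableFile);
    }

    List<CommitStep> getPrePublishSteps() {
      return getSteps(PrePublishStep.class);
    }

    List<CommitStep> getPostPublishSteps() {
      return getSteps(PostPublishStep.class);
    }

    // Everything that is not a post-publish step: copyable files and pre-publish steps.
    List<WorkUnitState> getNonPostPublishStates() {
      return this.states.stream()
          .filter(wus -> !(CopySource.deserializeCopyEntity(wus) instanceof PostPublishStep))
          .collect(Collectors.toList());
    }

    List<WorkUnitState> getPostPublishStates() {
      return this.states.stream()
          .filter(wus -> CopySource.deserializeCopyEntity(wus) instanceof PostPublishStep)
          .collect(Collectors.toList());
    }

    private List<CommitStep> getSteps(Class<? extends CommitStepCopyEntity> stepClass) {
      return this.states.stream()
          .map(CopySource::deserializeCopyEntity)
          .filter(stepClass::isInstance)
          .map(entity -> ((CommitStepCopyEntity) entity).getStep()) // hypothetical accessor
          .collect(Collectors.toList());
    }
  }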