private void publishFileSet()

in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java [306:411]


  private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
      Collection<WorkUnitState> datasetWorkUnitStates) throws IOException {
    Map<String, String> additionalMetadata = Maps.newHashMap();
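    // Extra attributes attached to the dataset-publish SLA event emitted at the end of this method.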

    Preconditions.checkArgument(!datasetWorkUnitStates.isEmpty(),
        String.format("[%s] publishFileSet got empty work unit states. This is an error in code.", datasetAndPartition.identifier()));

    WorkUnitStatesHelper statesHelper = new WorkUnitStatesHelper(datasetWorkUnitStates);
    WorkUnitState sampledWorkUnitState = statesHelper.getAny();

    CopyableDatasetMetadata metadata = CopyableDatasetMetadata.deserialize(
        sampledWorkUnitState.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));

    // If not already done, ensure the writer output paths have the job ID appended, so leftovers from previous runs cannot corrupt this one
    FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(sampledWorkUnitState);
    Path writerOutputDir = new Path(sampledWorkUnitState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));

    Path datasetWriterOutputPath = new Path(writerOutputDir, datasetAndPartition.identifier());

    log.info("Merging all split work units.");
    DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, statesHelper.getAll());
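    // (DistcpFileSplitter may have split large files into multiple block-level work units; the merge above stitches those blocks back into whole files before publishing.)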

    log.info("[{}] Publishing fileSet from {} for dataset {}", datasetAndPartition.identifier(),
        datasetWriterOutputPath, metadata.getDatasetURN());

    List<CommitStep> prePublishSteps = statesHelper.getPrePublishSteps();
    List<CommitStep> postPublishSteps = statesHelper.getPostPublishSteps();
    log.info("[{}] Found {} pre-publish steps and {} post-publish steps.", datasetAndPartition.identifier(),
        prePublishSteps.size(), postPublishSteps.size());

    executeCommitSequence(prePublishSteps);
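    // (These pre-publish steps must finish before any data is moved into the destination below.)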

    if (statesHelper.hasAnyCopyableFile()) {
      // Targets are always absolute, so we start moving from root (will skip any existing directories).
      HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new Path("/"));
    } else {
      log.info("[{}] No copyable files in dataset. Proceeding to post-publish steps.", datasetAndPartition.identifier());
    }

    this.fs.delete(datasetWriterOutputPath, true);
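    // Clean up the staging directory for this file set (a no-op if nothing was staged or it no longer exists).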

    long datasetOriginTimestamp = Long.MAX_VALUE;
    long datasetUpstreamTimestamp = Long.MAX_VALUE;
    Optional<String> fileSetRoot = Optional.absent();
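    // Track the minimum origin/upstream timestamps across all copied files; Long.MAX_VALUE means "none seen yet".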

    // Ensure every successful state is committed.
    // WARNING: this MUST NOT run before the WU has actually executed--hence NOT YET for post-publish steps!
    // (That is because `WorkUnitState::getWorkingState()` reports `WorkingState.SUCCESSFUL` whenever the overall job succeeded--even for WUs that have yet to execute.)
    for (WorkUnitState wus : statesHelper.getNonPostPublishStates()) {
      if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
        wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
      }
      CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
      if (copyEntity instanceof CopyableFile) {
        CopyableFile copyableFile = (CopyableFile) copyEntity;
        if (wus.getWorkingState() == WorkingState.COMMITTED) {
          // Committed files should already exist in the destination; otherwise a FileNotFoundException will be thrown
          preserveFileAttrInPublisher(copyableFile);
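          // preserveFileAttrInPublisher re-applies whatever file attributes this copy was configured to preserve (e.g. permissions, owner/group) to the published file.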
          CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, copyableFile, wus);
          // The dataset output path is injected into each copyableFile.
          // This could be optimized by having a dataset-level equivalent class for copyable entities
          // and storing dataset-related information, e.g. the dataset output path, there.

          // Currently datasetOutputPath is only present for Hive datasets.
          if (!fileSetRoot.isPresent() && copyableFile.getDatasetOutputPath() != null) {
            fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath());
          }
          if (lineageInfo.isPresent()) {
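            // Record the destination dataset descriptor so lineage for this work unit includes where the file landed.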
            lineageInfo.get().putDestination(copyableFile.getDestinationData(), 0, wus);
          }
        }
        if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) {
          datasetOriginTimestamp = copyableFile.getOriginTimestamp();
        }
        if (datasetUpstreamTimestamp > copyableFile.getUpstreamTimestamp()) {
          datasetUpstreamTimestamp = copyableFile.getUpstreamTimestamp();
        }
      }
    }

    // execute `postPublishSteps` after preserving file attributes, as some, like `SetPermissionCommitStep`, will themselves set permissions
    executeCommitSequence(postPublishSteps);

    // since `postPublishSteps` have now executed, finally ready to ensure every successful WU state of those gets committed
    for (WorkUnitState wus : statesHelper.getPostPublishStates()) {
      if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
        wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
      }
      // NOTE: no `CopyableFile`-specific handling (as above) is needed here, because `PostPublishStep extends CommitStepCopyEntity` and so can never be a `CopyableFile`
    }

    // If no copyable file supplied valid values for datasetOriginTimestamp and datasetUpstreamTimestamp,
    // fall back to 0, which is more readable than Long.MAX_VALUE
    if (Long.MAX_VALUE == datasetOriginTimestamp) {
      datasetOriginTimestamp = 0;
    }
    if (Long.MAX_VALUE == datasetUpstreamTimestamp) {
      datasetUpstreamTimestamp = 0;
    }

    additionalMetadata.put(SlaEventKeys.SOURCE_URI, this.state.getProp(SlaEventKeys.SOURCE_URI));
    additionalMetadata.put(SlaEventKeys.DESTINATION_URI, this.state.getProp(SlaEventKeys.DESTINATION_URI));
    additionalMetadata.put(SlaEventKeys.DATASET_OUTPUT_PATH, fileSetRoot.or("Unknown"));
    CopyEventSubmitterHelper.submitSuccessfulDatasetPublish(this.eventSubmitter, datasetAndPartition,
        Long.toString(datasetOriginTimestamp), Long.toString(datasetUpstreamTimestamp), additionalMetadata);
  }
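
A hypothetical sketch of the work-unit grouping this method relies on. The real WorkUnitStatesHelper in this class is not shown above; the partitioning below (by the type of CopyEntity serialized into each WorkUnitState) and the getStep() accessor on the step entities are assumptions for illustration only, inferred from the calls made above (getPrePublishSteps(), getPostPublishSteps(), getNonPostPublishStates(), hasAnyCopyableFile()).

  // Illustrative only: not the actual WorkUnitStatesHelper implementation.
  private static void sketchGrouping(Collection<WorkUnitState> datasetWorkUnitStates) throws IOException {
    List<CommitStep> prePublishSteps = Lists.newArrayList();
    List<CommitStep> postPublishSteps = Lists.newArrayList();
    List<WorkUnitState> nonPostPublishStates = Lists.newArrayList();
    boolean hasCopyableFile = false;

    for (WorkUnitState wus : datasetWorkUnitStates) {
      CopyEntity entity = CopySource.deserializeCopyEntity(wus);
      if (entity instanceof PrePublishStep) {
        // Pre-publish steps run before any data is moved; their states are committed in the main pass.
        prePublishSteps.add(((PrePublishStep) entity).getStep());   // accessor name assumed
        nonPostPublishStates.add(wus);
      } else if (entity instanceof PostPublishStep) {
        // Post-publish steps run, and are committed, only after the data has been published.
        postPublishSteps.add(((PostPublishStep) entity).getStep()); // accessor name assumed
      } else {
        // Everything else is a regular copy entity; CopyableFile is the case that triggers the rename above.
        hasCopyableFile |= (entity instanceof CopyableFile);
        nonPostPublishStates.add(wus);
      }
    }
    // The real helper would expose these collections via getters such as getPrePublishSteps(), etc.
  }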