protected static DiffPathSet fullPathDiff()

in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java [629:749]


  /**
   * Computes which source files must be copied and which target files must be deleted so that the desired target
   * location ends up holding the source data.
   *
   * <p>A source file is copied unless a file already exists at its computed target path whose version is greater
   * than or equal to the source file's version (as reported by the locations' data-file version strategy) and, when
   * {@code helper.isEnforceFileSizeMatch()} is set, whose length equals the source file's length. Files used by the
   * current target table / partition that are not kept this way are marked for deletion. Any other files found under
   * the desired target location (i.e. not managed by the current target) are marked for deletion when the unmanaged
   * data policy is {@code DELETE_UNMANAGED_DATA}; otherwise the copy is aborted with an {@link IOException}.
   *
   * @param sourceLocation location of the source table / partition
   * @param desiredTargetLocation location the new target table / partition will use
   * @param currentTargetLocation location the existing target table / partition uses now, if one exists
   * @param partition the partition being diffed; when absent, the dataset's table name is used in error messages
   * @param multiTimer timer whose stage is advanced at the start of each listing / diff phase
   * @param helper provides target path resolution, the file-size enforcement flag and the unmanaged data policy
   * @return the files to copy and delete; an empty set if either version strategy is missing or the two strategies
   *     are of different classes
   * @throws IOException if listing the source or current target paths fails, or if the desired target contains
   *     unmanaged files and the policy does not allow deleting them
   */
  protected static DiffPathSet fullPathDiff(HiveLocationDescriptor sourceLocation,
      HiveLocationDescriptor desiredTargetLocation, Optional<HiveLocationDescriptor> currentTargetLocation,
      Optional<Partition> partition, MultiTimingEvent multiTimer, HiveCopyEntityHelper helper) throws IOException {

    // populate version strategy before analyzing diffs
    sourceLocation.populateDataFileVersionStrategy();
    desiredTargetLocation.populateDataFileVersionStrategy();

    DiffPathSet.DiffPathSetBuilder builder = DiffPathSet.builder();

    // Without a version strategy on both sides there is no way to tell whether an existing target file is stale.
    if (!sourceLocation.versionStrategy.isPresent() || !desiredTargetLocation.versionStrategy.isPresent()) {
      log.warn("Version strategy doesn't exist ({},{}), cannot handle copy.",
          sourceLocation.versionStrategy.isPresent(),
          desiredTargetLocation.versionStrategy.isPresent());
      return builder.build();
    }

    // Source and destination versions are only compared when both sides use the same strategy class.
    String srcStrategyName = sourceLocation.versionStrategy.get().getClass().getName();
    String dstStrategyName = desiredTargetLocation.versionStrategy.get().getClass().getName();
    if (!srcStrategyName.equals(dstStrategyName)) {
      log.warn("Version strategy src: {} and dst: {} doesn't match, cannot handle copy.",
          srcStrategyName, dstStrategyName);
      return builder.build();
    }

    // Optimization: with the mod-time strategy, use the modification times already present in the listed
    // FileStatus objects instead of asking the strategy. This does not depend on the file, so compute it once.
    boolean useDirectGetModTime = srcStrategyName.equals(ModTimeDataFileVersionStrategy.class.getName());

    multiTimer.nextStage(Stages.SOURCE_PATH_LISTING);
    // These are the paths at the source
    Map<Path, FileStatus> sourcePaths = sourceLocation.getPaths();

    multiTimer.nextStage(Stages.TARGET_EXISTING_PATH_LISTING);
    // These are the paths that the existing target table / partition uses now. Kept files are removed from this map
    // during the diff; whatever remains afterwards is marked for deletion.
    Map<Path, FileStatus> targetExistingPaths = currentTargetLocation.isPresent()
        ? currentTargetLocation.get().getPaths() : Maps.<Path, FileStatus> newHashMap();

    multiTimer.nextStage(Stages.DESIRED_PATHS_LISTING);
    // These are the paths that exist at the destination and the new table / partition would pick up
    Map<Path, FileStatus> desiredTargetExistingPaths;
    try {
      desiredTargetExistingPaths = desiredTargetLocation.getPaths();
    } catch (IOException ioe) {
      // Thrown if inputFormat cannot find location in target. Since location doesn't exist, this set is empty.
      log.debug("Could not list desired target location, treating it as empty.", ioe);
      desiredTargetExistingPaths = Maps.newHashMap();
    }

    multiTimer.nextStage(Stages.PATH_DIFF);
    for (FileStatus sourcePath : sourcePaths.values()) {
      Path newPath = helper.getTargetPathHelper()
          .getTargetPath(sourcePath.getPath(), desiredTargetLocation.getFileSystem(), partition, true);
      boolean shouldCopy = true;

      if (desiredTargetExistingPaths.containsKey(newPath)) {
        // If the file exists at the destination, check whether it should be replaced, if not, no need to copy
        FileStatus existingTargetStatus = desiredTargetExistingPaths.get(newPath);
        Comparable srcVer = useDirectGetModTime ? sourcePath.getModificationTime() :
            sourceLocation.versionStrategy.get().getVersion(sourcePath.getPath());
        Comparable dstVer = useDirectGetModTime ? existingTargetStatus.getModificationTime() :
            desiredTargetLocation.versionStrategy.get().getVersion(existingTargetStatus.getPath());

        // Destination is at the same or a higher version: skip the copy, unless size matching is enforced and fails.
        if (srcVer.compareTo(dstVer) <= 0) {
          boolean enforceSizeMatch = helper.isEnforceFileSizeMatch();
          if (!enforceSizeMatch || existingTargetStatus.getLen() == sourcePath.getLen()) {
            log.debug("Copy from src {} (version:{}) to dst {} (version:{}) can be skipped "
                    + "(src size: {} bytes, dst size: {} bytes, size match enforced: {})",
                sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), dstVer, sourcePath.getLen(),
                existingTargetStatus.getLen(), enforceSizeMatch);
            shouldCopy = false;
          } else {
            log.debug("Copy from src {} (version:{}) to dst {} (version:{}) can not be skipped because the file sizes "
                    + "differ (src: {} bytes, dst: {} bytes) and size matching is enforced by this config: {}",
                sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), dstVer, sourcePath.getLen(),
                existingTargetStatus.getLen(), CopyConfiguration.ENFORCE_FILE_LENGTH_MATCH);
          }
        } else {
          log.debug("Copy from src {} (v:{}) to dst {} (v:{}) is needed due to a higher version.",
              sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), dstVer);
        }
      }

      if (shouldCopy) {
        builder.copyFile(sourcePath);
      } else {
        // Keep the existing target file: drop it from both maps so it is neither deleted below nor treated as
        // leftover unmanaged data.
        targetExistingPaths.remove(newPath);
        desiredTargetExistingPaths.remove(newPath);
      }
    }

    multiTimer.nextStage(Stages.COMPUTE_DELETE_PATHS);
    // At this point, targetExistingPaths contains paths managed by the target table / partition that we no longer
    // want, so delete them. They are also dropped from desiredTargetExistingPaths since they are now accounted for.
    for (Path delete : targetExistingPaths.keySet()) {
      builder.deleteFile(delete);
      desiredTargetExistingPaths.remove(delete);
    }

    // Anything left in desiredTargetExistingPaths is unwanted but not managed by the existing table / partition.
    // Ideally such files are not deleted (they're not managed by Hive), but the new table / partition must not pick
    // them up either, so the copy is aborted unless the policy explicitly allows deleting unmanaged data.
    if (!desiredTargetExistingPaths.isEmpty()) {
      if (helper.getUnmanagedDataPolicy() != UnmanagedDataPolicy.DELETE_UNMANAGED_DATA) {
        throw new IOException(String.format(
            "New table / partition would pick up existing, undesired files in target file system. " + "%s, files %s.",
            partition.isPresent() ? partition.get().getCompleteName() : helper.getDataset().getTable().getCompleteName(),
            Arrays.toString(desiredTargetExistingPaths.keySet().toArray())));
      }
      for (Path delete : desiredTargetExistingPaths.keySet()) {
        builder.deleteFile(delete);
      }
      log.warn("Un-managed files detected in target file system, however deleting them "
              + "because of the policy: {} Files to be deleted are: {}", UnmanagedDataPolicy.DELETE_UNMANAGED_DATA,
          StringUtils.join(desiredTargetExistingPaths.keySet(), ","));
    }

    return builder.build();
  }