public Collection getCopyableFiles()

in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java [176:322]


  /**
   * Computes the {@link CopyEntity}s needed to replicate this dataset from the route's copy-from endpoint to its
   * copy-to endpoint.
   *
   * <p>Returns an empty collection when any of the following holds:
   * <ul>
   *   <li>either endpoint is not a {@link HadoopFsEndPoint};</li>
   *   <li>the source or destination data file version strategy is missing;</li>
   *   <li>the two version strategies have different class names;</li>
   *   <li>watermarking is enabled and the destination watermark is at least the source watermark, or only the
   *   destination has a watermark.</li>
   * </ul>
   *
   * <p>Otherwise each source file is mapped to the same relative location under the destination dataset path. It is
   * copied when the destination lacks it, when the source version is strictly higher, or when file-size matching is
   * enforced and the lengths differ. Destination files that will be overwritten are removed by a
   * {@link PrePublishStep}. When {@code deleteTargetIfNotExistOnSource} is set, destination files with no source
   * counterpart are removed by the same step. A {@link PostPublishStep} generating the watermark file is added when
   * the source has a watermark but its file listing did not include the watermark file.
   *
   * <p>Side effect: sets the path filter and the {@code applyFilterToDirectories} flag on both endpoints.
   *
   * @param targetFs not used by this implementation; the destination file system is resolved from the copy-to
   *        endpoint's URI instead
   * @param copyConfiguration copy settings; its file-length enforcement flag applies unless the config store
   *        provides an explicit value
   * @return the copy entities (files plus optional pre/post publish steps); never {@code null}
   * @throws IOException if a file system cannot be obtained or a file version cannot be read
   */
  public Collection<? extends CopyEntity> getCopyableFiles(FileSystem targetFs, CopyConfiguration copyConfiguration)
      throws IOException {
    // An explicit value from the config store takes precedence over the job-level copy configuration.
    boolean enforceFileSizeMatch = this.rc.getEnforceFileSizeMatchFromConfigStore().isPresent()?
        this.rc.getEnforceFileSizeMatchFromConfigStore().get() :
        copyConfiguration.isEnforceFileLengthMatch();

    List<CopyEntity> copyableFiles = Lists.newArrayList();
    EndPoint copyFromRaw = copyRoute.getCopyFrom();
    EndPoint copyToRaw = copyRoute.getCopyTo();
    if (!(copyFromRaw instanceof HadoopFsEndPoint && copyToRaw instanceof HadoopFsEndPoint)) {
      log.warn("Currently only handle the Hadoop Fs EndPoint replication");
      return copyableFiles;
    }

    if (!this.srcDataFileVersionStrategy.isPresent() || !this.dstDataFileVersionStrategy.isPresent()) {
      log.warn("Version strategy doesn't exist, cannot handle copy");
      return copyableFiles;
    }

    String srcVersionStrategyName = this.srcDataFileVersionStrategy.get().getClass().getName();
    String dstVersionStrategyName = this.dstDataFileVersionStrategy.get().getClass().getName();
    if (!srcVersionStrategyName.equals(dstVersionStrategyName)) {
      log.warn("Version strategy src: {} and dst: {} doesn't match, cannot handle copy.",
          srcVersionStrategyName, dstVersionStrategyName);
      return copyableFiles;
    }

    //For {@link HadoopFsEndPoint}s, set pathfilter and applyFilterToDirectories
    HadoopFsEndPoint copyFrom = (HadoopFsEndPoint) copyFromRaw;
    HadoopFsEndPoint copyTo = (HadoopFsEndPoint) copyToRaw;
    copyFrom.setPathFilter(pathFilter);
    copyFrom.setApplyFilterToDirectories(applyFilterToDirectories);
    copyTo.setPathFilter(pathFilter);
    copyTo.setApplyFilterToDirectories(applyFilterToDirectories);

    if (this.watermarkEnabled) {
      // Skip when the destination is already at or beyond the source watermark, or only the destination has one.
      if ((!copyFromRaw.getWatermark().isPresent() && copyToRaw.getWatermark().isPresent()) || (
          copyFromRaw.getWatermark().isPresent() && copyToRaw.getWatermark().isPresent()
              && copyFromRaw.getWatermark().get().compareTo(copyToRaw.getWatermark().get()) <= 0)) {
        log.info(
            "No need to copy as destination watermark >= source watermark with source watermark {}, for dataset with metadata {}",
            copyFromRaw.getWatermark().isPresent() ? copyFromRaw.getWatermark().get().toJson() : "N/A",
            this.rc.getMetaData());
        return copyableFiles;
      }
    }

    Configuration conf = HadoopUtils.newConfiguration();
    FileSystem copyFromFs = FileSystem.get(copyFrom.getFsURI(), conf);
    FileSystem copyToFs = FileSystem.get(copyTo.getFsURI(), conf);

    Collection<FileStatus> allFilesInSource = copyFrom.getFiles();
    Collection<FileStatus> allFilesInTarget = copyTo.getFiles();

    Set<FileStatus> copyFromFileStatuses = Sets.newHashSet(allFilesInSource);
    // Target files keyed by their path with scheme and authority stripped; lookups below must use the same form.
    Map<Path, FileStatus> copyToFileMap = Maps.newHashMap();
    for (FileStatus f : allFilesInTarget) {
      copyToFileMap.put(PathUtils.getPathWithoutSchemeAndAuthority(f.getPath()), f);
    }

    Collection<Path> deletedPaths = Lists.newArrayList();

    boolean watermarkMetadataCopied = false;

    boolean deleteTargetIfNotExistOnSource = rc.isDeleteTargetIfNotExistOnSource();

    // Loop-invariant values, computed once rather than once per source file.
    // When the version is the modification time, reuse the already-listed FileStatus instead of querying the strategy.
    boolean useDirectGetModTime = srcVersionStrategyName.equals(ModTimeDataFileVersionStrategy.class.getName());
    Path srcDatasetPath = PathUtils.getPathWithoutSchemeAndAuthority(copyFrom.getDatasetPath());
    String fileSet = PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString();

    for (FileStatus originFileStatus : copyFromFileStatuses) {
      Path relative = PathUtils.relativizePath(
          PathUtils.getPathWithoutSchemeAndAuthority(originFileStatus.getPath()), srcDatasetPath);
      // construct the new path in the target file system
      Path newPath = new Path(copyTo.getDatasetPath(), relative);
      // Normalize the lookup key the same way the keys of copyToFileMap were built.
      Path newPathKey = PathUtils.getPathWithoutSchemeAndAuthority(newPath);
      FileStatus targetFileStatus = copyToFileMap.get(newPathKey);

      if (relative.toString().equals(ReplicaHadoopFsEndPoint.WATERMARK_FILE)) {
        watermarkMetadataCopied = true;
      }

      boolean shouldCopy = true;
      if (targetFileStatus != null) {
        Comparable srcVer = useDirectGetModTime ? originFileStatus.getModificationTime() :
            this.srcDataFileVersionStrategy.get().getVersion(originFileStatus.getPath());
        Comparable dstVer = useDirectGetModTime ? targetFileStatus.getModificationTime() :
            this.dstDataFileVersionStrategy.get().getVersion(targetFileStatus.getPath());

        // destination has the same or a higher version, skip the copy unless the lengths must match and don't
        if (srcVer.compareTo(dstVer) <= 0) {
          if (!enforceFileSizeMatch || targetFileStatus.getLen() == originFileStatus.getLen()) {
            log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can be skipped.",
                originFileStatus.getPath(), srcVer, targetFileStatus.getPath(), dstVer);
            shouldCopy = false;
          } else {
            log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can not be skipped due to unmatched file length.",
                originFileStatus.getPath(), srcVer, targetFileStatus.getPath(), dstVer);
          }
        } else {
          log.debug("Copy from src {} (v:{}) to dst {} (v:{}) is needed due to a higher version.",
              originFileStatus.getPath(), srcVer, targetFileStatus.getPath(), dstVer);
        }
      } else {
        // Log the computed destination path; a map lookup here would always yield null.
        log.debug("Copy from src {} to dst {} is needed because dst doesn't contain the file",
            originFileStatus.getPath(), newPath);
      }

      if (shouldCopy) {
        // An existing target file will be overwritten, so schedule it for deletion before publishing.
        if (targetFileStatus != null) {
          deletedPaths.add(newPath);
        }
        CopyableFile copyableFile = CopyableFile
            .fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath), copyConfiguration)
            .fileSet(fileSet)
            .dataFileVersionStrategy(this.versionStrategyFromCS.isPresent()? this.versionStrategyFromCS.get(): null)
            .build();
        copyableFile.setFsDatasets(copyFromFs, copyToFs);
        copyableFiles.add(copyableFile);
      }

      // clean up already checked paths; what remains afterwards exists only on the target
      copyToFileMap.remove(newPathKey);
    }

    // delete the paths on target directory if NOT exists on source
    if (deleteTargetIfNotExistOnSource) {
      deletedPaths.addAll(copyToFileMap.keySet());
    }

    // delete old files first
    if (!deletedPaths.isEmpty()) {
      DeleteFileCommitStep deleteCommitStep = DeleteFileCommitStep.fromPaths(copyToFs, deletedPaths, this.props);
      copyableFiles.add(
          new PrePublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String>newHashMap(), deleteCommitStep,
              0));
    }

    // Generate the watermark file even if watermark checking is disabled, so it is available once checking is enabled.
    if ((!watermarkMetadataCopied) && copyFrom.getWatermark().isPresent()) {
      copyableFiles.add(new PostPublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String>newHashMap(),
          new WatermarkMetadataGenerationCommitStep(copyTo.getFsURI().toString(), copyTo.getDatasetPath(),
              copyFrom.getWatermark().get()), 1));
    }
    return copyableFiles;
  }