private static DirDiff getDirDiff()

in samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java [268:353]


  /**
   * Recursively computes the difference between a local snapshot directory and the index of a
   * remote snapshot directory.
   *
   * <p>Regular files directly under {@code localSnapshotDir} are matched by name against
   * {@code remoteSnapshotDir.getFilesPresent()} and classified as follows:
   * <ul>
   *   <li>present on both sides and {@code areSameFile} returns true: remote index is retained;</li>
   *   <li>present on both sides and {@code areSameFile} returns false: remote index is removed and
   *       the local file is uploaded;</li>
   *   <li>present only locally: local file is uploaded;</li>
   *   <li>present only remotely: remote index is removed.</li>
   * </ul>
   * Sub-directories are matched by name against {@code remoteSnapshotDir.getSubDirsPresent()}:
   * local-only sub-directories are added via {@code getDiffForNewDir}, sub-directories present on
   * both sides are diffed recursively, and remote-only sub-directories are marked as removed.
   *
   * @param localSnapshotDir local directory to diff; must be non-null and an existing directory
   * @param remoteSnapshotDir index of the remote directory to diff against; must be non-null
   * @param areSameFile predicate deciding whether a local file and a remote file index with the
   *                    same name are the same file
   * @param isRootDir if true the resulting diff is named {@link DirIndex#ROOT_DIR_NAME}, otherwise
   *                  it is named after {@code localSnapshotDir}
   * @return the diff for this directory, including nested diffs for its sub-directories
   * @throws IllegalStateException if {@code localSnapshotDir} is null or not a directory, or if two
   *                               files on the same side share a name
   * @throws NullPointerException if {@code remoteSnapshotDir} is null or the local directory
   *                              contents cannot be listed
   */
  private static DirDiff getDirDiff(File localSnapshotDir, DirIndex remoteSnapshotDir,
      BiPredicate<File, FileIndex> areSameFile, boolean isRootDir) {
    Preconditions.checkState(localSnapshotDir != null && localSnapshotDir.isDirectory());
    Preconditions.checkNotNull(remoteSnapshotDir);

    LOG.debug("Creating DirDiff between local dir: {} and remote dir: {}",
        localSnapshotDir.getPath(), remoteSnapshotDir.getDirName());
    List<DirDiff> subDirsAdded = new ArrayList<>();
    List<DirDiff> subDirsRetained = new ArrayList<>();
    List<DirIndex> subDirsRemoved = new ArrayList<>();
    List<File> filesToUpload = new ArrayList<>();
    List<FileIndex> filesToRetain = new ArrayList<>();
    List<FileIndex> filesToRemove = new ArrayList<>();

    // listFiles returns an empty array for an empty directory; it returns null only if the
    // directory cannot be listed, which is surfaced as an NPE by requireNonNull.
    List<File> localSnapshotFiles = Arrays.asList(Objects.requireNonNull(localSnapshotDir.listFiles(File::isFile)));
    List<File> localSnapshotSubDirs = Arrays.asList(Objects.requireNonNull(localSnapshotDir.listFiles(File::isDirectory)));

    // Name-keyed views of the files on each side. Duplicate names are rejected by toMap.
    Map<String, FileIndex> remoteFiles = remoteSnapshotDir.getFilesPresent().stream()
        .collect(Collectors.toMap(FileIndex::getFileName, Function.identity()));
    Map<String, File> localFiles = localSnapshotFiles.stream()
        .collect(Collectors.toMap(File::getName, Function.identity()));

    // Every name in the union exists on at least one side, and toMap never stores null values,
    // so a null lookup result means "absent on that side".
    for (String fileName : Sets.union(remoteFiles.keySet(), localFiles.keySet())) {
      File localFile = localFiles.get(fileName);
      FileIndex remoteFile = remoteFiles.get(fileName);
      if (localFile == null) {
        // File exists remotely, but not locally: remove
        filesToRemove.add(remoteFile);
      } else if (remoteFile == null) {
        // File exists locally, but not remotely: upload
        filesToUpload.add(localFile);
      } else if (areSameFile.test(localFile, remoteFile)) {
        // Files are the same locally and remotely: retain
        filesToRetain.add(remoteFile);
      } else {
        // Files are not the same: remove the remote copy and upload the local one
        filesToRemove.add(remoteFile);
        filesToUpload.add(localFile);
      }
    }

    Set<String> localSnapshotSubDirNames = localSnapshotSubDirs.stream()
        .map(File::getName)
        .collect(Collectors.toCollection(HashSet::new));

    // Index remote subdirs by name once instead of re-scanning the list for every local subdir.
    // On duplicate names the first entry wins.
    List<DirIndex> remoteSnapshotSubDirs = remoteSnapshotDir.getSubDirsPresent();
    Map<String, DirIndex> remoteSubDirsByName = remoteSnapshotSubDirs.stream()
        .collect(Collectors.toMap(DirIndex::getDirName, Function.identity(), (first, duplicate) -> first));

    // 1. Subdir in local snapshot but not in remote snapshot
    // 2. Subdir in both local and remote snapshots
    for (File localSnapshotSubDir: localSnapshotSubDirs) {
      DirIndex remoteSubDirIndex = remoteSubDirsByName.get(localSnapshotSubDir.getName());
      if (remoteSubDirIndex == null) {
        LOG.debug("Subdir {} present in local snapshot but not in remote snapshot. " +
            "Recursively adding subdir contents.", localSnapshotSubDir.getPath());
        subDirsAdded.add(getDiffForNewDir(localSnapshotSubDir));
      } else {
        LOG.debug("Subdir {} present in local snapshot and in remote snapshot. " +
            "Recursively comparing local and remote subdirs.", localSnapshotSubDir.getPath());
        subDirsRetained.add(getDirDiff(localSnapshotSubDir, remoteSubDirIndex, areSameFile, false));
      }
    }

    // 3. Subdir in remote snapshot but not in local snapshot
    for (DirIndex remoteSnapshotSubDir: remoteSnapshotSubDirs) {
      if (!localSnapshotSubDirNames.contains(remoteSnapshotSubDir.getDirName())) {
        // Log the subdir being removed, not the parent directory being diffed.
        LOG.debug("Subdir {} present in remote snapshot but not in local snapshot. " +
            "Marking for removal from remote snapshot. ", remoteSnapshotSubDir.getDirName());
        subDirsRemoved.add(remoteSnapshotSubDir);
      }
    }

    String dirName = isRootDir ? DirIndex.ROOT_DIR_NAME : localSnapshotDir.getName();
    return new DirDiff(dirName,
        filesToUpload, filesToRetain, filesToRemove,
        subDirsAdded, subDirsRetained, subDirsRemoved);
  }