in samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java [268:353]
/**
 * Recursively computes the {@link DirDiff} between a local snapshot directory and the
 * {@link DirIndex} describing the corresponding remote snapshot directory.
 *
 * <p>Files directly inside the directory are classified by name:
 * <ul>
 *   <li>present on both sides and {@code areSameFile} returns true: the remote
 *       {@link FileIndex} is retained;</li>
 *   <li>present on both sides but {@code areSameFile} returns false: the remote
 *       {@link FileIndex} is marked for removal and the local file for upload;</li>
 *   <li>present only locally: the local file is marked for upload;</li>
 *   <li>present only remotely: the remote {@link FileIndex} is marked for removal.</li>
 * </ul>
 *
 * <p>Sub-directories are classified by name in the same spirit: local-only sub-directories
 * are added via {@code getDiffForNewDir}, sub-directories present on both sides are compared
 * recursively, and remote-only sub-directories are marked for removal.
 *
 * @param localSnapshotDir local snapshot directory; must be non-null and an existing directory
 * @param remoteSnapshotDir index of the remote snapshot directory; must be non-null
 * @param areSameFile predicate deciding whether a local file and a remote file index with the
 *                    same name represent the same file
 * @param isRootDir if true, the returned diff is named {@link DirIndex#ROOT_DIR_NAME};
 *                  otherwise it is named after {@code localSnapshotDir}
 * @return the diff between the local directory and the remote directory index
 * @throws IllegalStateException if {@code localSnapshotDir} is null or not a directory, or if
 *                               two remote file entries share the same file name
 * @throws NullPointerException if {@code remoteSnapshotDir} is null, or if the contents of
 *                              {@code localSnapshotDir} could not be listed
 */
private static DirDiff getDirDiff(File localSnapshotDir, DirIndex remoteSnapshotDir,
    BiPredicate<File, FileIndex> areSameFile, boolean isRootDir) {
  Preconditions.checkState(localSnapshotDir != null && localSnapshotDir.isDirectory());
  Preconditions.checkNotNull(remoteSnapshotDir);

  LOG.debug("Creating DirDiff between local dir: {} and remote dir: {}",
      localSnapshotDir.getPath(), remoteSnapshotDir.getDirName());
  List<DirDiff> subDirsAdded = new ArrayList<>();
  List<DirDiff> subDirsRetained = new ArrayList<>();
  List<DirIndex> subDirsRemoved = new ArrayList<>();
  List<File> filesToUpload = new ArrayList<>();
  List<FileIndex> filesToRetain = new ArrayList<>();
  List<FileIndex> filesToRemove = new ArrayList<>();

  // listFiles returns an empty array for an empty directory; it returns null only when the
  // directory could not be listed (e.g. an I/O error), which is treated as a failure here.
  List<File> localSnapshotFiles = Arrays.asList(Objects.requireNonNull(
      localSnapshotDir.listFiles(File::isFile),
      "Unable to list files in local snapshot dir: " + localSnapshotDir.getPath()));
  List<FileIndex> remoteSnapshotFiles = remoteSnapshotDir.getFilesPresent();

  List<File> localSnapshotSubDirs = Arrays.asList(Objects.requireNonNull(
      localSnapshotDir.listFiles(File::isDirectory),
      "Unable to list sub-directories in local snapshot dir: " + localSnapshotDir.getPath()));
  Set<String> localSnapshotSubDirNames = localSnapshotSubDirs.stream()
      .map(File::getName)
      .collect(Collectors.toCollection(HashSet::new));

  List<DirIndex> remoteSnapshotSubDirs = remoteSnapshotDir.getSubDirsPresent();
  // Index remote sub-directories by name once so each local sub-directory is matched with a
  // single lookup. On duplicate names the first entry wins, matching a first-match scan.
  Map<String, DirIndex> remoteSubDirsByName = remoteSnapshotSubDirs.stream()
      .collect(Collectors.toMap(DirIndex::getDirName, Function.identity(),
          (first, second) -> first));

  Map<String, FileIndex> remoteFiles = remoteSnapshotFiles.stream()
      .collect(Collectors.toMap(FileIndex::getFileName, Function.identity()));
  Map<String, File> localFiles = localSnapshotFiles.stream()
      .collect(Collectors.toMap(File::getName, Function.identity()));

  // Classify every file name seen on either side.
  for (String file : Sets.union(remoteFiles.keySet(), localFiles.keySet())) {
    if (localFiles.containsKey(file)) {
      if (remoteFiles.containsKey(file)) {
        if (areSameFile.test(localFiles.get(file), remoteFiles.get(file))) {
          // Files are the same locally and remotely, Retain
          filesToRetain.add(remoteFiles.get(file));
        } else {
          // Files are not the same, remove and upload
          filesToRemove.add(remoteFiles.get(file));
          filesToUpload.add(localFiles.get(file));
        }
      } else {
        // File exists locally, but not remotely, Upload
        filesToUpload.add(localFiles.get(file));
      }
    } else if (remoteFiles.containsKey(file)) {
      // File exists remotely, but not locally, Remove
      filesToRemove.add(remoteFiles.get(file));
    }
  }

  // Subdirs in the local snapshot: either new (add) or also present remotely (recurse).
  for (File localSnapshotSubDir : localSnapshotSubDirs) {
    if (!remoteSubDirsByName.containsKey(localSnapshotSubDir.getName())) {
      LOG.debug("Subdir {} present in local snapshot but not in remote snapshot. " +
          "Recursively adding subdir contents.", localSnapshotSubDir.getPath());
      subDirsAdded.add(getDiffForNewDir(localSnapshotSubDir));
    } else {
      LOG.debug("Subdir {} present in local snapshot and in remote snapshot. " +
          "Recursively comparing local and remote subdirs.", localSnapshotSubDir.getPath());
      DirIndex remoteSubDirIndex = remoteSubDirsByName.get(localSnapshotSubDir.getName());
      subDirsRetained.add(getDirDiff(localSnapshotSubDir, remoteSubDirIndex, areSameFile, false));
    }
  }

  // Subdirs in the remote snapshot but not in the local snapshot: mark for removal.
  for (DirIndex remoteSnapshotSubDir : remoteSnapshotSubDirs) {
    if (!localSnapshotSubDirNames.contains(remoteSnapshotSubDir.getDirName())) {
      // Log the name of the sub-directory being removed, not of its parent directory.
      LOG.debug("Subdir {} present in remote snapshot but not in local snapshot. " +
          "Marking for removal from remote snapshot. ", remoteSnapshotSubDir.getDirName());
      subDirsRemoved.add(remoteSnapshotSubDir);
    }
  }

  String dirName = isRootDir ? DirIndex.ROOT_DIR_NAME : localSnapshotDir.getName();
  return new DirDiff(dirName,
      filesToUpload, filesToRetain, filesToRemove,
      subDirsAdded, subDirsRetained, subDirsRemoved);
}