in samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java [156:253]
/**
 * Returns a predicate that decides whether a local file and a remote {@link FileIndex} entry represent the same file.
 *
 * <p>The predicate returns {@code true} only if all of the following hold:
 * <ul>
 *   <li>the local file name equals the remote file name;</li>
 *   <li>the local file size equals the size recorded in the remote file metadata;</li>
 *   <li>if {@code compareFileOwners} is set, the local group and owner names equal the remote ones;</li>
 *   <li>the owner read/write/execute permission bits match (group and other bits are not compared);</li>
 *   <li>the CRC32 checksum of the local file equals the remote checksum. This check is skipped for files larger
 *       than 1 MB unless {@code compareLargeFileChecksums} is set.</li>
 * </ul>
 * File timestamps are deliberately not compared.
 *
 * <p>Group/owner name lookups are cached, keyed by numeric gid/uid. The caches are bounded by {@code CACHE_SIZE}
 * and are created per call, so each returned predicate has its own caches.
 *
 * @param compareLargeFileChecksums if true, compute and compare checksums even for files larger than 1 MB
 * @param compareFileOwners if true, additionally require the group and owner names to match
 * @return a predicate over (local file, remote file index entry). When evaluated, it throws a
 *         {@link RuntimeException} if the local file's attributes cannot be read, and a {@link SamzaException}
 *         if the local file's checksum cannot be computed.
 */
public static BiPredicate<File, FileIndex> areSameFile(boolean compareLargeFileChecksums, boolean compareFileOwners) {
  // Files strictly larger than this many bytes (1 MB) are "large" and may skip the full-file checksum.
  final long largeFileThresholdBytes = 1024L * 1024L;
  // Cache owner/group names to reduce calls to sun.nio.fs.UnixFileAttributes.group
  Cache<String, String> groupCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
  // Cache owner/group names to reduce calls to sun.nio.fs.UnixFileAttributes.owner
  Cache<String, String> ownerCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
  return (localFile, remoteFile) -> {
    if (!localFile.getName().equals(remoteFile.getFileName())) {
      return false;
    }

    FileMetadata remoteFileMetadata = remoteFile.getFileMetadata();
    PosixFileAttributes localFileAttrs;
    boolean areSameFiles;
    try {
      localFileAttrs = Files.readAttributes(localFile.toPath(), PosixFileAttributes.class);
      // Don't compare file timestamps. The ctime of a local file just restored will be different than the
      // remote file, and will cause the file to be uploaded again during the first commit after restore.
      areSameFiles = localFileAttrs.size() == remoteFileMetadata.getSize();
      // In case a job is moved to a new cluster/machine, the owners (gid/uid) may be different than the one present
      // in the remote snapshot. This flag indicates if we should compare it at all.
      if (compareFileOwners) {
        LOG.trace("Comparing owners of remote and local copy of file {}", localFile.getAbsolutePath());
        // The && chain short-circuits: the gid/uid lookups are skipped once a mismatch is found.
        // Caches are keyed by numeric gid/uid; the loader resolves the name only on a cache miss.
        areSameFiles = areSameFiles
            && groupCache.get(String.valueOf(Files.getAttribute(localFile.toPath(), "unix:gid")),
                () -> localFileAttrs.group().getName()).equals(remoteFileMetadata.getGroup())
            && ownerCache.get(String.valueOf(Files.getAttribute(localFile.toPath(), "unix:uid")),
                () -> localFileAttrs.owner().getName()).equals(remoteFileMetadata.getOwner());
      }
    } catch (IOException | ExecutionException e) {
      // Log and propagate the underlying cause so the failure can be diagnosed.
      LOG.error("Error reading attributes for file: {}", localFile.getAbsolutePath(), e);
      throw new RuntimeException(String.format("Error reading attributes for file: %s", localFile.getAbsolutePath()),
          e);
    }

    Set<PosixFilePermission> localFilePermissions = localFileAttrs.permissions();
    Set<PosixFilePermission> remoteFilePermissions =
        PosixFilePermissions.fromString(remoteFileMetadata.getPermissions());
    // only verify permissions for file owner
    areSameFiles = areSameFiles
        && localFilePermissions.contains(PosixFilePermission.OWNER_READ)
            == remoteFilePermissions.contains(PosixFilePermission.OWNER_READ)
        && localFilePermissions.contains(PosixFilePermission.OWNER_WRITE)
            == remoteFilePermissions.contains(PosixFilePermission.OWNER_WRITE)
        && localFilePermissions.contains(PosixFilePermission.OWNER_EXECUTE)
            == remoteFilePermissions.contains(PosixFilePermission.OWNER_EXECUTE);

    if (!areSameFiles) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Local file: {} and remote file: {} are not same. "
            + "Local file attributes: {}. Remote file attributes: {}.", localFile.getAbsolutePath(),
            remoteFile.getFileName(), fileAttributesToString(localFileAttrs), remoteFileMetadata.toString());
      }
      return false;
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Local file: {}. Remote file: {}. " + "Local file attributes: {}. Remote file attributes: {}.",
          localFile.getAbsolutePath(), remoteFile.getFileName(), fileAttributesToString(localFileAttrs),
          remoteFileMetadata.toString());
    }

    boolean isLargeFile = localFileAttrs.size() > largeFileThresholdBytes;
    if (!compareLargeFileChecksums && isLargeFile) {
      // Since RocksDB SST files are immutable after creation, we can skip the expensive checksum computations
      // which requires reading the entire file.
      LOG.debug("Local file: {} and remote file: {} both present. " +
          "Skipping checksum calculation for large file of size: {}.",
          localFile.getAbsolutePath(), remoteFile.getFileName(), localFileAttrs.size());
      return true;
    }

    long localFileChecksum;
    // try-with-resources guarantees the file handle is released even if reading fails part way through.
    try (FileInputStream fis = new FileInputStream(localFile);
        CheckedInputStream cis = new CheckedInputStream(fis, new CRC32())) {
      byte[] buffer = new byte[8 * 1024]; // 8 KB
      while (cis.read(buffer, 0, buffer.length) >= 0) {
        // Reading the stream updates the CRC32; the bytes themselves are discarded.
      }
      localFileChecksum = cis.getChecksum().getValue();
    } catch (IOException e) {
      throw new SamzaException("Error calculating checksum for local file: " + localFile.getAbsolutePath(), e);
    }

    boolean areSameChecksum = localFileChecksum == remoteFile.getChecksum();
    if (!areSameChecksum) {
      LOG.debug("Local file: {} and remote file: {} are not same. " +
          "Local checksum: {}. Remote checksum: {}",
          localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum());
    } else {
      LOG.debug("Local file: {} and remote file: {} are same. Local checksum: {}. Remote checksum: {}",
          localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum());
    }
    return areSameChecksum;
  };
}