public static BiPredicate areSameFile()

in samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java [156:253]


  /**
   * Creates a predicate that decides whether a local file and a remote {@link FileIndex} refer to the same file.
   *
   * <p>The returned predicate evaluates to {@code true} only if all of the following hold:
   * <ul>
   *   <li>the local file name equals the remote file name;</li>
   *   <li>the local file size equals the size recorded in the remote file metadata;</li>
   *   <li>if {@code compareFileOwners} is set, the local owner and group names equal the remote ones;</li>
   *   <li>the owner read / write / execute permission bits are identical on both sides;</li>
   *   <li>the CRC32 checksum of the local file content equals the remote checksum. This check is skipped (and
   *       the files are considered the same) for local files larger than 1 MiB unless
   *       {@code compareLargeFileChecksums} is set.</li>
   * </ul>
   * File timestamps are intentionally not compared.
   *
   * <p>The predicate throws a {@link RuntimeException} if the local file attributes cannot be read, and a
   * {@link SamzaException} if the local file content cannot be read while computing the checksum. In both cases
   * the underlying exception is attached as the cause.
   *
   * @param compareLargeFileChecksums whether to compute and compare checksums for local files larger than 1 MiB
   * @param compareFileOwners whether to compare the owner and group names of the local and remote file
   * @return a predicate over (local file, remote file index) implementing the comparison described above
   */
  public static BiPredicate<File, FileIndex> areSameFile(boolean compareLargeFileChecksums, boolean compareFileOwners) {

    // Cache gid -> group name to reduce calls to sun.nio.fs.UnixFileAttributes.group
    Cache<String, String> groupCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
    // Cache uid -> owner name to reduce calls to sun.nio.fs.UnixFileAttributes.owner
    Cache<String, String> ownerCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();

    return (localFile, remoteFile) -> {
      if (!localFile.getName().equals(remoteFile.getFileName())) {
        return false;
      }

      FileMetadata remoteFileMetadata = remoteFile.getFileMetadata();

      boolean areSameFiles;
      PosixFileAttributes localFileAttrs;
      try {
        java.nio.file.Path localPath = localFile.toPath();
        localFileAttrs = Files.readAttributes(localPath, PosixFileAttributes.class);

        // Don't compare file timestamps. The ctime of a local file just restored will be different than the
        // remote file, and will cause the file to be uploaded again during the first commit after restore.
        areSameFiles = localFileAttrs.size() == remoteFileMetadata.getSize();
        // In case a job is moved to a new cluster/machine, the owners (gid/uid) may be different than the one present
        // in the remote snapshot. This flag indicates if we should compare it at all.
        if (compareFileOwners) {
          LOG.trace("Comparing owners of remote and local copy of file {}", localFile.getAbsolutePath());
          // Short-circuit evaluation keeps the attribute/cache lookups from running once a mismatch is known.
          areSameFiles = areSameFiles
              && groupCache.get(String.valueOf(Files.getAttribute(localPath, "unix:gid")),
                  () -> localFileAttrs.group().getName()).equals(remoteFileMetadata.getGroup())
              && ownerCache.get(String.valueOf(Files.getAttribute(localPath, "unix:uid")),
                  () -> localFileAttrs.owner().getName()).equals(remoteFileMetadata.getOwner());
        }
      } catch (IOException | ExecutionException e) {
        // Log and propagate the original exception so the root cause (missing file, access denied, ...) is not lost.
        LOG.error("Error reading attributes for file: {}", localFile.getAbsolutePath(), e);
        throw new RuntimeException(
            String.format("Error reading attributes for file: %s", localFile.getAbsolutePath()), e);
      }

      Set<PosixFilePermission> localFilePermissions = localFileAttrs.permissions();
      Set<PosixFilePermission> remoteFilePermissions =
          PosixFilePermissions.fromString(remoteFileMetadata.getPermissions());

      // only verify permissions for file owner
      areSameFiles = areSameFiles
          && localFilePermissions.contains(PosixFilePermission.OWNER_READ)
              == remoteFilePermissions.contains(PosixFilePermission.OWNER_READ)
          && localFilePermissions.contains(PosixFilePermission.OWNER_WRITE)
              == remoteFilePermissions.contains(PosixFilePermission.OWNER_WRITE)
          && localFilePermissions.contains(PosixFilePermission.OWNER_EXECUTE)
              == remoteFilePermissions.contains(PosixFilePermission.OWNER_EXECUTE);

      if (!areSameFiles) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("Local file: {} and remote file: {} are not same. "
                  + "Local file attributes: {}. Remote file attributes: {}.", localFile.getAbsolutePath(), remoteFile.getFileName(),
              fileAttributesToString(localFileAttrs), remoteFile.getFileMetadata().toString());
        }
        return false;
      }

      if (LOG.isTraceEnabled()) {
        LOG.trace("Local file: {}. Remote file: {}. " + "Local file attributes: {}. Remote file attributes: {}.",
            localFile.getAbsolutePath(), remoteFile.getFileName(), fileAttributesToString(localFileAttrs),
            remoteFile.getFileMetadata().toString());
      }

      final long largeFileThresholdBytes = 1024 * 1024; // 1 MiB
      boolean isLargeFile = localFileAttrs.size() > largeFileThresholdBytes;
      if (!compareLargeFileChecksums && isLargeFile) {
        // Since RocksDB SST files are immutable after creation, we can skip the expensive checksum computations
        // which requires reading the entire file.
        LOG.debug("Local file: {} and remote file: {} both present. " +
                "Skipping checksum calculation for large file of size: {}.",
            localFile.getAbsolutePath(), remoteFile.getFileName(), localFileAttrs.size());
        return true;
      }

      long localFileChecksum;
      // try-with-resources guarantees the streams are closed even when reading fails part-way through.
      try (FileInputStream fis = new FileInputStream(localFile);
           CheckedInputStream cis = new CheckedInputStream(fis, new CRC32())) {
        byte[] buffer = new byte[8 * 1024]; // 8 KB
        while (cis.read(buffer, 0, buffer.length) >= 0) {
          // Nothing to do with the data itself: reading through the stream is what updates the checksum.
        }
        localFileChecksum = cis.getChecksum().getValue();
      } catch (IOException e) {
        throw new SamzaException("Error calculating checksum for local file: " + localFile.getAbsolutePath(), e);
      }

      boolean areSameChecksum = localFileChecksum == remoteFile.getChecksum();
      if (!areSameChecksum) {
        LOG.debug("Local file: {} and remote file: {} are not same. " +
                "Local checksum: {}. Remote checksum: {}",
            localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum());
      } else {
        LOG.debug("Local file: {} and remote file: {} are same. Local checksum: {}. Remote checksum: {}",
            localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum());
      }
      return areSameChecksum;
    };
  }