public CompletableFuture putFile()

in samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java [522:591]


  public CompletableFuture<FileIndex> putFile(File file, SnapshotMetadata snapshotMetadata) {
    if (file == null || !file.isFile()) {
      String message = file != null ? "Dir or Symbolic link" : "null";
      throw new SamzaException(String.format("Required a non-null parameter of type file, provided: %s", message));
    }
    long putFileStartTime = System.nanoTime();

    String opName = "putFile: " + file.getAbsolutePath();
    Supplier<CompletionStage<FileIndex>> fileUploadAction = () -> {
      LOG.debug("Putting file: {} to blob store.", file.getPath());
      CompletableFuture<FileIndex> fileBlobFuture;
      CheckedInputStream inputStream = null;
      try {
        // TODO HIGH shesharm maybe use the more efficient CRC32C / PureJavaCRC32 impl
        inputStream = new CheckedInputStream(new FileInputStream(file), new CRC32());
        CheckedInputStream finalInputStream = inputStream;
        FileMetadata fileMetadata = FileMetadata.fromFile(file);
        if (backupMetrics != null) {
          backupMetrics.avgFileSizeBytes.update(fileMetadata.getSize());
        }

        Metadata metadata =
            new Metadata(file.getAbsolutePath(), Optional.of(fileMetadata.getSize()), snapshotMetadata.getJobName(),
                snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName());

        fileBlobFuture = blobStoreManager.put(inputStream, metadata)
            .thenApplyAsync(id -> {
              LOG.trace("Put complete. Received Blob ID {}. Closing input stream for file: {}.", id, file.getPath());
              try {
                finalInputStream.close();
              } catch (Exception e) {
                throw new SamzaException(String.format("Error closing input stream for file: %s",
                    file.getAbsolutePath()), e);
              }

              LOG.trace("Returning new FileIndex for file: {}.", file.getPath());
              return new FileIndex(
                  file.getName(),
                  Collections.singletonList(new FileBlob(id, 0)),
                  fileMetadata,
                  finalInputStream.getChecksum().getValue());
            }, executor).toCompletableFuture();
      } catch (Exception e) {
        try {
          if (inputStream != null) {
            inputStream.close();
          }
        } catch (Exception err) {
          LOG.error("Error closing input stream for file: {}", file.getName(), err);
        }
        LOG.error("Error putting file: {}", file.getName(), e);
        throw new SamzaException(String.format("Error putting file %s", file.getAbsolutePath()), e);
      }
      return fileBlobFuture;
    };

    return FutureUtil.executeAsyncWithRetries(opName, fileUploadAction, isCauseNonRetriable(), executor, retryPolicyConfig)
        .whenComplete((res, ex) -> {
          if (backupMetrics != null) {
            backupMetrics.avgFileUploadNs.update(System.nanoTime() - putFileStartTime);

            long fileSize = file.length();
            backupMetrics.uploadRate.inc(fileSize);
            backupMetrics.filesUploaded.getValue().addAndGet(1);
            backupMetrics.bytesUploaded.getValue().addAndGet(fileSize);
            backupMetrics.filesRemaining.getValue().addAndGet(-1);
            backupMetrics.bytesRemaining.getValue().addAndGet(-1 * fileSize);
          }
        });
  }