in samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java [522:591]
/**
 * Uploads a single regular file to the blob store, retrying via
 * {@code FutureUtil.executeAsyncWithRetries} with {@code retryPolicyConfig}.
 *
 * <p>Each attempt opens a fresh {@link CheckedInputStream} (CRC32) over the file and passes it to
 * {@code blobStoreManager.put}. The checksum recorded in the resulting {@link FileIndex} is the
 * CRC32 of the bytes read through that stream. The stream is closed when the put completes,
 * whether it succeeded or failed, so failed attempts do not leak file descriptors.
 *
 * @param file the file to upload; must be non-null and satisfy {@link File#isFile()}
 * @param snapshotMetadata job id/name, task name and store name attached to the blob's metadata
 * @return a future that completes with a {@link FileIndex} holding the file name, a single
 *     {@link FileBlob} (offset 0) for the returned blob id, the file's {@link FileMetadata}, and
 *     the CRC32 checksum value
 * @throws SamzaException synchronously if {@code file} is null or not a regular file
 */
public CompletableFuture<FileIndex> putFile(File file, SnapshotMetadata snapshotMetadata) {
  if (file == null || !file.isFile()) {
    String message = file != null ? "Dir or Symbolic link" : "null";
    throw new SamzaException(String.format("Required a non-null parameter of type file, provided: %s", message));
  }
  long putFileStartTime = System.nanoTime();
  String opName = "putFile: " + file.getAbsolutePath();
  // Invoked once per attempt by the retry helper; every invocation opens (and must close) its own stream.
  Supplier<CompletionStage<FileIndex>> fileUploadAction = () -> {
    LOG.debug("Putting file: {} to blob store.", file.getPath());
    CompletableFuture<FileIndex> fileBlobFuture;
    CheckedInputStream inputStream = null;
    try {
      // TODO HIGH shesharm maybe use the more efficient CRC32C / PureJavaCRC32 impl
      inputStream = new CheckedInputStream(new FileInputStream(file), new CRC32());
      // Effectively-final alias so the asynchronous callbacks below can reference the stream.
      CheckedInputStream finalInputStream = inputStream;
      FileMetadata fileMetadata = FileMetadata.fromFile(file);
      if (backupMetrics != null) {
        backupMetrics.avgFileSizeBytes.update(fileMetadata.getSize());
      }
      Metadata metadata =
          new Metadata(file.getAbsolutePath(), Optional.of(fileMetadata.getSize()), snapshotMetadata.getJobName(),
              snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName());
      fileBlobFuture = blobStoreManager.put(inputStream, metadata)
          .whenComplete((id, putError) -> {
            // The success path below closes the stream; if the put failed asynchronously nothing else
            // would, and the retry helper opens a new stream per attempt, so close it here. Close errors
            // are only logged so that the original put failure is what propagates (and drives retries).
            if (putError != null) {
              try {
                finalInputStream.close();
              } catch (Exception closeError) {
                LOG.error("Error closing input stream for file: {}", file.getName(), closeError);
              }
            }
          })
          .thenApplyAsync(id -> {
            LOG.trace("Put complete. Received Blob ID {}. Closing input stream for file: {}.", id, file.getPath());
            try {
              finalInputStream.close();
            } catch (Exception e) {
              throw new SamzaException(String.format("Error closing input stream for file: %s",
                  file.getAbsolutePath()), e);
            }
            LOG.trace("Returning new FileIndex for file: {}.", file.getPath());
            // The Checksum object is owned by the CheckedInputStream and remains readable after close().
            return new FileIndex(
                file.getName(),
                Collections.singletonList(new FileBlob(id, 0)),
                fileMetadata,
                finalInputStream.getChecksum().getValue());
          }, executor).toCompletableFuture();
    } catch (Exception e) {
      // Synchronous failure (opening the file, reading metadata, or put() throwing): release the stream.
      try {
        if (inputStream != null) {
          inputStream.close();
        }
      } catch (Exception err) {
        LOG.error("Error closing input stream for file: {}", file.getName(), err);
      }
      LOG.error("Error putting file: {}", file.getName(), e);
      throw new SamzaException(String.format("Error putting file %s", file.getAbsolutePath()), e);
    }
    return fileBlobFuture;
  };
  return FutureUtil.executeAsyncWithRetries(opName, fileUploadAction, isCauseNonRetriable(), executor, retryPolicyConfig)
      .whenComplete((res, ex) -> {
        // NOTE(review): these metrics are updated regardless of outcome; `ex` is not checked, so a
        // failed upload is still counted in filesUploaded/bytesUploaded. Confirm this is intended.
        if (backupMetrics != null) {
          backupMetrics.avgFileUploadNs.update(System.nanoTime() - putFileStartTime);
          long fileSize = file.length();
          backupMetrics.uploadRate.inc(fileSize);
          backupMetrics.filesUploaded.getValue().addAndGet(1);
          backupMetrics.bytesUploaded.getValue().addAndGet(fileSize);
          backupMetrics.filesRemaining.getValue().addAndGet(-1);
          backupMetrics.bytesRemaining.getValue().addAndGet(-1 * fileSize);
        }
      });
}