in priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java [106:196]
protected long uploadFileImpl(AbstractBackupPath path, Instant target)
throws BackupRestoreException {
Path localPath = Paths.get(path.getBackupFile().getAbsolutePath());
String remotePath = path.getRemotePath();
long chunkSize = getChunkSize(localPath);
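// Chunk size used to read and compress the source file in bounded pieces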
// Initiate the multipart (chunked) upload request with AWS
InitiateMultipartUploadRequest initRequest =
new InitiateMultipartUploadRequest(config.getBackupPrefix(), remotePath);
// Fetch the AWS-generated upload id for this multipart upload
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
DataPart part =
new DataPart(config.getBackupPrefix(), remotePath, initResponse.getUploadId());
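// Holds only the upload coordinates (bucket, key, upload id); reused later to complete or
// abort the multipart upload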
// Collects the ETag returned for each uploaded part; synchronized because parts are uploaded concurrently
List<PartETag> partETags = Collections.synchronizedList(new ArrayList<>());
// Read chunks from the source file, compress them, and write the result to a temporary file
File compressedDstFile = new File(localPath.toString() + ".compressed");
if (logger.isDebugEnabled())
    logger.debug(
            "Compressing {} into {} with chunk size {}",
            localPath,
            compressedDstFile.getAbsolutePath(),
            chunkSize);
try (InputStream in = new FileInputStream(localPath.toFile());
BufferedOutputStream compressedBos =
new BufferedOutputStream(new FileOutputStream(compressedDstFile))) {
Iterator<byte[]> compressedChunks =
new ChunkedStream(in, chunkSize, path.getCompression());
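// The iterator hands back one compressed chunk at a time, using the compression
// algorithm configured for this backup path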
while (compressedChunks.hasNext()) {
byte[] compressedChunk = compressedChunks.next();
compressedBos.write(compressedChunk);
}
} catch (Exception e) {
String message =
        "Error while compressing the input data during upload to EncryptedStore: "
                + e.getMessage();
logger.error(message, e);
throw new BackupRestoreException(message, e);
}
// Read the compressed data, encrypt each chunk, and upload it to AWS
try (BufferedInputStream compressedBis =
new BufferedInputStream(new FileInputStream(compressedDstFile))) {
Iterator<byte[]> chunks = this.encryptor.encryptStream(compressedBis, remotePath);
// Identifies this part's position within the object being uploaded
int partNum = 0;
long encryptedFileSize = 0;
while (chunks.hasNext()) {
byte[] chunk = chunks.next();
// throttle upload to endpoint
rateLimiter.acquire(chunk.length);
dynamicRateLimiter.acquire(path, target, chunk.length);
DataPart dp =
new DataPart(
++partNum,
chunk,
config.getBackupPrefix(),
remotePath,
initResponse.getUploadId());
S3PartUploader partUploader = new S3PartUploader(s3Client, dp, partETags);
encryptedFileSize += chunk.length;
executor.submit(partUploader);
}
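// Block until every submitted part upload has drained from the executor before
// validating the part count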
executor.sleepTillEmpty();
if (partNum != partETags.size()) {
throw new BackupRestoreException(
"Number of parts("
+ partNum
+ ") does not match the expected number of uploaded parts("
+ partETags.size()
+ ")");
}
// Complete the multipart upload by sending AWS the ETags of the uploaded parts so it
// can assemble them into the final object
CompleteMultipartUploadResult resultS3MultiPartUploadComplete =
new S3PartUploader(s3Client, part, partETags).completeUpload();
checkSuccessfulUpload(resultS3MultiPartUploadComplete, localPath);
return encryptedFileSize;
} catch (Exception e) {
new S3PartUploader(s3Client, part, partETags).abortUpload();
throw new BackupRestoreException("Error uploading file: " + localPath, e);
} finally {
if (compressedDstFile.exists()) compressedDstFile.delete();
}
}
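
For context, the S3PartUploader calls above wrap the plain AWS SDK (v1) multipart sequence: initiate, upload numbered parts while collecting their ETags, then complete (or abort on failure). The single-threaded sketch below illustrates only that underlying sequence; the bucket, key, and chunks parameters are hypothetical placeholders, and this is not Priam's S3PartUploader implementation.

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the raw S3 multipart flow that S3PartUploader wraps. */
class MultipartUploadSketch {
    static String upload(AmazonS3 s3, String bucket, String key, List<byte[]> chunks) {
        // 1. Ask S3 for an upload id that ties all of the parts together
        String uploadId =
                s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key))
                        .getUploadId();
        List<PartETag> partETags = new ArrayList<>();
        try {
            int partNum = 0;
            for (byte[] chunk : chunks) {
                // 2. Upload each part; part numbers start at 1 and S3 returns one ETag per part
                UploadPartRequest req =
                        new UploadPartRequest()
                                .withBucketName(bucket)
                                .withKey(key)
                                .withUploadId(uploadId)
                                .withPartNumber(++partNum)
                                .withInputStream(new ByteArrayInputStream(chunk))
                                .withPartSize(chunk.length);
                partETags.add(s3.uploadPart(req).getPartETag());
            }
            // 3. Return the collected part ETags so S3 can stitch the parts into one object
            CompleteMultipartUploadResult result =
                    s3.completeMultipartUpload(
                            new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
            return result.getETag();
        } catch (RuntimeException e) {
            // 4. Abort on failure so S3 does not keep the orphaned parts around
            s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId));
            throw e;
        }
    }
}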