in priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java [122:172]
private long uploadMultipart(AbstractBackupPath path, Instant target)
throws BackupRestoreException {
Path localPath = Paths.get(path.getBackupFile().getAbsolutePath());
String remotePath = path.getRemotePath();
long chunkSize = getChunkSize(localPath);
String prefix = config.getBackupPrefix();
if (logger.isDebugEnabled())
logger.debug("Uploading to {}/{} with chunk size {}", prefix, remotePath, chunkSize);
File localFile = localPath.toFile();
InitiateMultipartUploadRequest initRequest =
new InitiateMultipartUploadRequest(prefix, remotePath)
.withObjectMetadata(getObjectMetadata(localFile));
String uploadId = s3Client.initiateMultipartUpload(initRequest).getUploadId();
DataPart part = new DataPart(prefix, remotePath, uploadId);
List<PartETag> partETags = Collections.synchronizedList(new ArrayList<>());
try (InputStream in = new FileInputStream(localFile)) {
Iterator<byte[]> chunks = new ChunkedStream(in, chunkSize, path.getCompression());
int partNum = 0;
AtomicInteger partsPut = new AtomicInteger(0);
long compressedFileSize = 0;
while (chunks.hasNext()) {
byte[] chunk = chunks.next();
rateLimiter.acquire(chunk.length);
dynamicRateLimiter.acquire(path, target, chunk.length);
DataPart dp = new DataPart(++partNum, chunk, prefix, remotePath, uploadId);
S3PartUploader partUploader = new S3PartUploader(s3Client, dp, partETags, partsPut);
compressedFileSize += chunk.length;
// TODO: output Future<Etag> instead, collect them here, wait for all below
executor.submit(partUploader);
}
executor.sleepTillEmpty();
logger.info("{} done. part count: {} expected: {}", localFile, partsPut.get(), partNum);
Preconditions.checkState(partNum == partETags.size(), "part count mismatch");
CompleteMultipartUploadResult resultS3MultiPartUploadComplete =
new S3PartUploader(s3Client, part, partETags).completeUpload();
checkSuccessfulUpload(resultS3MultiPartUploadComplete, localPath);
if (logger.isDebugEnabled()) {
final S3ResponseMetadata info = s3Client.getCachedResponseMetadata(initRequest);
logger.debug("Request Id: {}, Host Id: {}", info.getRequestId(), info.getHostId());
}
return compressedFileSize;
} catch (Exception e) {
new S3PartUploader(s3Client, part, partETags).abortUpload();
throw new BackupRestoreException("Error uploading file: " + localPath.toString(), e);
}
}