private long uploadMultipart()

in priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java [122:172]


    private long uploadMultipart(AbstractBackupPath path, Instant target)
            throws BackupRestoreException {
        Path localPath = Paths.get(path.getBackupFile().getAbsolutePath());
        String remotePath = path.getRemotePath();
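        // Part size for this upload, derived from the file at localPath.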
        long chunkSize = getChunkSize(localPath);
        String prefix = config.getBackupPrefix();
        if (logger.isDebugEnabled())
            logger.debug("Uploading to {}/{} with chunk size {}", prefix, remotePath, chunkSize);
        File localFile = localPath.toFile();
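        // Kick off the multipart upload; the returned uploadId ties every part upload
        // and the final complete/abort call to this object.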
        InitiateMultipartUploadRequest initRequest =
                new InitiateMultipartUploadRequest(prefix, remotePath)
                        .withObjectMetadata(getObjectMetadata(localFile));
        String uploadId = s3Client.initiateMultipartUpload(initRequest).getUploadId();
        DataPart part = new DataPart(prefix, remotePath, uploadId);
        List<PartETag> partETags = Collections.synchronizedList(new ArrayList<>());

        try (InputStream in = new FileInputStream(localFile)) {
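            // Read the file through the configured compression, one chunkSize-byte piece at a time.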
            Iterator<byte[]> chunks = new ChunkedStream(in, chunkSize, path.getCompression());
            int partNum = 0;
            AtomicInteger partsPut = new AtomicInteger(0);
            long compressedFileSize = 0;

            while (chunks.hasNext()) {
                byte[] chunk = chunks.next();
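                // Throttle against both the static and dynamic rate limiters before submitting the part.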
                rateLimiter.acquire(chunk.length);
                dynamicRateLimiter.acquire(path, target, chunk.length);
                DataPart dp = new DataPart(++partNum, chunk, prefix, remotePath, uploadId);
                S3PartUploader partUploader = new S3PartUploader(s3Client, dp, partETags, partsPut);
                compressedFileSize += chunk.length;
                // TODO: output Future<Etag> instead, collect them here, wait for all below
                executor.submit(partUploader);
            }

            // Block until the executor has drained all submitted part uploads.
            executor.sleepTillEmpty();
            logger.info("{} done. part count: {} expected: {}", localFile, partsPut.get(), partNum);
            Preconditions.checkState(partNum == partETags.size(), "part count mismatch");
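            // Ask S3 to stitch the uploaded parts together, then sanity-check the result.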
            CompleteMultipartUploadResult resultS3MultiPartUploadComplete =
                    new S3PartUploader(s3Client, part, partETags).completeUpload();
            checkSuccessfulUpload(resultS3MultiPartUploadComplete, localPath);

            if (logger.isDebugEnabled()) {
                final S3ResponseMetadata info = s3Client.getCachedResponseMetadata(initRequest);
                logger.debug("Request Id: {}, Host Id: {}", info.getRequestId(), info.getHostId());
            }

            return compressedFileSize;
        } catch (Exception e) {
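            // Abort the multipart upload so S3 does not retain orphaned parts.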
            new S3PartUploader(s3Client, part, partETags).abortUpload();
            throw new BackupRestoreException("Error uploading file: " + localPath.toString(), e);
        }
    }
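
The TODO above suggests replacing the shared partETags list with per-part Futures. A minimal sketch of that idea, assuming a plain java.util.concurrent.ExecutorService (named executorService here) and direct AWS SDK v1 calls instead of Priam's executor and S3PartUploader; names other than the SDK's are hypothetical:

    // Submit each part as a Callable<PartETag>; the Future list replaces the
    // shared, synchronized partETags collection.
    List<Future<PartETag>> futures = new ArrayList<>();
    int partNum = 0;
    while (chunks.hasNext()) {
        byte[] chunk = chunks.next();
        int thisPart = ++partNum;
        Callable<PartETag> task = () ->
                s3Client.uploadPart(new UploadPartRequest()
                                .withBucketName(prefix)
                                .withKey(remotePath)
                                .withUploadId(uploadId)
                                .withPartNumber(thisPart)
                                .withInputStream(new ByteArrayInputStream(chunk))
                                .withPartSize(chunk.length))
                        .getPartETag();
        futures.add(executorService.submit(task));
    }
    // Wait for every part; Future.get() rethrows the first upload failure
    // (checked exceptions elided in this sketch).
    List<PartETag> partETags = new ArrayList<>();
    for (Future<PartETag> f : futures) partETags.add(f.get());
    s3Client.completeMultipartUpload(
            new CompleteMultipartUploadRequest(prefix, remotePath, uploadId, partETags));

With this shape, the part-count bookkeeping (partsPut, sleepTillEmpty, the Preconditions check) becomes unnecessary, since waiting on the Futures already guarantees one ETag per submitted part.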