protected long uploadFileImpl()

in priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java [106:196]


    protected long uploadFileImpl(AbstractBackupPath path, Instant target)
            throws BackupRestoreException {
        Path localPath = Paths.get(path.getBackupFile().getAbsolutePath());
        String remotePath = path.getRemotePath();

        long chunkSize = getChunkSize(localPath);
        // initialize chunking request to aws
        InitiateMultipartUploadRequest initRequest =
                new InitiateMultipartUploadRequest(config.getBackupPrefix(), remotePath);
        // Fetch the aws generated upload id for this chunking request
        InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
        DataPart part =
                new DataPart(config.getBackupPrefix(), remotePath, initResponse.getUploadId());
        // Metadata on number of parts to be uploaded
        List<PartETag> partETags = Collections.synchronizedList(new ArrayList<>());

        // Read chunks from src, compress it, and write to temp file
        File compressedDstFile = new File(localPath.toString() + ".compressed");
        if (logger.isDebugEnabled())
            logger.debug(
                    "Compressing {} with chunk size {}",
                    compressedDstFile.getAbsolutePath(),
                    chunkSize);

        try (InputStream in = new FileInputStream(localPath.toFile());
                BufferedOutputStream compressedBos =
                        new BufferedOutputStream(new FileOutputStream(compressedDstFile))) {
            Iterator<byte[]> compressedChunks =
                    new ChunkedStream(in, chunkSize, path.getCompression());
            while (compressedChunks.hasNext()) {
                byte[] compressedChunk = compressedChunks.next();
                compressedBos.write(compressedChunk);
            }
        } catch (Exception e) {
            String message =
                    "Exception in compressing the input data during upload to EncryptedStore  Msg: "
                            + e.getMessage();
            logger.error(message, e);
            throw new BackupRestoreException(message);
        }

        // == Read compressed data, encrypt each chunk, upload it to aws
        try (BufferedInputStream compressedBis =
                new BufferedInputStream(new FileInputStream(compressedDstFile))) {
            Iterator<byte[]> chunks = this.encryptor.encryptStream(compressedBis, remotePath);

            // identifies this part position in the object we are uploading
            int partNum = 0;
            long encryptedFileSize = 0;

            while (chunks.hasNext()) {
                byte[] chunk = chunks.next();
                // throttle upload to endpoint
                rateLimiter.acquire(chunk.length);
                dynamicRateLimiter.acquire(path, target, chunk.length);

                DataPart dp =
                        new DataPart(
                                ++partNum,
                                chunk,
                                config.getBackupPrefix(),
                                remotePath,
                                initResponse.getUploadId());
                S3PartUploader partUploader = new S3PartUploader(s3Client, dp, partETags);
                encryptedFileSize += chunk.length;
                executor.submit(partUploader);
            }

            executor.sleepTillEmpty();
            if (partNum != partETags.size()) {
                throw new BackupRestoreException(
                        "Number of parts("
                                + partNum
                                + ")  does not match the expected number of uploaded parts("
                                + partETags.size()
                                + ")");
            }

            // complete the aws chunking upload by providing to aws the ETag that uniquely
            // identifies the combined object datav
            CompleteMultipartUploadResult resultS3MultiPartUploadComplete =
                    new S3PartUploader(s3Client, part, partETags).completeUpload();
            checkSuccessfulUpload(resultS3MultiPartUploadComplete, localPath);
            return encryptedFileSize;
        } catch (Exception e) {
            new S3PartUploader(s3Client, part, partETags).abortUpload();
            throw new BackupRestoreException("Error uploading file: " + localPath, e);
        } finally {
            if (compressedDstFile.exists()) compressedDstFile.delete();
        }
    }