in src/main/java/com/amazonaws/services/neptune/export/ExportToS3NeptuneExportEventHandler.java [334:403]
private void uploadExportFilesToS3(S3TransferManager transferManager, File directory, S3ObjectInfo outputS3ObjectInfo) {
if (directory == null || !directory.exists()) {
logger.error("Request to upload files to S3 failed because upload directory from which to upload files does not exist");
throw new RuntimeException("Failed to upload files to S3 because upload directory from which to upload files does not exist");
}
boolean allowRetry = true;
int retryCount = 0;
while (allowRetry){
try {
logger.info("Uploading export files to {}", outputS3ObjectInfo.toString());
UploadDirectoryRequest uploadRequest = UploadDirectoryRequest.builder()
.source(directory.toPath())
.bucket(outputS3ObjectInfo.bucket())
.s3Prefix(outputS3ObjectInfo.key())
.uploadFileRequestTransformer(builder -> {
UploadFileRequest built = builder.build();
PutObjectRequest.Builder newBuilder = built.putObjectRequest().toBuilder();
newBuilder = configureServerSideEncryption(newBuilder, sseKmsKeyId).tagging(createObjectTags(profiles));
builder.putObjectRequest(newBuilder.build());
})
.build();
try{
DirectoryUpload upload = transferManager.uploadDirectory(uploadRequest);
CompletedDirectoryUpload completedDirectoryUpload = upload.completionFuture().join();
List<FailedFileUpload> failedFileUploads = completedDirectoryUpload.failedTransfers();
RuntimeException fileUploadException = null;
for (FailedFileUpload failedFileUpload : failedFileUploads) {
logger.error("Failed S3 upload for file {}", failedFileUpload.request().source(), failedFileUpload.exception());
if (fileUploadException == null) {
fileUploadException = new RuntimeException(failedFileUpload.exception());
} else {
fileUploadException.addSuppressed(failedFileUpload.exception());
}
}
if (fileUploadException != null) {
throw fileUploadException;
}
} catch (CompletionException e) {
if (e.getCause() instanceof AmazonServiceException) {
AmazonClientException amazonClientException = (AmazonClientException) e.getCause();
String errorMessage = amazonClientException.getMessage();
Matcher exMsgStatusCodeMatcher = STATUS_CODE_5XX_PATTERN.matcher(errorMessage);
logger.error("Upload to S3 failed: {}", errorMessage);
// only retry if exception is retryable, the status code is 5xx, and we have retry counts left
if (amazonClientException.isRetryable() && exMsgStatusCodeMatcher.find() && retryCount <= 2) {
retryCount++;
logger.info("Retrying upload to S3 [RetryCount: {}]", retryCount);
continue;
} else {
allowRetry = false;
logger.warn("Cancelling upload to S3 [RetryCount: {}]", retryCount);
throw new RuntimeException(String.format("Upload to S3 failed [Directory: %s, S3 location: %s, Reason: %s, RetryCount: %s]", directory, outputS3ObjectInfo, errorMessage, retryCount));
}
}
}
allowRetry = false;
} catch (CancellationException e) {
logger.warn(e.getMessage());
Thread.currentThread().interrupt();
}
}
}