in src/main/java/software/amazon/encryption/s3/S3EncryptionClient.java [277:358]
/**
 * Encrypts {@code requestBody} and uploads it as a multipart upload, blocking until
 * every part future reported by the observer has completed.
 *
 * <p>All configuration is validated before the upload is created, so a
 * misconfigured {@link MultipartConfiguration} fails without first starting an upload.
 *
 * @param request     the put request; its {@code contentLength}, if set, must agree with
 *                    the length reported by {@code requestBody} when that is also known
 * @param requestBody the plaintext content to encrypt and upload
 * @return the response returned by the observer when the upload is completed
 * @throws S3EncryptionClientException if the two content lengths disagree, the content
 *         exceeds the maximum length for GCM encryption, or the multipart configuration
 *         has no executor service, upload observer, or multi-file output stream
 * @throws Throwable whatever {@code onAbort} produces when the encrypt/upload pipeline fails
 */
private CompleteMultipartUploadResponse multipartPutObject(PutObjectRequest request, RequestBody requestBody) throws Throwable {
    // Similar logic exists in the MultipartUploadObjectPipeline,
    // but the request types do not match so refactoring is not possible
    final Long requestContentLength = request.contentLength();
    final long contentLength;
    if (requestContentLength == null) {
        // nothing on the request; fall back to the body, or -1 when the length is unknown
        contentLength = requestBody.optionalContentLength().orElse(-1L);
    } else {
        boolean lengthsDisagree = requestBody.optionalContentLength()
                .filter(bodyLength -> !requestContentLength.equals(bodyLength))
                .isPresent();
        if (lengthsDisagree) {
            // if the contentLength values do not match, throw an exception, since we don't know which is correct
            throw new S3EncryptionClientException("The contentLength provided in the request object MUST match the " +
                    "contentLength in the request body");
        }
        // either the body has no length, or both values match; the request value is correct in both cases
        contentLength = requestContentLength;
    }
    if (contentLength > AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF.cipherMaxContentLengthBytes()) {
        throw new S3EncryptionClientException("The contentLength of the object you are attempting to encrypt exceeds " +
                "the maximum length allowed for GCM encryption.");
    }

    // Use the MultipartConfiguration attached to the request's override configuration, if any.
    // orElseGet defers building the default configuration until it is actually needed.
    final MultipartConfiguration multipartConfiguration = request.overrideConfiguration()
            .flatMap(overrideConfig -> overrideConfig.executionAttributes()
                    .getOptionalAttribute(S3EncryptionClient.CONFIGURATION))
            .orElseGet(() -> MultipartConfiguration.builder().build());

    // Validate every collaborator BEFORE the upload is created, so that a configuration
    // error cannot leave a started multipart upload behind.
    final ExecutorService es = multipartConfiguration.executorService();
    if (es == null) {
        throw new S3EncryptionClientException("ExecutorService should not be null, Please initialize during MultipartConfiguration");
    }
    final UploadObjectObserver observer = multipartConfiguration.uploadObjectObserver();
    if (observer == null) {
        throw new S3EncryptionClientException("UploadObjectObserver should not be null, Please initialize during MultipartConfiguration");
    }
    final MultiFileOutputStream outputStream = multipartConfiguration.multiFileOutputStream();
    if (outputStream == null) {
        throw new S3EncryptionClientException("MultiFileOutputStream should not be null, Please initialize during MultipartConfiguration");
    }

    observer.init(request, _wrappedAsyncClient, this, es);
    final String uploadId = observer.onUploadCreation(request);
    final List<CompletedPart> partETags = new ArrayList<>();
    try {
        // initialize the multi-file output stream
        outputStream.init(observer, multipartConfiguration.partSize(), multipartConfiguration.diskLimit());
        // Kicks off the encryption-upload pipeline;
        // Note outputStream is automatically closed upon method completion.
        _multipartPipeline.putLocalObject(requestBody, uploadId, outputStream);
        // block until all parts have been uploaded
        for (Future<Map<Integer, UploadPartResponse>> future : observer.futures()) {
            Map<Integer, UploadPartResponse> partResponseMap = future.get();
            partResponseMap.forEach((partNumber, uploadPartResponse) -> partETags.add(CompletedPart.builder()
                    .partNumber(partNumber)
                    .eTag(uploadPartResponse.eTag())
                    .build()));
        }
    } catch (IOException | InterruptedException | ExecutionException | RuntimeException | Error ex) {
        // NOTE(review): onAbort is defined elsewhere; presumably it aborts the upload and
        // returns the throwable to propagate - confirm against its definition.
        throw onAbort(observer, ex);
    } finally {
        if (multipartConfiguration.usingDefaultExecutorService()) {
            // shut down the thread pool if it was created by the encryption client
            es.shutdownNow();
        }
        // delete left-over temp files
        outputStream.cleanup();
    }
    // Complete upload
    return observer.onCompletion(partETags);
}