in src/main/java/s3NativeClient/com/amazonaws/s3/S3NativeClient.java [244:337]
public CompletableFuture<PutObjectOutput> putObject(PutObjectRequest request,
final RequestDataSupplier requestDataSupplier) {
final CompletableFuture<PutObjectOutput> resultFuture = new CompletableFuture<>();
final PutObjectOutput.Builder resultBuilder = PutObjectOutput.builder();
HttpRequestBodyStream payloadStream = new HttpRequestBodyStream() {
@Override
public boolean sendRequestBody(final ByteBuffer outBuffer) {
try {
return requestDataSupplier.getRequestBytes(outBuffer);
} catch (Exception e) {
resultFuture.completeExceptionally(e);
return true;
}
}
@Override
public boolean resetPosition() {
try {
return requestDataSupplier.resetPosition();
} catch (Exception e) {
return false;
}
}
@Override
public long getLength() {
return request.contentLength();
}
};
final List<HttpHeader> headers = new LinkedList<>();
// TODO: additional logic needed for *special* partitions
headers.add(new HttpHeader("Host", request.bucket() + ".s3." + signingRegion + ".amazonaws.com"));
populatePutObjectRequestHeaders(header -> headers.add(header), request);
addCustomHeaders(headers, request.customHeaders());
String encodedPath = getEncodedPath(request.key(), request.customQueryParameters());
HttpRequest httpRequest = new HttpRequest("PUT", encodedPath, headers.toArray(new HttpHeader[0]),
payloadStream);
final S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() {
@Override
public void onResponseHeaders(final int statusCode, final HttpHeader[] headers) {
for (int headerIndex = 0; headerIndex < headers.length; ++headerIndex) {
try {
populatePutObjectOutputHeader(resultBuilder, headers[headerIndex]);
} catch (Exception e) {
resultFuture.completeExceptionally(new RuntimeException(
String.format(
"Could not process response header {%s}: " + headers[headerIndex].getName()),
e));
}
}
requestDataSupplier.onResponseHeaders(statusCode, headers);
}
@Override
public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) {
return 0;
}
@Override
public void onFinished(int errorCode, int responseStatus, byte[] errorPayload) {
CrtS3RuntimeException ex = null;
try {
if (errorCode != CRT.AWS_CRT_SUCCESS) {
ex = new CrtS3RuntimeException(errorCode, responseStatus, errorPayload);
requestDataSupplier.onException(ex);
} else {
requestDataSupplier.onFinished();
}
} catch (Exception e) { /* ignore user callback exception */
} finally {
if (ex != null) {
resultFuture.completeExceptionally(ex);
} else {
resultFuture.complete(resultBuilder.build());
}
}
}
};
S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions()
.withMetaRequestType(S3MetaRequestOptions.MetaRequestType.PUT_OBJECT).withHttpRequest(httpRequest)
.withResponseHandler(responseHandler);
try (final S3MetaRequest metaRequest = s3Client.makeMetaRequest(metaRequestOptions)) {
addCancelCheckToFuture(resultFuture, metaRequest);
return resultFuture;
}
}