in hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java [957:1126]
private Response createMultipartKey(OzoneVolume volume, String bucket,
    String key, long length, int partNumber, String uploadID,
    final InputStream body, PerformanceStringBuilder perf)
    throws IOException, OS3Exception {
  long startNanos = Time.monotonicNowNanos();
  String copyHeader = null;
  DigestInputStream digestInputStream = null;
  try {
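    // For signed (aws-chunked) payloads, unwrap the chunk signatures and
    // take the real payload size from the decoded-content-length header;
    // either way, wrap the body in a DigestInputStream so the content
    // digest used for the ETag is computed while the part is streamed.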
    if (S3Utils.hasSignedPayloadHeader(headers)) {
      digestInputStream = new DigestInputStream(
          new SignedChunksInputStream(body), getMessageDigestInstance());
      length = Long.parseLong(
          headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER));
    } else {
      digestInputStream = new DigestInputStream(body, getMessageDigestInstance());
    }
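    // Resolve the replication config for the part (taking the S3 storage
    // class header into account) and detect EC replication, which rules
    // out the datastream shortcut below.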
    copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
    String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
    final OzoneBucket ozoneBucket = volume.getBucket(bucket);
    ReplicationConfig replicationConfig =
        getReplicationConfig(ozoneBucket, storageType);
    boolean enableEC = false;
    if ((replicationConfig != null &&
        replicationConfig.getReplicationType() == EC) ||
        ozoneBucket.getReplicationConfig() instanceof ECReplicationConfig) {
      enableEC = true;
    }
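    // Datastream fast path: write the part via the streaming API when it
    // is enabled; EC keys and upload-part-copy requests fall through to
    // the regular write path.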
    if (datastreamEnabled && !enableEC && copyHeader == null) {
      perf.appendStreamMode();
      return ObjectEndpointStreaming
          .createMultipartKey(ozoneBucket, key, length, partNumber,
              uploadID, chunkSize, digestInputStream, perf);
    }
    // The OmMultipartCommitUploadPartInfo can only be retrieved after the
    // OzoneOutputStream is closed, so keep a reference to the stream.
    final OzoneOutputStream outputStream;
    long metadataLatencyNs;
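    // A copy header means this is an S3 UploadPartCopy request: the part
    // content comes from an existing source object rather than from the
    // request body.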
    if (copyHeader != null) {
      Pair<String, String> result = parseSourceHeader(copyHeader);
      String sourceBucket = result.getLeft();
      String sourceKey = result.getRight();
      OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails(
          volume.getName(), sourceBucket, sourceKey);
      String range =
          headers.getHeaderString(COPY_SOURCE_HEADER_RANGE);
      RangeHeader rangeHeader = null;
      if (range != null) {
        rangeHeader = RangeHeaderParserUtil.parseRangeHeader(range, 0);
        // When copying a range, the size of the target part is the
        // length specified by COPY_SOURCE_HEADER_RANGE.
        length = rangeHeader.getEndOffset() -
            rangeHeader.getStartOffset() + 1;
      } else {
        length = sourceKeyDetails.getDataSize();
      }
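      // Enforce the x-amz-copy-source-if-modified-since /
      // if-unmodified-since preconditions against the source key's
      // modification time; a failed precondition yields HTTP 412.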
      Long sourceKeyModificationTime = sourceKeyDetails
          .getModificationTime().toEpochMilli();
      String copySourceIfModifiedSince =
          headers.getHeaderString(COPY_SOURCE_IF_MODIFIED_SINCE);
      String copySourceIfUnmodifiedSince =
          headers.getHeaderString(COPY_SOURCE_IF_UNMODIFIED_SINCE);
      if (!checkCopySourceModificationTime(sourceKeyModificationTime,
          copySourceIfModifiedSince, copySourceIfUnmodifiedSince)) {
        throw newError(PRECOND_FAILED, sourceBucket + "/" + sourceKey);
      }
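      // Stream the source object into the new part: for a ranged copy,
      // skip to the range start first and then copy exactly `length` bytes.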
      try (OzoneInputStream sourceObject = sourceKeyDetails.getContent()) {
        long copyLength;
        if (range != null) {
          final long skipped =
              sourceObject.skip(rangeHeader.getStartOffset());
          if (skipped != rangeHeader.getStartOffset()) {
            throw new EOFException(
                "Bytes to skip: "
                    + rangeHeader.getStartOffset() + " actual: " + skipped);
          }
          try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
              .createMultipartKey(volume.getName(), bucket, key, length,
                  partNumber, uploadID)) {
            metadataLatencyNs =
                getMetrics().updateCopyKeyMetadataStats(startNanos);
            copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream,
                0, length, new byte[getIOBufferSize(length)]);
            ozoneOutputStream.getMetadata()
                .putAll(sourceKeyDetails.getMetadata());
            outputStream = ozoneOutputStream;
          }
        } else {
          try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
              .createMultipartKey(volume.getName(), bucket, key, length,
                  partNumber, uploadID)) {
            metadataLatencyNs =
                getMetrics().updateCopyKeyMetadataStats(startNanos);
            copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream,
                0, length, new byte[getIOBufferSize(length)]);
            ozoneOutputStream.getMetadata()
                .putAll(sourceKeyDetails.getMetadata());
            outputStream = ozoneOutputStream;
          }
        }
        getMetrics().incCopyObjectSuccessLength(copyLength);
        perf.appendSizeBytes(copyLength);
      }
    } else {
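      // Regular UploadPart: stream the request body into the new part and
      // store the hex-encoded body digest as the part's ETag metadata.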
      long putLength;
      try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
          .createMultipartKey(volume.getName(), bucket, key, length,
              partNumber, uploadID)) {
        metadataLatencyNs =
            getMetrics().updatePutKeyMetadataStats(startNanos);
        putLength = IOUtils.copyLarge(digestInputStream, ozoneOutputStream,
            0, length, new byte[getIOBufferSize(length)]);
        byte[] digest = digestInputStream.getMessageDigest().digest();
        ozoneOutputStream.getMetadata()
            .put(ETAG, DatatypeConverter.printHexBinary(digest).toLowerCase());
        outputStream = ozoneOutputStream;
      }
      getMetrics().incPutKeySuccessLength(putLength);
      perf.appendSizeBytes(putLength);
    }
    perf.appendMetaLatencyNanos(metadataLatencyNs);
    OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
        outputStream.getCommitUploadPartInfo();
    String eTag = omMultipartCommitUploadPartInfo.getETag();
    // If the OmMultipartCommitUploadPartInfo does not contain an eTag,
    // fall back to the MPU part name for compatibility with an (old) OM
    // that does not return the eTag field.
    if (StringUtils.isEmpty(eTag)) {
      eTag = omMultipartCommitUploadPartInfo.getPartName();
    }
    if (copyHeader != null) {
      getMetrics().updateCopyObjectSuccessStats(startNanos);
      return Response.ok(new CopyPartResult(eTag)).build();
    } else {
      getMetrics().updateCreateMultipartKeySuccessStats(startNanos);
      return Response.ok().header(ETAG, eTag).build();
    }
  } catch (OMException ex) {
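    // Map OM failures onto the S3 error model: an unknown upload id maps
    // to NoSuchUpload, permission failures to AccessDenied, and an invalid
    // part to InvalidArgument; anything else propagates unchanged.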
    if (copyHeader != null) {
      getMetrics().updateCopyObjectFailureStats(startNanos);
    } else {
      getMetrics().updateCreateMultipartKeyFailureStats(startNanos);
    }
    if (ex.getResult() == ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
      throw newError(NO_SUCH_UPLOAD, uploadID, ex);
    } else if (isAccessDenied(ex)) {
      throw newError(S3ErrorTable.ACCESS_DENIED, bucket + "/" + key, ex);
    } else if (ex.getResult() == ResultCodes.INVALID_PART) {
      OS3Exception os3Exception = newError(
          S3ErrorTable.INVALID_ARGUMENT, String.valueOf(partNumber), ex);
      os3Exception.setErrorMessage(ex.getMessage());
      throw os3Exception;
    }
    throw ex;
  } finally {
    // Reset the thread-local message digest instance in case an exception
    // was thrown and MessageDigest#digest was never called.
    if (digestInputStream != null) {
      digestInputStream.getMessageDigest().reset();
    }
  }
}