in hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java [219:403]
/**
 * Handles S3 PUT requests on an object path. A single entry point that
 * dispatches, based on query parameters and headers, to one of:
 * PutObjectAcl (rejected as NOT_IMPLEMENTED), PutObjectTagging,
 * multipart UploadPart / UploadPartCopy (when {@code uploadId} is set),
 * CopyObject (when the copy-source header is present), FSO directory
 * creation (zero-length key ending in "/"), or a normal PutObject.
 *
 * @param bucketName    target bucket name from the URL path
 * @param keyPath       object key from the URL path
 * @param length        Content-Length header value; for signed (chunked)
 *                      payloads this is later replaced by the decoded length
 * @param partNumber    part number, only meaningful together with uploadID
 * @param uploadID      multipart upload id; empty string means "not an MPU
 *                      part upload" (defaulted to "" by JAX-RS)
 * @param taggingMarker non-null when the request carries ?tagging
 * @param aclMarker     non-null when the request carries ?acl
 * @param body          request payload stream
 * @return HTTP 200 response; for normal puts the ETag header carries the
 *         hex digest of the written bytes
 * @throws IOException  on underlying Ozone client failures
 * @throws OS3Exception translated S3 error (NotImplemented, InvalidRequest,
 *                      AccessDenied, QuotaExceeded, NoSuchBucket, etc.)
 */
public Response put(
@PathParam("bucket") String bucketName,
@PathParam("path") String keyPath,
@HeaderParam("Content-Length") long length,
@QueryParam("partNumber") int partNumber,
@QueryParam("uploadId") @DefaultValue("") String uploadID,
@QueryParam("tagging") String taggingMarker,
@QueryParam("acl") String aclMarker,
final InputStream body) throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
// s3GAction is reassigned as soon as the concrete operation is known, so
// the audit log (success in finally, failure in the catch blocks) records
// the operation actually attempted rather than the generic CREATE_KEY.
S3GAction s3GAction = S3GAction.CREATE_KEY;
boolean auditSuccess = true;
PerformanceStringBuilder perf = new PerformanceStringBuilder();
// Declared outside the try so the catch blocks can pick the matching
// failure metric; both stay null until their dispatch branch is passed.
String copyHeader = null, storageType = null;
DigestInputStream digestInputStream = null;
try {
// PutObjectAcl is not supported by the gateway; reject explicitly.
if (aclMarker != null) {
s3GAction = S3GAction.PUT_OBJECT_ACL;
throw newError(NOT_IMPLEMENTED, keyPath);
}
OzoneVolume volume = getVolume();
// ?tagging — delegate to the tagging handler and return immediately.
if (taggingMarker != null) {
s3GAction = S3GAction.PUT_OBJECT_TAGGING;
return putObjectTagging(volume, bucketName, keyPath, body);
}
// Non-empty uploadId means this PUT is a multipart part upload
// (or part-copy when the copy-source header is also present).
if (uploadID != null && !uploadID.equals("")) {
if (headers.getHeaderString(COPY_SOURCE_HEADER) == null) {
s3GAction = S3GAction.CREATE_MULTIPART_KEY;
} else {
s3GAction = S3GAction.CREATE_MULTIPART_KEY_BY_COPY;
}
// If uploadID is specified, it is a request for upload part
return createMultipartKey(volume, bucketName, keyPath, length,
partNumber, uploadID, body, perf);
}
copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
boolean storageTypeDefault = StringUtils.isEmpty(storageType);
// Normal put object
OzoneBucket bucket = volume.getBucket(bucketName);
ReplicationConfig replicationConfig =
getReplicationConfig(bucket, storageType);
// EC applies if either the request's storage class resolves to EC or
// the bucket itself defaults to EC replication; EC disables the
// datastream write path below.
boolean enableEC = false;
if ((replicationConfig != null &&
replicationConfig.getReplicationType() == EC) ||
bucket.getReplicationConfig() instanceof ECReplicationConfig) {
enableEC = true;
}
if (copyHeader != null) {
//Copy object, as copy source available.
s3GAction = S3GAction.COPY_OBJECT;
CopyObjectResponse copyObjectResponse = copyObject(volume,
copyHeader, bucketName, keyPath, replicationConfig,
storageTypeDefault, perf);
return Response.status(Status.OK).entity(copyObjectResponse).header(
"Connection", "close").build();
}
// A zero-length key ending in "/" on an FSO bucket is treated as a
// directory-creation request when the feature is enabled in config.
boolean canCreateDirectory = ozoneConfiguration
.getBoolean(OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED,
OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED_DEFAULT) &&
bucket.getBucketLayout() == BucketLayout.FILE_SYSTEM_OPTIMIZED;
String amzDecodedLength =
headers.getHeaderString(S3Consts.DECODED_CONTENT_LENGTH_HEADER);
// With aws-chunked encoding the Content-Length covers chunk framing,
// so the decoded-length header is the real payload size.
boolean hasAmzDecodedLengthZero = amzDecodedLength != null &&
Long.parseLong(amzDecodedLength) == 0;
if (canCreateDirectory &&
(length == 0 || hasAmzDecodedLengthZero) &&
StringUtils.endsWith(keyPath, "/")
) {
s3GAction = S3GAction.CREATE_DIRECTORY;
getClientProtocol()
.createDirectory(volume.getName(), bucketName, keyPath);
long metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
return Response.ok().status(HttpStatus.SC_OK).build();
}
// Normal put object
Map<String, String> customMetadata =
getCustomMetadataFromHeaders(headers.getRequestHeaders());
// Signed (aws-chunked) payloads are unwrapped by SignedChunksInputStream
// and the effective length switched to the decoded content length.
// NOTE(review): assumes the decoded-length header is present whenever
// the payload is signed — Long.parseLong would NPE otherwise; confirm
// upstream validation guarantees this.
if (S3Utils.hasSignedPayloadHeader(headers)) {
digestInputStream = new DigestInputStream(new SignedChunksInputStream(body),
getMessageDigestInstance());
length = Long.parseLong(amzDecodedLength);
} else {
digestInputStream = new DigestInputStream(body, getMessageDigestInstance());
}
Map<String, String> tags = getTaggingFromHeaders(headers);
long putLength;
String eTag = null;
// Streaming (datastream) path: used for large non-EC payloads; the
// streaming helper computes the ETag and returns the bytes written.
if (datastreamEnabled && !enableEC && length > datastreamMinLength) {
perf.appendStreamMode();
Pair<String, Long> keyWriteResult = ObjectEndpointStreaming
.put(bucket, keyPath, length, replicationConfig, chunkSize,
customMetadata, tags, digestInputStream, perf);
eTag = keyWriteResult.getKey();
putLength = keyWriteResult.getValue();
} else {
// Classic path: create the key, copy the payload through the digest
// stream, and record the hex digest as the key's ETag metadata.
try (OzoneOutputStream output = getClientProtocol().createKey(
volume.getName(), bucketName, keyPath, length, replicationConfig,
customMetadata, tags)) {
long metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
putLength = IOUtils.copyLarge(digestInputStream, output, 0, length,
new byte[getIOBufferSize(length)]);
eTag = DatatypeConverter.printHexBinary(
digestInputStream.getMessageDigest().digest())
.toLowerCase();
output.getMetadata().put(ETAG, eTag);
}
}
getMetrics().incPutKeySuccessLength(putLength);
perf.appendSizeBytes(putLength);
return Response.ok()
.header(ETAG, wrapInQuotes(eTag))
.status(HttpStatus.SC_OK)
.build();
} catch (OMException ex) {
auditSuccess = false;
auditWriteFailure(s3GAction, ex);
// Pick the failure metric matching the operation that was attempted.
if (taggingMarker != null) {
getMetrics().updatePutObjectTaggingFailureStats(startNanos);
} else if (copyHeader != null) {
getMetrics().updateCopyObjectFailureStats(startNanos);
} else {
getMetrics().updateCreateKeyFailureStats(startNanos);
}
// Translate known OM result codes into their S3 error equivalents;
// anything unrecognized is rethrown as-is.
if (ex.getResult() == ResultCodes.NOT_A_FILE) {
OS3Exception os3Exception = newError(INVALID_REQUEST, keyPath, ex);
os3Exception.setErrorMessage("An error occurred (InvalidRequest) " +
"when calling the PutObject/MPU PartUpload operation: " +
OmConfig.Keys.ENABLE_FILESYSTEM_PATHS + " is enabled Keys are" +
" considered as Unix Paths. Path has Violated FS Semantics " +
"which caused put operation to fail.");
throw os3Exception;
} else if (isAccessDenied(ex)) {
throw newError(S3ErrorTable.ACCESS_DENIED, keyPath, ex);
} else if (ex.getResult() == ResultCodes.QUOTA_EXCEEDED) {
throw newError(S3ErrorTable.QUOTA_EXCEEDED, keyPath, ex);
} else if (ex.getResult() == ResultCodes.BUCKET_NOT_FOUND) {
throw newError(S3ErrorTable.NO_SUCH_BUCKET, bucketName, ex);
} else if (ex.getResult() == ResultCodes.FILE_ALREADY_EXISTS) {
throw newError(S3ErrorTable.NO_OVERWRITE, keyPath, ex);
}
throw ex;
} catch (Exception ex) {
// Covers the NOT_IMPLEMENTED OS3Exception from the ACL branch (hence
// the extra aclMarker metric case absent from the OMException handler)
// plus any other unexpected failure; audit, count, and rethrow.
auditSuccess = false;
auditWriteFailure(s3GAction, ex);
if (aclMarker != null) {
getMetrics().updatePutObjectAclFailureStats(startNanos);
} else if (taggingMarker != null) {
getMetrics().updatePutObjectTaggingFailureStats(startNanos);
} else if (copyHeader != null) {
getMetrics().updateCopyObjectFailureStats(startNanos);
} else {
getMetrics().updateCreateKeyFailureStats(startNanos);
}
throw ex;
} finally {
// Reset the thread-local message digest instance in case of exception
// and MessageDigest#digest is never called
if (digestInputStream != null) {
digestInputStream.getMessageDigest().reset();
}
// Success audit/metrics run here so every successful early return
// (tagging, MPU part, copy, directory) is covered by one code path.
if (auditSuccess) {
long opLatencyNs = getMetrics().updateCreateKeySuccessStats(startNanos);
perf.appendOpLatencyNanos(opLatencyNs);
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(s3GAction,
getAuditParameters(), perf));
}
}
}