public Response put()

in hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java [219:403]


  /**
   * Handles an S3 PUT on an object path and dispatches to the matching
   * operation.
   *
   * <p>The request is routed, in this order:
   * <ol>
   *   <li>{@code acl} query parameter present: rejected with
   *       {@code NOT_IMPLEMENTED};</li>
   *   <li>{@code tagging} query parameter present: delegated to
   *       {@code putObjectTagging};</li>
   *   <li>non-empty {@code uploadId}: delegated to
   *       {@code createMultipartKey} (upload part, optionally by copy);</li>
   *   <li>copy-source header present: delegated to {@code copyObject};</li>
   *   <li>directory creation enabled, bucket is FILE_SYSTEM_OPTIMIZED, the
   *       key ends with "/" and the payload length is zero: a directory is
   *       created;</li>
   *   <li>otherwise the body is written as a key, through the streaming
   *       writer or through {@code createKey}.</li>
   * </ol>
   *
   * @param bucketName name of the target bucket
   * @param keyPath key (object path) inside the bucket
   * @param length value of the Content-Length header; replaced by the
   *     decoded content length when the payload is sent as signed chunks
   * @param partNumber part number, forwarded to the multipart upload path
   * @param uploadID multipart upload id; empty for a non-multipart request
   * @param taggingMarker non-null when the {@code tagging} query parameter
   *     is present
   * @param aclMarker non-null when the {@code acl} query parameter is
   *     present
   * @param body request payload
   * @return the HTTP response of the selected operation
   * @throws IOException if the operation fails with an error that is not
   *     mapped to an S3 error
   * @throws OS3Exception if the request is rejected or the failure maps to
   *     an S3 error, including an unparsable or missing decoded content
   *     length header
   */
  public Response put(
      @PathParam("bucket") String bucketName,
      @PathParam("path") String keyPath,
      @HeaderParam("Content-Length") long length,
      @QueryParam("partNumber")  int partNumber,
      @QueryParam("uploadId") @DefaultValue("") String uploadID,
      @QueryParam("tagging") String taggingMarker,
      @QueryParam("acl") String aclMarker,
      final InputStream body) throws IOException, OS3Exception {
    long startNanos = Time.monotonicNowNanos();
    S3GAction s3GAction = S3GAction.CREATE_KEY;
    boolean auditSuccess = true;
    PerformanceStringBuilder perf = new PerformanceStringBuilder();

    // Declared outside the try block because the catch/finally blocks
    // inspect them.
    String copyHeader = null;
    DigestInputStream digestInputStream = null;
    try {
      if (aclMarker != null) {
        s3GAction = S3GAction.PUT_OBJECT_ACL;
        throw newError(NOT_IMPLEMENTED, keyPath);
      }
      OzoneVolume volume = getVolume();
      if (taggingMarker != null) {
        s3GAction = S3GAction.PUT_OBJECT_TAGGING;
        return putObjectTagging(volume, bucketName, keyPath, body);
      }

      if (uploadID != null && !uploadID.isEmpty()) {
        if (headers.getHeaderString(COPY_SOURCE_HEADER) == null) {
          s3GAction = S3GAction.CREATE_MULTIPART_KEY;
        } else {
          s3GAction = S3GAction.CREATE_MULTIPART_KEY_BY_COPY;
        }
        // If uploadID is specified, it is a request for upload part
        return createMultipartKey(volume, bucketName, keyPath, length,
            partNumber, uploadID, body, perf);
      }

      // Assigned only after the multipart branch so that the failure
      // metrics below attribute a failure to "copy" for plain copy
      // requests only.
      copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
      String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
      boolean storageTypeDefault = StringUtils.isEmpty(storageType);

      // Normal put object
      OzoneBucket bucket = volume.getBucket(bucketName);
      ReplicationConfig replicationConfig =
          getReplicationConfig(bucket, storageType);

      boolean enableEC = false;
      if ((replicationConfig != null &&
          replicationConfig.getReplicationType() == EC) ||
          bucket.getReplicationConfig() instanceof ECReplicationConfig) {
        enableEC = true;
      }

      if (copyHeader != null) {
        //Copy object, as copy source available.
        s3GAction = S3GAction.COPY_OBJECT;
        CopyObjectResponse copyObjectResponse = copyObject(volume,
            copyHeader, bucketName, keyPath, replicationConfig,
            storageTypeDefault, perf);
        return Response.status(Status.OK).entity(copyObjectResponse).header(
            "Connection", "close").build();
      }

      boolean canCreateDirectory = ozoneConfiguration
          .getBoolean(OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED,
              OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED_DEFAULT) &&
          bucket.getBucketLayout() == BucketLayout.FILE_SYSTEM_OPTIMIZED;

      String amzDecodedLength =
          headers.getHeaderString(S3Consts.DECODED_CONTENT_LENGTH_HEADER);
      // A malformed header value is reported as an S3 error instead of
      // escaping as an unchecked NumberFormatException.
      boolean hasAmzDecodedLengthZero = amzDecodedLength != null &&
          parseAmzDecodedLength(amzDecodedLength, keyPath) == 0;
      if (canCreateDirectory &&
          (length == 0 || hasAmzDecodedLengthZero) &&
          StringUtils.endsWith(keyPath, "/")
      ) {
        s3GAction = S3GAction.CREATE_DIRECTORY;
        getClientProtocol()
            .createDirectory(volume.getName(), bucketName, keyPath);
        long metadataLatencyNs =
            getMetrics().updatePutKeyMetadataStats(startNanos);
        perf.appendMetaLatencyNanos(metadataLatencyNs);
        return Response.ok().status(HttpStatus.SC_OK).build();
      }

      // Normal put object
      Map<String, String> customMetadata =
          getCustomMetadataFromHeaders(headers.getRequestHeaders());

      if (S3Utils.hasSignedPayloadHeader(headers)) {
        digestInputStream = new DigestInputStream(
            new SignedChunksInputStream(body), getMessageDigestInstance());
        // The key length is the decoded length, not Content-Length. A
        // missing header (null) is rejected by the helper as well.
        length = parseAmzDecodedLength(amzDecodedLength, keyPath);
      } else {
        digestInputStream =
            new DigestInputStream(body, getMessageDigestInstance());
      }

      Map<String, String> tags = getTaggingFromHeaders(headers);

      long putLength;
      String eTag = null;
      if (datastreamEnabled && !enableEC && length > datastreamMinLength) {
        perf.appendStreamMode();
        Pair<String, Long> keyWriteResult = ObjectEndpointStreaming
            .put(bucket, keyPath, length, replicationConfig, chunkSize,
                customMetadata, tags, digestInputStream, perf);
        eTag = keyWriteResult.getKey();
        putLength = keyWriteResult.getValue();
      } else {
        try (OzoneOutputStream output = getClientProtocol().createKey(
            volume.getName(), bucketName, keyPath, length, replicationConfig,
            customMetadata, tags)) {
          long metadataLatencyNs =
              getMetrics().updatePutKeyMetadataStats(startNanos);
          perf.appendMetaLatencyNanos(metadataLatencyNs);
          putLength = IOUtils.copyLarge(digestInputStream, output, 0, length,
              new byte[getIOBufferSize(length)]);
          // The ETag is the lower-case hex digest of the bytes read
          // through the digest stream; it is stored in the key metadata
          // before the output stream is closed.
          eTag = DatatypeConverter.printHexBinary(
                  digestInputStream.getMessageDigest().digest())
              .toLowerCase();
          output.getMetadata().put(ETAG, eTag);
        }
      }
      getMetrics().incPutKeySuccessLength(putLength);
      perf.appendSizeBytes(putLength);
      return Response.ok()
          .header(ETAG, wrapInQuotes(eTag))
          .status(HttpStatus.SC_OK)
          .build();
    } catch (OMException ex) {
      auditSuccess = false;
      auditWriteFailure(s3GAction, ex);
      // aclMarker is always null here: the ACL branch throws an
      // OS3Exception before any call that could raise an OMException.
      recordPutFailureStats(aclMarker, taggingMarker, copyHeader, startNanos);
      if (ex.getResult() == ResultCodes.NOT_A_FILE) {
        OS3Exception os3Exception = newError(INVALID_REQUEST, keyPath, ex);
        os3Exception.setErrorMessage("An error occurred (InvalidRequest) " +
            "when calling the PutObject/MPU PartUpload operation: " +
            OmConfig.Keys.ENABLE_FILESYSTEM_PATHS + " is enabled Keys are" +
            " considered as Unix Paths. Path has Violated FS Semantics " +
            "which caused put operation to fail.");
        throw os3Exception;
      } else if (isAccessDenied(ex)) {
        throw newError(S3ErrorTable.ACCESS_DENIED, keyPath, ex);
      } else if (ex.getResult() == ResultCodes.QUOTA_EXCEEDED) {
        throw newError(S3ErrorTable.QUOTA_EXCEEDED, keyPath, ex);
      } else if (ex.getResult() == ResultCodes.BUCKET_NOT_FOUND) {
        throw newError(S3ErrorTable.NO_SUCH_BUCKET, bucketName, ex);
      } else if (ex.getResult() == ResultCodes.FILE_ALREADY_EXISTS) {
        throw newError(S3ErrorTable.NO_OVERWRITE, keyPath, ex);
      }
      throw ex;
    } catch (Exception ex) {
      auditSuccess = false;
      auditWriteFailure(s3GAction, ex);
      recordPutFailureStats(aclMarker, taggingMarker, copyHeader, startNanos);
      throw ex;
    } finally {
      // Reset the thread-local message digest instance in case of exception
      // and MessageDigest#digest is never called
      if (digestInputStream != null) {
        digestInputStream.getMessageDigest().reset();
      }
      if (auditSuccess) {
        long opLatencyNs = getMetrics().updateCreateKeySuccessStats(startNanos);
        perf.appendOpLatencyNanos(opLatencyNs);
        AUDIT.logWriteSuccess(buildAuditMessageForSuccess(s3GAction,
            getAuditParameters(), perf));
      }
    }
  }

  /**
   * Parses the decoded content length header value, mapping an unparsable
   * or missing (null) value to an {@code INVALID_REQUEST} S3 error.
   *
   * @param amzDecodedLength raw header value, may be null
   * @param keyPath key used as the resource of the reported error
   * @return the parsed length
   * @throws OS3Exception if the value is null or not a valid long
   */
  private long parseAmzDecodedLength(String amzDecodedLength, String keyPath)
      throws OS3Exception {
    try {
      return Long.parseLong(amzDecodedLength);
    } catch (NumberFormatException ex) {
      OS3Exception os3Exception = newError(INVALID_REQUEST, keyPath, ex);
      os3Exception.setErrorMessage("Invalid or missing " +
          S3Consts.DECODED_CONTENT_LENGTH_HEADER + " header.");
      throw os3Exception;
    }
  }

  /**
   * Updates the failure metric that matches the kind of PUT request:
   * ACL, tagging, copy, or (by default) create key.
   *
   * @param aclMarker non-null for an ACL request
   * @param taggingMarker non-null for a tagging request
   * @param copyHeader non-null for a copy request
   * @param startNanos start time of the request, passed to the metric
   */
  private void recordPutFailureStats(String aclMarker, String taggingMarker,
      String copyHeader, long startNanos) {
    if (aclMarker != null) {
      getMetrics().updatePutObjectAclFailureStats(startNanos);
    } else if (taggingMarker != null) {
      getMetrics().updatePutObjectTaggingFailureStats(startNanos);
    } else if (copyHeader != null) {
      getMetrics().updateCopyObjectFailureStats(startNanos);
    } else {
      getMetrics().updateCreateKeyFailureStats(startNanos);
    }
  }