private Response createMultipartKey()

in hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java [957:1126]


  /**
   * Writes one part of a multipart upload, either from the request body or,
   * when the copy-source header is present, by copying (a range of) an
   * existing key that lives in the same volume.
   *
   * <p>When datastream is enabled, the target is not erasure-coded and the
   * request is not a copy, the work is delegated to
   * {@link ObjectEndpointStreaming}.
   *
   * @param volume volume that owns both the target bucket and, for copies,
   *               the source bucket
   * @param bucket name of the target bucket
   * @param key name of the target key
   * @param length number of bytes to write; replaced by the decoded content
   *               length header for signed-payload requests, and by the copy
   *               range size or the source key size for copies
   * @param partNumber part number within the multipart upload
   * @param uploadID identifier of the multipart upload
   * @param body request payload; not read when a copy source is given
   * @param perf collector for per-request performance figures
   * @return 200 response carrying the part ETag, as a header for uploads or
   *         as a {@code CopyPartResult} entity for copies
   * @throws IOException if reading or writing fails, including an
   *                     {@link EOFException} when the copy source cannot be
   *                     skipped to the requested range start; an
   *                     {@code OMException} that is not mapped to an S3
   *                     error is rethrown as is
   * @throws OS3Exception for an unknown upload, denied access, an invalid
   *                      part, a failed copy precondition, or a missing or
   *                      non-numeric decoded content length header
   */
  private Response createMultipartKey(OzoneVolume volume, String bucket,
      String key, long length, int partNumber, String uploadID,
      final InputStream body, PerformanceStringBuilder perf)
      throws IOException, OS3Exception {
    long startNanos = Time.monotonicNowNanos();
    // Declared outside the try block because the catch clause needs it to
    // pick between the copy and the upload failure metrics.
    String copyHeader = null;
    DigestInputStream digestInputStream = null;
    try {

      if (S3Utils.hasSignedPayloadHeader(headers)) {
        digestInputStream = new DigestInputStream(new SignedChunksInputStream(body),
            getMessageDigestInstance());
        final String decodedLength =
            headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER);
        try {
          length = Long.parseLong(decodedLength);
        } catch (NumberFormatException e) {
          // Long.parseLong rejects both a missing (null) and a non-numeric
          // value. That is a malformed request, so report it as an S3
          // client error instead of letting the runtime exception escape.
          OS3Exception invalidLength = newError(
              S3ErrorTable.INVALID_ARGUMENT, DECODED_CONTENT_LENGTH_HEADER, e);
          invalidLength.setErrorMessage(
              "Missing or invalid " + DECODED_CONTENT_LENGTH_HEADER + " header");
          throw invalidLength;
        }
      } else {
        digestInputStream = new DigestInputStream(body, getMessageDigestInstance());
      }

      copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
      String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
      final OzoneBucket ozoneBucket = volume.getBucket(bucket);
      ReplicationConfig replicationConfig =
          getReplicationConfig(ozoneBucket, storageType);

      // EC applies when either the resolved config or the bucket default is EC.
      final boolean enableEC =
          (replicationConfig != null
              && replicationConfig.getReplicationType() == EC)
          || ozoneBucket.getReplicationConfig() instanceof ECReplicationConfig;

      if (datastreamEnabled && !enableEC && copyHeader == null) {
        perf.appendStreamMode();
        return ObjectEndpointStreaming
            .createMultipartKey(ozoneBucket, key, length, partNumber,
                uploadID, chunkSize, digestInputStream, perf);
      }
      // OmMultipartCommitUploadPartInfo can only be gotten after the
      // OzoneOutputStream is closed, so we need to save the OzoneOutputStream
      final OzoneOutputStream outputStream;
      long metadataLatencyNs;
      if (copyHeader != null) {
        Pair<String, String> result = parseSourceHeader(copyHeader);
        String sourceBucket = result.getLeft();
        String sourceKey = result.getRight();

        OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails(
            volume.getName(), sourceBucket, sourceKey);
        String range =
            headers.getHeaderString(COPY_SOURCE_HEADER_RANGE);
        RangeHeader rangeHeader = null;
        if (range != null) {
          rangeHeader = RangeHeaderParserUtil.parseRangeHeader(range, 0);
          // When copy Range, the size of the target key is the
          // length specified by COPY_SOURCE_HEADER_RANGE.
          length = rangeHeader.getEndOffset() -
              rangeHeader.getStartOffset() + 1;
        } else {
          length = sourceKeyDetails.getDataSize();
        }
        final long sourceKeyModificationTime = sourceKeyDetails
            .getModificationTime().toEpochMilli();
        String copySourceIfModifiedSince =
            headers.getHeaderString(COPY_SOURCE_IF_MODIFIED_SINCE);
        String copySourceIfUnmodifiedSince =
            headers.getHeaderString(COPY_SOURCE_IF_UNMODIFIED_SINCE);
        if (!checkCopySourceModificationTime(sourceKeyModificationTime,
            copySourceIfModifiedSince, copySourceIfUnmodifiedSince)) {
          throw newError(PRECOND_FAILED, sourceBucket + "/" + sourceKey);
        }

        try (OzoneInputStream sourceObject = sourceKeyDetails.getContent()) {
          if (rangeHeader != null) {
            // Position the source at the range start before the target part
            // is created; a short skip means the requested start cannot be
            // reached in the source.
            final long startOffset = rangeHeader.getStartOffset();
            final long skipped = sourceObject.skip(startOffset);
            if (skipped != startOffset) {
              throw new EOFException(
                  "Bytes to skip: "
                      + startOffset + " actual: " + skipped);
            }
          }
          // Ranged and whole-key copies share the same write path from here.
          final long copyLength;
          try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
              .createMultipartKey(volume.getName(), bucket, key, length,
                  partNumber, uploadID)) {
            metadataLatencyNs =
                getMetrics().updateCopyKeyMetadataStats(startNanos);
            copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream, 0, length,
                new byte[getIOBufferSize(length)]);
            ozoneOutputStream.getMetadata()
                .putAll(sourceKeyDetails.getMetadata());
            outputStream = ozoneOutputStream;
          }
          getMetrics().incCopyObjectSuccessLength(copyLength);
          perf.appendSizeBytes(copyLength);
        }
      } else {
        long putLength;
        try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
            .createMultipartKey(volume.getName(), bucket, key, length,
                partNumber, uploadID)) {
          metadataLatencyNs =
              getMetrics().updatePutKeyMetadataStats(startNanos);
          putLength = IOUtils.copyLarge(digestInputStream, ozoneOutputStream, 0, length,
              new byte[getIOBufferSize(length)]);
          // The digest accumulated while streaming the body becomes the
          // part's ETag metadata.
          byte[] digest = digestInputStream.getMessageDigest().digest();
          ozoneOutputStream.getMetadata()
              .put(ETAG, DatatypeConverter.printHexBinary(digest).toLowerCase());
          outputStream = ozoneOutputStream;
        }
        getMetrics().incPutKeySuccessLength(putLength);
        perf.appendSizeBytes(putLength);
      }
      perf.appendMetaLatencyNanos(metadataLatencyNs);

      OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
          outputStream.getCommitUploadPartInfo();
      String eTag = omMultipartCommitUploadPartInfo.getETag();
      // If the OmMultipartCommitUploadPartInfo does not contain eTag,
      // fall back to MPU part name for compatibility in case the (old) OM
      // does not return the eTag field
      if (StringUtils.isEmpty(eTag)) {
        eTag = omMultipartCommitUploadPartInfo.getPartName();
      }

      if (copyHeader != null) {
        getMetrics().updateCopyObjectSuccessStats(startNanos);
        return Response.ok(new CopyPartResult(eTag)).build();
      } else {
        getMetrics().updateCreateMultipartKeySuccessStats(startNanos);
        return Response.ok().header(ETAG, eTag).build();
      }

    } catch (OMException ex) {
      if (copyHeader != null) {
        getMetrics().updateCopyObjectFailureStats(startNanos);
      } else {
        getMetrics().updateCreateMultipartKeyFailureStats(startNanos);
      }
      // Translate the OM result codes that have an S3 equivalent; anything
      // else is rethrown unchanged.
      if (ex.getResult() == ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
        throw newError(NO_SUCH_UPLOAD, uploadID, ex);
      } else if (isAccessDenied(ex)) {
        throw newError(S3ErrorTable.ACCESS_DENIED, bucket + "/" + key, ex);
      } else if (ex.getResult() == ResultCodes.INVALID_PART) {
        OS3Exception os3Exception = newError(
            S3ErrorTable.INVALID_ARGUMENT, String.valueOf(partNumber), ex);
        os3Exception.setErrorMessage(ex.getMessage());
        throw os3Exception;
      }
      throw ex;
    } finally {
      // Reset the thread-local message digest instance in case of exception
      // and MessageDigest#digest is never called
      if (digestInputStream != null) {
        digestInputStream.getMessageDigest().reset();
      }
    }
  }