public void onTrigger(final ProcessContext context, final ProcessSession session)

in nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java [480:868]


    public void onTrigger(final ProcessContext context, final ProcessSession session) {

        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

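        // Build the S3 client from this FlowFile's attributes so attribute-driven client
        // configuration is honored per FlowFile; on failure the FlowFile is penalized and
        // routed to failure.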
        final AmazonS3Client s3;

        try {
            s3 = getS3Client(context, flowFile.getAttributes());
        } catch (Exception e) {
            getLogger().error("Failed to initialize S3 client", e);
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        final long startNanos = System.nanoTime();

        final String bucket = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
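        // Key for the locally persisted multipart-upload state, scoped to this processor
        // instance, bucket and object key so an interrupted multipart upload can be resumed.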
        final String cacheKey = getIdentifier() + "/" + bucket + "/" + key;

        final FlowFile ff = flowFile;
        final Map<String, String> attributes = new HashMap<>();
        final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key());
        final ResourceTransferSource resourceTransferSource = context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);

        attributes.put(S3_BUCKET_KEY, bucket);
        attributes.put(S3_OBJECT_KEY, key);

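        // The threshold selects between a single PutObject call and a multipart upload;
        // the part size controls how content is chunked in the multipart case.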
        final Long multipartThreshold = context.getProperty(MULTIPART_THRESHOLD).asDataSize(DataUnit.B).longValue();
        final Long multipartPartSize = context.getProperty(MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue();

        final long now = System.currentTimeMillis();

        /*
         * If necessary, run age off for existing uploads in AWS S3 and local state
         */
        ageoffS3Uploads(context, s3, now, bucket);

        /*
         * Upload the content, either as a single PutObject call or as a resumable multipart
         * upload, depending on how the size compares to the multipart threshold.
         */
        try {
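            // Content comes either from an external FileResource (when a resource transfer
            // source is configured) or from the FlowFile's own content, exposed as one InputStream.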
            final FlowFile flowFileCopy = flowFile;
            Optional<FileResource> optFileResource = getFileResource(resourceTransferSource, context, flowFile.getAttributes());
            try (InputStream in = optFileResource
                    .map(FileResource::getInputStream)
                    .orElseGet(() -> session.read(flowFileCopy))) {
                final ObjectMetadata objectMetadata = new ObjectMetadata();
                objectMetadata.setContentLength(optFileResource.map(FileResource::getSize).orElseGet(ff::getSize));

                final String contentType = context.getProperty(CONTENT_TYPE)
                        .evaluateAttributeExpressions(ff).getValue();
                if (contentType != null) {
                    objectMetadata.setContentType(contentType);
                    attributes.put(S3_CONTENT_TYPE, contentType);
                }

                final String cacheControl = context.getProperty(CACHE_CONTROL)
                        .evaluateAttributeExpressions(ff).getValue();
                if (cacheControl != null) {
                    objectMetadata.setCacheControl(cacheControl);
                    attributes.put(S3_CACHE_CONTROL, cacheControl);
                }

                final String contentDisposition = context.getProperty(CONTENT_DISPOSITION).getValue();
                String fileName = URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), StandardCharsets.UTF_8);
                if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) {
                    objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE);
                    attributes.put(S3_CONTENT_DISPOSITION, CONTENT_DISPOSITION_INLINE);
                } else if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) {
                    String contentDispositionValue = CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\"";
                    objectMetadata.setContentDisposition(contentDispositionValue);
                    attributes.put(S3_CONTENT_DISPOSITION, contentDispositionValue);
                } else {
                    objectMetadata.setContentDisposition(fileName);
                }

                final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
                        .evaluateAttributeExpressions(ff).getValue();
                if (expirationRule != null) {
                    objectMetadata.setExpirationTimeRuleId(expirationRule);
                }

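                // Dynamic (user-defined) properties are sent as S3 user metadata on the object.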
                final Map<String, String> userMetadata = new HashMap<>();
                for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
                    if (entry.getKey().isDynamic()) {
                        final String value = context.getProperty(
                                entry.getKey()).evaluateAttributeExpressions(ff).getValue();
                        userMetadata.put(entry.getKey().getName(), value);
                    }
                }

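                // Server-side encryption and the encryption service are mutually exclusive here:
                // when no SSE algorithm is selected, an optional AmazonS3EncryptionService may
                // configure the requests instead.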
                final String serverSideEncryption = context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
                AmazonS3EncryptionService encryptionService = null;

                if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
                    objectMetadata.setSSEAlgorithm(serverSideEncryption);
                    attributes.put(S3_SSE_ALGORITHM, serverSideEncryption);
                } else {
                    encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
                }

                if (!userMetadata.isEmpty()) {
                    objectMetadata.setUserMetadata(userMetadata);
                }

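                // Choose the upload strategy: content at or below the multipart threshold is sent
                // with a single PutObject request; larger content uses a resumable multipart upload.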
                if (ff.getSize() <= multipartThreshold) {
                    //----------------------------------------
                    // single part upload
                    //----------------------------------------
                    final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
                    if (encryptionService != null) {
                        encryptionService.configurePutObjectRequest(request, objectMetadata);
                        attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName());
                    }

                    request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
                    final AccessControlList acl = createACL(context, ff);
                    if (acl != null) {
                        request.setAccessControlList(acl);
                    }

                    final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
                    if (cannedAcl != null) {
                        request.withCannedAcl(cannedAcl);
                    }

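                    // When an Object Tags Prefix is configured, tags derived from the FlowFile
                    // (see getObjectTags) are attached to the request.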
                    if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
                        request.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
                    }

                    try {
                        final PutObjectResult result = s3.putObject(request);
                        if (result.getVersionId() != null) {
                            attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
                        }
                        if (result.getETag() != null) {
                            attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
                        }
                        if (result.getExpirationTime() != null) {
                            attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
                        }
                        if (result.getMetadata().getStorageClass() != null) {
                            attributes.put(S3_STORAGECLASS_ATTR_KEY, result.getMetadata().getStorageClass());
                        } else {
                            attributes.put(S3_STORAGECLASS_ATTR_KEY, StorageClass.Standard.toString());
                        }
                        if (!userMetadata.isEmpty()) {
                            StringBuilder userMetaBldr = new StringBuilder();
                            for (String userKey : userMetadata.keySet()) {
                                userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
                            }
                            attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
                        }
                        attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_PUTOBJECT);
                    } catch (AmazonClientException e) {
                        getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
                                ffFilename, bucket, key, e.getMessage());
                        throw e;
                    }
                } else {
                    //----------------------------------------
                    // multipart upload
                    //----------------------------------------

                    // load or create persistent state
                    //------------------------------------------------------------
                    MultipartState currentState;
                    try {
                        currentState = getLocalStateIfInS3(s3, bucket, cacheKey);
                        if (currentState != null) {
                            if (currentState.getPartETags().size() > 0) {
                                final PartETag lastETag = currentState.getPartETags().get(
                                        currentState.getPartETags().size() - 1);
                                getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
                                                "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
                                                "contentLength='{}' partsLoaded={} lastPart={}/{}",
                                        ffFilename, bucket, key, currentState.getUploadId(),
                                        currentState.getFilePosition(), currentState.getPartSize(),
                                        currentState.getStorageClass().toString(),
                                        currentState.getContentLength(),
                                        currentState.getPartETags().size(),
                                        Integer.toString(lastETag.getPartNumber()),
                                        lastETag.getETag());
                            } else {
                                getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
                                                "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
                                                "contentLength='{}' no partsLoaded",
                                        ffFilename, bucket, key, currentState.getUploadId(),
                                        currentState.getFilePosition(), currentState.getPartSize(),
                                        currentState.getStorageClass().toString(),
                                        currentState.getContentLength());
                            }
                        } else {
                            currentState = new MultipartState();
                            currentState.setPartSize(multipartPartSize);
                            currentState.setStorageClass(
                                    StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
                            currentState.setContentLength(ff.getSize());
                            persistLocalState(cacheKey, currentState);
                            getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'",
                                    ffFilename, bucket, key);
                        }
                    } catch (IOException e) {
                        getLogger().error("IOException initiating cache state while processing flow files", e);
                        throw e;
                    }

                    // initiate multipart upload or find position in file
                    //------------------------------------------------------------
                    if (currentState.getUploadId().isEmpty()) {
                        final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
                        if (encryptionService != null) {
                            encryptionService.configureInitiateMultipartUploadRequest(initiateRequest, objectMetadata);
                            attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName());
                        }
                        initiateRequest.setStorageClass(currentState.getStorageClass());

                        final AccessControlList acl = createACL(context, ff);
                        if (acl != null) {
                            initiateRequest.setAccessControlList(acl);
                        }
                        final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
                        if (cannedAcl != null) {
                            initiateRequest.withCannedACL(cannedAcl);
                        }

                        if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
                            initiateRequest.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
                        }

                        try {
                            final InitiateMultipartUploadResult initiateResult =
                                    s3.initiateMultipartUpload(initiateRequest);
                            currentState.setUploadId(initiateResult.getUploadId());
                            currentState.getPartETags().clear();
                            try {
                                persistLocalState(cacheKey, currentState);
                            } catch (Exception e) {
                                getLogger().info("Exception saving cache state while processing flow file", e);
                                throw (new ProcessException("Exception saving cache state", e));
                            }
                            getLogger().info("Success initiating upload flowfile={} available={} position={} length={} bucket={} key={} uploadId={}",
                                    ffFilename, in.available(), currentState.getFilePosition(),
                                    currentState.getContentLength(), bucket, key,
                                    currentState.getUploadId());
                            if (initiateResult.getUploadId() != null) {
                                attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
                            }
                        } catch (AmazonClientException e) {
                            getLogger().info("Failure initiating upload flowfile={} bucket={} key={}", ffFilename, bucket, key, e);
                            throw e;
                        }
                    } else {
                        if (currentState.getFilePosition() > 0) {
                            try {
                                final long skipped = in.skip(currentState.getFilePosition());
                                if (skipped != currentState.getFilePosition()) {
                                    getLogger().info("Failure skipping to resume upload flowfile={} bucket={} key={} position={} skipped={}",
                                            ffFilename, bucket, key, currentState.getFilePosition(), skipped);
                                }
                            } catch (Exception e) {
                                getLogger().info("Failure skipping to resume upload flowfile={} bucket={} key={} position={}", ffFilename, bucket, key, currentState.getFilePosition(), e);
                                throw new ProcessException(e);
                            }
                        }
                    }

                    // upload parts
                    //------------------------------------------------------------
                    long thisPartSize;
                    boolean isLastPart;
                    for (int part = currentState.getPartETags().size() + 1;
                         currentState.getFilePosition() < currentState.getContentLength(); part++) {
                        if (!PutS3Object.this.isScheduled()) {
                            throw new IOException(S3_PROCESS_UNSCHEDULED_MESSAGE + " flowfile=" + ffFilename +
                                    " part=" + part + " uploadId=" + currentState.getUploadId());
                        }
                        thisPartSize = Math.min(currentState.getPartSize(),
                                (currentState.getContentLength() - currentState.getFilePosition()));
                        isLastPart = currentState.getContentLength() == currentState.getFilePosition() + thisPartSize;
                        UploadPartRequest uploadRequest = new UploadPartRequest()
                                .withBucketName(bucket)
                                .withKey(key)
                                .withUploadId(currentState.getUploadId())
                                .withInputStream(in)
                                .withPartNumber(part)
                                .withPartSize(thisPartSize)
                                .withLastPart(isLastPart);
                        if (encryptionService != null) {
                            encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
                        }
                        try {
                            UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
                            currentState.addPartETag(uploadPartResult.getPartETag());
                            currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
                            try {
                                persistLocalState(cacheKey, currentState);
                            } catch (Exception e) {
                                getLogger().info("Exception saving cache state processing flow file", e);
                            }
                            int available = 0;
                            try {
                                available = in.available();
                            } catch (IOException ignored) {
                                // in case of the last part, the stream is already closed
                            }
                            getLogger().info("Success uploading part flowfile={} part={} available={} etag={} uploadId={}",
                                    ffFilename, part, available, uploadPartResult.getETag(), currentState.getUploadId());
                        } catch (AmazonClientException e) {
                            getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={}", ffFilename, part, bucket, key, e);
                            throw e;
                        }
                    }
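                    // Worked example: with contentLength = 25 MB and partSize = 10 MB, the loop above
                    // issues three UploadPart calls of 10 MB, 10 MB and 5 MB, and only the final
                    // 5 MB part is flagged with isLastPart = true.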

                    // complete multipart upload
                    //------------------------------------------------------------
                    CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
                            bucket, key, currentState.getUploadId(), currentState.getPartETags());

                    // No call to an encryption service is needed for a CompleteMultipartUploadRequest.
                    try {
                        CompleteMultipartUploadResult completeResult =
                                s3.completeMultipartUpload(completeRequest);
                        getLogger().info("Success completing upload flowfile={} etag={} uploadId={}",
                                ffFilename, completeResult.getETag(), currentState.getUploadId());
                        if (completeResult.getVersionId() != null) {
                            attributes.put(S3_VERSION_ATTR_KEY, completeResult.getVersionId());
                        }
                        if (completeResult.getETag() != null) {
                            attributes.put(S3_ETAG_ATTR_KEY, completeResult.getETag());
                        }
                        if (completeResult.getExpirationTime() != null) {
                            attributes.put(S3_EXPIRATION_ATTR_KEY,
                                    completeResult.getExpirationTime().toString());
                        }
                        if (currentState.getStorageClass() != null) {
                            attributes.put(S3_STORAGECLASS_ATTR_KEY, currentState.getStorageClass().toString());
                        }
                        if (!userMetadata.isEmpty()) {
                            StringBuilder userMetaBldr = new StringBuilder();
                            for (String userKey : userMetadata.keySet()) {
                                userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
                            }
                            attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
                        }
                        attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_MULTIPARTUPLOAD);
                    } catch (AmazonClientException e) {
                        getLogger().info("Failure completing upload flowfile={} bucket={} key={}",
                                ffFilename, bucket, key, e);
                        throw e;
                    }
                }
            } catch (IOException e) {
                getLogger().error("Error during upload of flow files", e);
                throw e;
            }

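            // Upload succeeded: record the object URL, transfer to success, report a provenance
            // SEND event with the elapsed time, and drop the persisted multipart state.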
            final String url = s3.getResourceUrl(bucket, key);
            attributes.put("s3.url", url);
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, REL_SUCCESS);

            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            session.getProvenanceReporter().send(flowFile, url, millis);

            getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", ff, millis);
            try {
                removeLocalState(cacheKey);
            } catch (IOException e) {
                getLogger().info("Error trying to delete key {} from cache:", cacheKey, e);
            }

        } catch (final IllegalArgumentException | ProcessException | AmazonClientException | IOException e) {
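            // A deliberate unschedule during a multipart upload rolls the session back so the
            // upload can resume later from persisted state; any other failure penalizes the
            // FlowFile and routes it to failure.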
            extractExceptionDetails(e, session, flowFile);
            if (e.getMessage() != null && e.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
                getLogger().info(e.getMessage());
                session.rollback();
            } else {
                getLogger().error("Failed to put {} to Amazon S3 due to {}", flowFile, e);
                flowFile = session.penalize(flowFile);
                session.transfer(flowFile, REL_FAILURE);
            }
        }
    }