in nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java [480:868]
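/**
 * Uploads the content of the incoming FlowFile, or of a referenced file resource, to Amazon S3.
 * Objects no larger than the configured Multipart Threshold are sent with a single PutObject
 * request; larger objects use the S3 multipart upload API, persisting part progress in local
 * state so that an interrupted upload can be resumed. Stale multipart uploads are aged off
 * before each attempt.
 */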
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final AmazonS3Client s3;
try {
s3 = getS3Client(context, flowFile.getAttributes());
} catch (Exception e) {
getLogger().error("Failed to initialize S3 client", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final long startNanos = System.nanoTime();
final String bucket = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
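// Key for locally persisted multipart upload state, scoped to this processor instance, bucket and object key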
final String cacheKey = getIdentifier() + "/" + bucket + "/" + key;
final FlowFile ff = flowFile;
final Map<String, String> attributes = new HashMap<>();
final String ffFilename = ff.getAttribute(CoreAttributes.FILENAME.key());
final ResourceTransferSource resourceTransferSource = context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
attributes.put(S3_BUCKET_KEY, bucket);
attributes.put(S3_OBJECT_KEY, key);
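// Multipart Threshold selects single-part vs multipart upload; Multipart Part Size caps the size of each uploaded part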
final long multipartThreshold = context.getProperty(MULTIPART_THRESHOLD).asDataSize(DataUnit.B).longValue();
final long multipartPartSize = context.getProperty(MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue();
final long now = System.currentTimeMillis();
/*
 * If necessary, age off stale multipart uploads in AWS S3 and in local state
 */
ageoffS3Uploads(context, s3, now, bucket);
/*
 * Then upload the current FlowFile, either as a single PutObject call or as a multipart upload
 */
try {
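// Effectively final reference to the FlowFile so it can be captured by the session.read lambda and the tag helpers below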
final FlowFile flowFileCopy = flowFile;
Optional<FileResource> optFileResource = getFileResource(resourceTransferSource, context, flowFile.getAttributes());
try (InputStream in = optFileResource
.map(FileResource::getInputStream)
.orElseGet(() -> session.read(flowFileCopy))) {
final ObjectMetadata objectMetadata = new ObjectMetadata();
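// Content length comes from the external file resource when one is supplied, otherwise from the
// FlowFile itself; note that the single-part vs multipart decision below still uses the FlowFile's own size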
objectMetadata.setContentLength(optFileResource.map(FileResource::getSize).orElseGet(ff::getSize));
final String contentType = context.getProperty(CONTENT_TYPE)
.evaluateAttributeExpressions(ff).getValue();
if (contentType != null) {
objectMetadata.setContentType(contentType);
attributes.put(S3_CONTENT_TYPE, contentType);
}
final String cacheControl = context.getProperty(CACHE_CONTROL)
.evaluateAttributeExpressions(ff).getValue();
if (cacheControl != null) {
objectMetadata.setCacheControl(cacheControl);
attributes.put(S3_CACHE_CONTROL, cacheControl);
}
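// Content-Disposition handling: "inline" is stored as-is, "attachment" gains a URL-encoded filename
// parameter, and otherwise the encoded filename alone becomes the disposition value (in which case
// no content-disposition attribute is recorded on the FlowFile)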
final String contentDisposition = context.getProperty(CONTENT_DISPOSITION).getValue();
String fileName = URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), StandardCharsets.UTF_8);
if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) {
objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE);
attributes.put(S3_CONTENT_DISPOSITION, CONTENT_DISPOSITION_INLINE);
} else if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) {
String contentDispositionValue = CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\"";
objectMetadata.setContentDisposition(contentDispositionValue);
attributes.put(S3_CONTENT_DISPOSITION, contentDispositionValue);
} else {
objectMetadata.setContentDisposition(fileName);
}
final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
.evaluateAttributeExpressions(ff).getValue();
if (expirationRule != null) {
objectMetadata.setExpirationTimeRuleId(expirationRule);
}
final Map<String, String> userMetadata = new HashMap<>();
for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (entry.getKey().isDynamic()) {
final String value = context.getProperty(
entry.getKey()).evaluateAttributeExpressions(ff).getValue();
userMetadata.put(entry.getKey().getName(), value);
}
}
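// Server-side and client-side encryption are mutually exclusive here: the client-side encryption
// service is consulted only when no server-side encryption algorithm was requested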
final String serverSideEncryption = context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
AmazonS3EncryptionService encryptionService = null;
if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
objectMetadata.setSSEAlgorithm(serverSideEncryption);
attributes.put(S3_SSE_ALGORITHM, serverSideEncryption);
} else {
encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
}
if (!userMetadata.isEmpty()) {
objectMetadata.setUserMetadata(userMetadata);
}
if (ff.getSize() <= multipartThreshold) {
//----------------------------------------
// single part upload
//----------------------------------------
final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
if (encryptionService != null) {
encryptionService.configurePutObjectRequest(request, objectMetadata);
attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName());
}
request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
final AccessControlList acl = createACL(context, ff);
if (acl != null) {
request.setAccessControlList(acl);
}
final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
if (cannedAcl != null) {
request.withCannedAcl(cannedAcl);
}
if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
request.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
}
try {
final PutObjectResult result = s3.putObject(request);
if (result.getVersionId() != null) {
attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
}
if (result.getETag() != null) {
attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
}
if (result.getExpirationTime() != null) {
attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
}
if (result.getMetadata().getStorageClass() != null) {
attributes.put(S3_STORAGECLASS_ATTR_KEY, result.getMetadata().getStorageClass());
} else {
attributes.put(S3_STORAGECLASS_ATTR_KEY, StorageClass.Standard.toString());
}
if (!userMetadata.isEmpty()) {
final StringBuilder userMetaBldr = new StringBuilder();
for (final Entry<String, String> entry : userMetadata.entrySet()) {
userMetaBldr.append(entry.getKey()).append("=").append(entry.getValue());
}
attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
}
attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_PUTOBJECT);
} catch (AmazonClientException e) {
getLogger().info("Failure putting object flowfile={} bucket={} key={} reason={}",
ffFilename, bucket, key, e.getMessage());
throw e;
}
} else {
//----------------------------------------
// multipart upload
//----------------------------------------
// load or create persistent state
//------------------------------------------------------------
MultipartState currentState;
try {
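// As the helper's name suggests, persisted local state is honored only when the
// corresponding multipart upload still exists in S3; otherwise fresh state is created below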
currentState = getLocalStateIfInS3(s3, bucket, cacheKey);
if (currentState != null) {
if (!currentState.getPartETags().isEmpty()) {
final PartETag lastETag = currentState.getPartETags().get(
currentState.getPartETags().size() - 1);
getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
"uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
"contentLength='{}' partsLoaded={} lastPart={}/{}",
ffFilename, bucket, key, currentState.getUploadId(),
currentState.getFilePosition(), currentState.getPartSize(),
currentState.getStorageClass().toString(),
currentState.getContentLength(),
currentState.getPartETags().size(),
lastETag.getPartNumber(),
lastETag.getETag());
} else {
getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
"uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
"contentLength='{}' no partsLoaded",
ffFilename, bucket, key, currentState.getUploadId(),
currentState.getFilePosition(), currentState.getPartSize(),
currentState.getStorageClass().toString(),
currentState.getContentLength());
}
} else {
currentState = new MultipartState();
currentState.setPartSize(multipartPartSize);
currentState.setStorageClass(
StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
currentState.setContentLength(ff.getSize());
persistLocalState(cacheKey, currentState);
getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'",
ffFilename, bucket, key);
}
} catch (IOException e) {
getLogger().error("IOException loading or persisting multipart upload state while processing flow file", e);
throw e;
}
// initiate multipart upload or find position in file
//------------------------------------------------------------
if (currentState.getUploadId().isEmpty()) {
final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
if (encryptionService != null) {
encryptionService.configureInitiateMultipartUploadRequest(initiateRequest, objectMetadata);
attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName());
}
initiateRequest.setStorageClass(currentState.getStorageClass());
final AccessControlList acl = createACL(context, ff);
if (acl != null) {
initiateRequest.setAccessControlList(acl);
}
final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
if (cannedAcl != null) {
initiateRequest.withCannedACL(cannedAcl);
}
if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
initiateRequest.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
}
try {
final InitiateMultipartUploadResult initiateResult =
s3.initiateMultipartUpload(initiateRequest);
currentState.setUploadId(initiateResult.getUploadId());
currentState.getPartETags().clear();
try {
persistLocalState(cacheKey, currentState);
} catch (Exception e) {
getLogger().info("Exception saving cache state while processing flow file", e);
throw (new ProcessException("Exception saving cache state", e));
}
getLogger().info("Success initiating upload flowfile={} available={} position={} length={} bucket={} key={} uploadId={}",
ffFilename, in.available(), currentState.getFilePosition(),
currentState.getContentLength(), bucket, key,
currentState.getUploadId());
if (initiateResult.getUploadId() != null) {
attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
}
} catch (AmazonClientException e) {
getLogger().info("Failure initiating upload flowfile={} bucket={} key={}", ffFilename, bucket, key, e);
throw e;
}
} else {
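// An uploadId is already known: resume by skipping the stream forward to the persisted file position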
if (currentState.getFilePosition() > 0) {
try {
final long skipped = in.skip(currentState.getFilePosition());
if (skipped != currentState.getFilePosition()) {
getLogger().info("Failure skipping to resume upload flowfile={} bucket={} key={} position={} skipped={}",
ffFilename, bucket, key, currentState.getFilePosition(), skipped);
}
} catch (Exception e) {
getLogger().info("Failure skipping to resume upload flowfile={} bucket={} key={} position={}", ffFilename, bucket, key, currentState.getFilePosition(), e);
throw new ProcessException(e);
}
}
}
// upload parts
//------------------------------------------------------------
long thisPartSize;
boolean isLastPart;
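// Part numbers are 1-based; on resume, start with the part after the last persisted ETag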
for (int part = currentState.getPartETags().size() + 1;
currentState.getFilePosition() < currentState.getContentLength(); part++) {
if (!PutS3Object.this.isScheduled()) {
throw new IOException(S3_PROCESS_UNSCHEDULED_MESSAGE + " flowfile=" + ffFilename +
" part=" + part + " uploadId=" + currentState.getUploadId());
}
thisPartSize = Math.min(currentState.getPartSize(),
(currentState.getContentLength() - currentState.getFilePosition()));
isLastPart = currentState.getContentLength() == currentState.getFilePosition() + thisPartSize;
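// Example: a contentLength of 12 MB with a 5 MB part size yields parts of 5 MB, 5 MB and 2 MB,
// with isLastPart true for the final 2 MB part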
UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(bucket)
.withKey(key)
.withUploadId(currentState.getUploadId())
.withInputStream(in)
.withPartNumber(part)
.withPartSize(thisPartSize)
.withLastPart(isLastPart);
if (encryptionService != null) {
encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
}
try {
UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
currentState.addPartETag(uploadPartResult.getPartETag());
currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
try {
persistLocalState(cacheKey, currentState);
} catch (Exception e) {
getLogger().info("Exception saving cache state processing flow file", e);
}
int available = 0;
try {
available = in.available();
} catch (IOException ignored) {
// in case of the last part, the stream is already closed
}
getLogger().info("Success uploading part flowfile={} part={} available={} etag={} uploadId={}",
ffFilename, part, available, uploadPartResult.getETag(), currentState.getUploadId());
} catch (AmazonClientException e) {
getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={}", ffFilename, part, bucket, key, e);
throw e;
}
}
// complete multipart upload
//------------------------------------------------------------
CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
bucket, key, currentState.getUploadId(), currentState.getPartETags());
// No call to an encryption service is needed for a CompleteMultipartUploadRequest.
try {
CompleteMultipartUploadResult completeResult =
s3.completeMultipartUpload(completeRequest);
getLogger().info("Success completing upload flowfile={} etag={} uploadId={}",
ffFilename, completeResult.getETag(), currentState.getUploadId());
if (completeResult.getVersionId() != null) {
attributes.put(S3_VERSION_ATTR_KEY, completeResult.getVersionId());
}
if (completeResult.getETag() != null) {
attributes.put(S3_ETAG_ATTR_KEY, completeResult.getETag());
}
if (completeResult.getExpirationTime() != null) {
attributes.put(S3_EXPIRATION_ATTR_KEY,
completeResult.getExpirationTime().toString());
}
if (currentState.getStorageClass() != null) {
attributes.put(S3_STORAGECLASS_ATTR_KEY, currentState.getStorageClass().toString());
}
if (!userMetadata.isEmpty()) {
final StringBuilder userMetaBldr = new StringBuilder();
for (final Entry<String, String> entry : userMetadata.entrySet()) {
userMetaBldr.append(entry.getKey()).append("=").append(entry.getValue());
}
attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
}
attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_MULTIPARTUPLOAD);
} catch (AmazonClientException e) {
getLogger().info("Failure completing upload flowfile={} bucket={} key={}",
ffFilename, bucket, key, e);
throw e;
}
}
} catch (IOException e) {
getLogger().error("Error during upload of flow files", e);
throw e;
}
final String url = s3.getResourceUrl(bucket, key);
attributes.put("s3.url", url);
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, millis);
getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", ff, millis);
try {
removeLocalState(cacheKey);
} catch (IOException e) {
getLogger().info("Error trying to delete key {} from cache:", cacheKey, e);
}
} catch (final IllegalArgumentException | ProcessException | AmazonClientException | IOException e) {
extractExceptionDetails(e, session, flowFile);
// The message may be null for some AmazonClientExceptions, so guard before matching on it
if (e.getMessage() != null && e.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
// Processor was stopped mid-upload: roll back so the FlowFile is retried, keeping persisted multipart state
getLogger().info(e.getMessage());
session.rollback();
} else {
getLogger().error("Failed to put {} to Amazon S3", flowFile, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}
}