extensions/aws/processors/PutS3Object.cpp (256 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PutS3Object.h"
#include <string>
#include <memory>
#include "AWSCredentialsService.h"
#include "properties/Properties.h"
#include "utils/StringUtils.h"
#include "utils/MapUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "range/v3/algorithm/contains.hpp"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::aws::processors {
void PutS3Object::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
void PutS3Object::fillUserMetadata(const std::shared_ptr<core::ProcessContext> &context) {
const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
bool first_property = true;
for (const auto &prop_key : dynamic_prop_keys) {
std::string prop_value;
if (context->getDynamicProperty(prop_key, prop_value) && !prop_value.empty()) {
logger_->log_debug("PutS3Object: DynamicProperty: [%s] -> [%s]", prop_key, prop_value);
user_metadata_map_.emplace(prop_key, prop_value);
if (first_property) {
user_metadata_ = minifi::utils::StringUtils::join_pack(prop_key, "=", prop_value);
first_property = false;
} else {
user_metadata_ += minifi::utils::StringUtils::join_pack(",", prop_key, "=", prop_value);
}
}
}
logger_->log_debug("PutS3Object: User metadata [%s]", user_metadata_);
}
void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
S3Processor::onSchedule(context, sessionFactory);
if (!context->getProperty(StorageClass, storage_class_)
|| storage_class_.empty()
|| !ranges::contains(STORAGE_CLASSES, storage_class_)) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Storage Class property missing or invalid");
}
logger_->log_debug("PutS3Object: Storage Class [%s]", storage_class_);
if (!context->getProperty(ServerSideEncryption, server_side_encryption_)
|| server_side_encryption_.empty()
|| !ranges::contains(SERVER_SIDE_ENCRYPTIONS, server_side_encryption_)) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Server Side Encryption property missing or invalid");
}
logger_->log_debug("PutS3Object: Server Side Encryption [%s]", server_side_encryption_);
if (auto use_path_style_access = context->getProperty<bool>(UsePathStyleAccess)) {
use_virtual_addressing_ = !*use_path_style_access;
}
if (!context->getProperty(MultipartThreshold, multipart_threshold_) || multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
}
logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
if (!context->getProperty(MultipartPartSize, multipart_size_) || multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
}
logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);
multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.name).getMilliseconds();
logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRId64 " ms", int64_t{multipart_upload_ageoff_interval_.count()});
multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.name).getMilliseconds();
logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRId64 " ms", int64_t{multipart_upload_max_age_threshold_.count()});
fillUserMetadata(context);
auto state_manager = context->getStateManager();
if (state_manager == nullptr) {
throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
}
s3_wrapper_.initializeMultipartUploadStateStorage(gsl::make_not_null(state_manager));
}
std::string PutS3Object::parseAccessControlList(const std::string &comma_separated_list) {
auto users = minifi::utils::StringUtils::split(comma_separated_list, ",");
for (auto& user : users) {
auto trimmed_user = minifi::utils::StringUtils::trim(user);
if (trimmed_user.find('@') != std::string::npos) {
user = "emailAddress=\"" + trimmed_user + "\"";
} else {
user = "id=" + trimmed_user;
}
}
return minifi::utils::StringUtils::join(", ", users);
}
bool PutS3Object::setCannedAcl(
const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::FlowFile> &flow_file,
aws::s3::PutObjectRequestParameters &put_s3_request_params) const {
context->getProperty(CannedACL, put_s3_request_params.canned_acl, flow_file);
if (!put_s3_request_params.canned_acl.empty() && !ranges::contains(CANNED_ACLS, put_s3_request_params.canned_acl)) {
logger_->log_error("Canned ACL is invalid!");
return false;
}
logger_->log_debug("PutS3Object: Canned ACL [%s]", put_s3_request_params.canned_acl);
return true;
}
bool PutS3Object::setAccessControl(
const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::FlowFile> &flow_file,
aws::s3::PutObjectRequestParameters &put_s3_request_params) const {
std::string value;
if (context->getProperty(FullControlUserList, value, flow_file) && !value.empty()) {
put_s3_request_params.fullcontrol_user_list = parseAccessControlList(value);
logger_->log_debug("PutS3Object: Full Control User List [%s]", value);
}
if (context->getProperty(ReadPermissionUserList, value, flow_file) && !value.empty()) {
put_s3_request_params.read_permission_user_list = parseAccessControlList(value);
logger_->log_debug("PutS3Object: Read Permission User List [%s]", value);
}
if (context->getProperty(ReadACLUserList, value, flow_file) && !value.empty()) {
put_s3_request_params.read_acl_user_list = parseAccessControlList(value);
logger_->log_debug("PutS3Object: Read ACL User List [%s]", value);
}
if (context->getProperty(WriteACLUserList, value, flow_file) && !value.empty()) {
put_s3_request_params.write_acl_user_list = parseAccessControlList(value);
logger_->log_debug("PutS3Object: Write ACL User List [%s]", value);
}
return setCannedAcl(context, flow_file, put_s3_request_params);
}
std::optional<aws::s3::PutObjectRequestParameters> PutS3Object::buildPutS3RequestParams(
const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::FlowFile> &flow_file,
const CommonProperties &common_properties) const {
gsl_Expects(client_config_);
aws::s3::PutObjectRequestParameters params(common_properties.credentials, *client_config_);
params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
params.bucket = common_properties.bucket;
params.user_metadata_map = user_metadata_map_;
params.server_side_encryption = server_side_encryption_;
params.storage_class = storage_class_;
context->getProperty(ObjectKey, params.object_key, flow_file);
if (params.object_key.empty() && (!flow_file->getAttribute("filename", params.object_key) || params.object_key.empty())) {
logger_->log_error("No Object Key is set and default object key 'filename' attribute could not be found!");
return std::nullopt;
}
logger_->log_debug("PutS3Object: Object Key [%s]", params.object_key);
context->getProperty(ContentType, params.content_type, flow_file);
logger_->log_debug("PutS3Object: Content Type [%s]", params.content_type);
if (!setAccessControl(context, flow_file, params)) {
return std::nullopt;
}
params.use_virtual_addressing = use_virtual_addressing_;
return params;
}
void PutS3Object::setAttributes(
const std::shared_ptr<core::ProcessSession> &session,
const std::shared_ptr<core::FlowFile> &flow_file,
const aws::s3::PutObjectRequestParameters &put_s3_request_params,
const minifi::aws::s3::PutObjectResult &put_object_result) const {
session->putAttribute(flow_file, "s3.bucket", put_s3_request_params.bucket);
session->putAttribute(flow_file, "s3.key", put_s3_request_params.object_key);
session->putAttribute(flow_file, "s3.contenttype", put_s3_request_params.content_type);
if (!user_metadata_.empty()) {
session->putAttribute(flow_file, "s3.usermetadata", user_metadata_);
}
if (!put_object_result.version.empty()) {
session->putAttribute(flow_file, "s3.version", put_object_result.version);
}
if (!put_object_result.etag.empty()) {
session->putAttribute(flow_file, "s3.etag", put_object_result.etag);
}
if (!put_object_result.expiration.empty()) {
session->putAttribute(flow_file, "s3.expiration", put_object_result.expiration);
}
if (!put_object_result.ssealgorithm.empty()) {
session->putAttribute(flow_file, "s3.sseAlgorithm", put_object_result.ssealgorithm);
}
}
void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
{
std::lock_guard<std::mutex> lock(last_ageoff_mutex_);
const auto now = std::chrono::system_clock::now();
if (now - last_ageoff_time_ < multipart_upload_ageoff_interval_) {
logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
return;
}
last_ageoff_time_ = now;
}
logger_->log_trace("Listing aged off multipart uploads still in progress.");
aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_);
list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
list_params.bucket = common_properties.bucket;
list_params.age_off_limit = multipart_upload_max_age_threshold_;
list_params.use_virtual_addressing = use_virtual_addressing_;
auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params);
if (!aged_off_uploads_in_progress) {
logger_->log_error("Listing aged off multipart uploads failed!");
return;
}
logger_->log_info("Found %d aged off pending multipart upload jobs in bucket '%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
size_t aborted = 0;
for (const auto& upload : *aged_off_uploads_in_progress) {
logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s' due to reaching maximum upload age threshold.",
upload.key, upload.upload_id, common_properties.bucket);
aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_);
abort_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
abort_params.bucket = common_properties.bucket;
abort_params.key = upload.key;
abort_params.upload_id = upload.upload_id;
abort_params.use_virtual_addressing = use_virtual_addressing_;
if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
logger_->log_error("Failed to abort multipart upload with key '%s' and upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id, abort_params.bucket);
continue;
}
++aborted;
}
if (aborted > 0) {
logger_->log_info("Aborted %d pending multipart upload jobs in bucket '%s'", aborted, common_properties.bucket);
}
s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
}
void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
logger_->log_trace("PutS3Object onTrigger");
std::shared_ptr<core::FlowFile> flow_file = session->get();
if (!flow_file) {
context->yield();
return;
}
auto common_properties = getCommonELSupportedProperties(context, flow_file);
if (!common_properties) {
session->transfer(flow_file, Failure);
return;
}
ageOffMultipartUploads(*common_properties);
auto put_s3_request_params = buildPutS3RequestParams(context, flow_file, *common_properties);
if (!put_s3_request_params) {
session->transfer(flow_file, Failure);
return;
}
std::optional<minifi::aws::s3::PutObjectResult> result;
session->read(flow_file, [this, &flow_file, &put_s3_request_params, &result](const std::shared_ptr<io::InputStream>& stream) -> int64_t {
try {
if (flow_file->getSize() <= multipart_threshold_) {
logger_->log_info("Uploading S3 Object '%s' in a single upload", put_s3_request_params->object_key);
result = s3_wrapper_.putObject(*put_s3_request_params, stream, flow_file->getSize());
return gsl::narrow<int64_t>(flow_file->getSize());
} else {
logger_->log_info("S3 Object '%s' passes the multipart threshold, uploading it in multiple parts", put_s3_request_params->object_key);
result = s3_wrapper_.putObjectMultipart(*put_s3_request_params, stream, flow_file->getSize(), multipart_size_);
return gsl::narrow<int64_t>(flow_file->getSize());
}
} catch(const aws::s3::StreamReadException& ex) {
logger_->log_error("Error occurred while uploading to S3: %s", ex.what());
return -1;
}
});
if (!result.has_value()) {
logger_->log_error("Failed to upload S3 object to bucket '%s'", put_s3_request_params->bucket);
session->transfer(flow_file, Failure);
} else {
setAttributes(session, flow_file, *put_s3_request_params, *result);
logger_->log_debug("Successfully uploaded S3 object '%s' to bucket '%s'", put_s3_request_params->object_key, put_s3_request_params->bucket);
session->transfer(flow_file, Success);
}
}
REGISTER_RESOURCE(PutS3Object, Processor);
} // namespace org::apache::nifi::minifi::aws::processors