extensions/aws/processors/PutS3Object.cpp (256 lines of code) (raw):
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "PutS3Object.h"
#include <string>
#include <memory>
#include "AWSCredentialsService.h"
#include "properties/Properties.h"
#include "utils/StringUtils.h"
#include "utils/MapUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "range/v3/algorithm/contains.hpp"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::aws::processors {
void PutS3Object::initialize() {
  setSupportedProperties(Properties);
  setSupportedRelationships(Relationships);
}
/**
 * Collects every non-empty dynamic property as S3 user metadata.
 *
 * Fills two representations:
 *  - user_metadata_map_: key -> value, forwarded to each put request;
 *  - user_metadata_: "k1=v1,k2=v2" string, written to the "s3.usermetadata" attribute.
 *
 * Both are rebuilt from scratch on every call. If onSchedule runs again on the same
 * processor instance, this keeps metadata from a previous schedule from leaking in:
 * emplace() would not overwrite stale keys, and the string would otherwise keep its old value
 * when no dynamic properties remain.
 */
void PutS3Object::fillUserMetadata(const std::shared_ptr<core::ProcessContext> &context) {
  user_metadata_map_.clear();
  user_metadata_.clear();
  const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
  for (const auto &prop_key : dynamic_prop_keys) {
    std::string prop_value;
    if (!context->getDynamicProperty(prop_key, prop_value) || prop_value.empty()) {
      continue;
    }
    logger_->log_debug("PutS3Object: DynamicProperty: [%s] -> [%s]", prop_key, prop_value);
    user_metadata_map_.emplace(prop_key, prop_value);
    // Every entry contains '=', so a non-empty string means at least one entry was already added.
    if (!user_metadata_.empty()) {
      user_metadata_ += ',';
    }
    user_metadata_ += minifi::utils::StringUtils::join_pack(prop_key, "=", prop_value);
  }
  logger_->log_debug("PutS3Object: User metadata [%s]", user_metadata_);
}
/**
 * Reads and validates the schedule-time configuration after the common S3 setup.
 *
 * Throws Exception(PROCESS_SCHEDULE_EXCEPTION) when the storage class, the server side encryption,
 * the multipart threshold or the multipart part size is missing or out of range. Throws
 * Exception(PROCESSOR_EXCEPTION) when no StateManager is available. Finally it initializes the
 * multipart upload state storage of the S3 wrapper.
 */
void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
  S3Processor::onSchedule(context, sessionFactory);

  const bool storage_class_valid = context->getProperty(StorageClass, storage_class_)
      && !storage_class_.empty()
      && ranges::contains(STORAGE_CLASSES, storage_class_);
  if (!storage_class_valid) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Storage Class property missing or invalid");
  }
  logger_->log_debug("PutS3Object: Storage Class [%s]", storage_class_);

  const bool encryption_valid = context->getProperty(ServerSideEncryption, server_side_encryption_)
      && !server_side_encryption_.empty()
      && ranges::contains(SERVER_SIDE_ENCRYPTIONS, server_side_encryption_);
  if (!encryption_valid) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Server Side Encryption property missing or invalid");
  }
  logger_->log_debug("PutS3Object: Server Side Encryption [%s]", server_side_encryption_);

  // Path-style access is the opposite of virtual-host addressing; only override when the property is set.
  const auto use_path_style_access = context->getProperty<bool>(UsePathStyleAccess);
  if (use_path_style_access) {
    use_virtual_addressing_ = !*use_path_style_access;
  }

  const bool threshold_in_range = context->getProperty(MultipartThreshold, multipart_threshold_)
      && multipart_threshold_ <= getMaxUploadSize()
      && multipart_threshold_ >= getMinPartSize();
  if (!threshold_in_range) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
  }
  logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);

  const bool part_size_in_range = context->getProperty(MultipartPartSize, multipart_size_)
      && multipart_size_ <= getMaxUploadSize()
      && multipart_size_ >= getMinPartSize();
  if (!part_size_in_range) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
  }
  logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);

  multipart_upload_ageoff_interval_ =
      minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.name).getMilliseconds();
  logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRId64 " ms", int64_t{multipart_upload_ageoff_interval_.count()});
  multipart_upload_max_age_threshold_ =
      minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.name).getMilliseconds();
  logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRId64 " ms", int64_t{multipart_upload_max_age_threshold_.count()});

  fillUserMetadata(context);

  auto state_manager = context->getStateManager();
  if (!state_manager) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
  }
  s3_wrapper_.initializeMultipartUploadStateStorage(gsl::make_not_null(state_manager));
}
/**
 * Converts a comma separated list of grantees into the grant header format.
 *
 * Entries containing '@' become `emailAddress="<entry>"`; all others become `id=<entry>`.
 * Results are joined with ", ". Each entry is trimmed first. Entries that are blank after trimming
 * (e.g. from a trailing or doubled comma) are skipped. Otherwise they would produce a bare "id=" grant.
 *
 * @param comma_separated_list raw property value, e.g. "id1, user@example.com"
 * @return the formatted grantee list; empty if no non-blank entries were present
 */
std::string PutS3Object::parseAccessControlList(const std::string &comma_separated_list) {
  std::string grantees;
  for (const auto& user : minifi::utils::StringUtils::split(comma_separated_list, ",")) {
    auto trimmed_user = minifi::utils::StringUtils::trim(user);
    if (trimmed_user.empty()) {
      continue;
    }
    if (!grantees.empty()) {
      grantees += ", ";
    }
    if (trimmed_user.find('@') != std::string::npos) {
      grantees += "emailAddress=\"" + trimmed_user + "\"";
    } else {
      grantees += "id=" + trimmed_user;
    }
  }
  return grantees;
}
/**
 * Reads the Canned ACL property (evaluated against the flow file) into the request parameters.
 *
 * @return false (after logging an error) when a non-empty value is not one of CANNED_ACLS,
 *         true otherwise (an empty value is accepted)
 */
bool PutS3Object::setCannedAcl(
    const std::shared_ptr<core::ProcessContext> &context,
    const std::shared_ptr<core::FlowFile> &flow_file,
    aws::s3::PutObjectRequestParameters &put_s3_request_params) const {
  auto& canned_acl = put_s3_request_params.canned_acl;
  context->getProperty(CannedACL, canned_acl, flow_file);
  const bool acl_acceptable = canned_acl.empty() || ranges::contains(CANNED_ACLS, canned_acl);
  if (!acl_acceptable) {
    logger_->log_error("Canned ACL is invalid!");
    return false;
  }
  logger_->log_debug("PutS3Object: Canned ACL [%s]", canned_acl);
  return true;
}
/**
 * Fills the per-permission grantee lists of the request from the four user list properties.
 *
 * Each list is only set when its property is present and non-empty. Each value is converted
 * with parseAccessControlList. The canned ACL is handled last by setCannedAcl.
 *
 * @return the result of setCannedAcl (false when the canned ACL is invalid)
 */
bool PutS3Object::setAccessControl(
    const std::shared_ptr<core::ProcessContext> &context,
    const std::shared_ptr<core::FlowFile> &flow_file,
    aws::s3::PutObjectRequestParameters &put_s3_request_params) const {
  if (std::string full_control; context->getProperty(FullControlUserList, full_control, flow_file) && !full_control.empty()) {
    put_s3_request_params.fullcontrol_user_list = parseAccessControlList(full_control);
    logger_->log_debug("PutS3Object: Full Control User List [%s]", full_control);
  }
  if (std::string read_permission; context->getProperty(ReadPermissionUserList, read_permission, flow_file) && !read_permission.empty()) {
    put_s3_request_params.read_permission_user_list = parseAccessControlList(read_permission);
    logger_->log_debug("PutS3Object: Read Permission User List [%s]", read_permission);
  }
  if (std::string read_acl; context->getProperty(ReadACLUserList, read_acl, flow_file) && !read_acl.empty()) {
    put_s3_request_params.read_acl_user_list = parseAccessControlList(read_acl);
    logger_->log_debug("PutS3Object: Read ACL User List [%s]", read_acl);
  }
  if (std::string write_acl; context->getProperty(WriteACLUserList, write_acl, flow_file) && !write_acl.empty()) {
    put_s3_request_params.write_acl_user_list = parseAccessControlList(write_acl);
    logger_->log_debug("PutS3Object: Write ACL User List [%s]", write_acl);
  }
  return setCannedAcl(context, flow_file, put_s3_request_params);
}
/**
 * Assembles the put-object request parameters for one flow file.
 *
 * Combines the common (EL-evaluated) S3 properties with the schedule-time settings. It then resolves
 * the object key (the Object Key property, falling back to the "filename" attribute), the content type
 * and the access control settings.
 *
 * @return the parameters, or std::nullopt if no object key can be determined or the
 *         access control settings are invalid
 */
std::optional<aws::s3::PutObjectRequestParameters> PutS3Object::buildPutS3RequestParams(
    const std::shared_ptr<core::ProcessContext> &context,
    const std::shared_ptr<core::FlowFile> &flow_file,
    const CommonProperties &common_properties) const {
  gsl_Expects(client_config_);
  aws::s3::PutObjectRequestParameters request(common_properties.credentials, *client_config_);
  request.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
  request.bucket = common_properties.bucket;
  request.user_metadata_map = user_metadata_map_;
  request.server_side_encryption = server_side_encryption_;
  request.storage_class = storage_class_;

  context->getProperty(ObjectKey, request.object_key, flow_file);
  if (request.object_key.empty()) {
    // Fall back to the flow file's "filename" attribute when the property yields nothing.
    const bool found_filename = flow_file->getAttribute("filename", request.object_key);
    if (!found_filename || request.object_key.empty()) {
      logger_->log_error("No Object Key is set and default object key 'filename' attribute could not be found!");
      return std::nullopt;
    }
  }
  logger_->log_debug("PutS3Object: Object Key [%s]", request.object_key);

  context->getProperty(ContentType, request.content_type, flow_file);
  logger_->log_debug("PutS3Object: Content Type [%s]", request.content_type);

  if (!setAccessControl(context, flow_file, request)) {
    return std::nullopt;
  }
  request.use_virtual_addressing = use_virtual_addressing_;
  return request;
}
/**
 * Writes the upload outcome onto the flow file as "s3.*" attributes.
 *
 * Bucket, key and content type are always written. User metadata, version, ETag, expiration
 * and SSE algorithm are only written when non-empty.
 */
void PutS3Object::setAttributes(
    const std::shared_ptr<core::ProcessSession> &session,
    const std::shared_ptr<core::FlowFile> &flow_file,
    const aws::s3::PutObjectRequestParameters &put_s3_request_params,
    const minifi::aws::s3::PutObjectResult &put_object_result) const {
  session->putAttribute(flow_file, "s3.bucket", put_s3_request_params.bucket);
  session->putAttribute(flow_file, "s3.key", put_s3_request_params.object_key);
  session->putAttribute(flow_file, "s3.contenttype", put_s3_request_params.content_type);

  const auto put_if_not_empty = [&session, &flow_file](const char* attribute_name, const auto& attribute_value) {
    if (attribute_value.empty()) {
      return;
    }
    session->putAttribute(flow_file, attribute_name, attribute_value);
  };
  put_if_not_empty("s3.usermetadata", user_metadata_);
  put_if_not_empty("s3.version", put_object_result.version);
  put_if_not_empty("s3.etag", put_object_result.etag);
  put_if_not_empty("s3.expiration", put_object_result.expiration);
  put_if_not_empty("s3.sseAlgorithm", put_object_result.ssealgorithm);
}
void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
  {
    std::lock_guard<std::mutex> lock(last_ageoff_mutex_);
    const auto now = std::chrono::system_clock::now();
    if (now - last_ageoff_time_ < multipart_upload_ageoff_interval_) {
      logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
      return;
    }
    last_ageoff_time_ = now;
  }
  logger_->log_trace("Listing aged off multipart uploads still in progress.");
  aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_);
  list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
  list_params.bucket = common_properties.bucket;
  list_params.age_off_limit = multipart_upload_max_age_threshold_;
  list_params.use_virtual_addressing = use_virtual_addressing_;
  auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params);
  if (!aged_off_uploads_in_progress) {
    logger_->log_error("Listing aged off multipart uploads failed!");
    return;
  }
  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket '%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
  size_t aborted = 0;
  for (const auto& upload : *aged_off_uploads_in_progress) {
    logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s' due to reaching maximum upload age threshold.",
      upload.key, upload.upload_id, common_properties.bucket);
    aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_);
    abort_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
    abort_params.bucket = common_properties.bucket;
    abort_params.key = upload.key;
    abort_params.upload_id = upload.upload_id;
    abort_params.use_virtual_addressing = use_virtual_addressing_;
    if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
       logger_->log_error("Failed to abort multipart upload with key '%s' and upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id, abort_params.bucket);
       continue;
    }
    ++aborted;
  }
  if (aborted > 0) {
    logger_->log_info("Aborted %d pending multipart upload jobs in bucket '%s'", aborted, common_properties.bucket);
  }
  s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
}
/**
 * Uploads the content of one incoming flow file to S3.
 *
 * Flow files up to multipart_threshold_ bytes are sent in a single request. Larger ones are sent
 * as a multipart upload with multipart_size_ parts. Stale multipart uploads are aged off first.
 * The flow file goes to Success (with "s3.*" attributes) when the upload produced a result, and
 * to Failure otherwise, including when the request parameters could not be built.
 */
void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  logger_->log_trace("PutS3Object onTrigger");
  std::shared_ptr<core::FlowFile> flow_file{session->get()};
  if (flow_file == nullptr) {
    context->yield();
    return;
  }

  auto common_properties = getCommonELSupportedProperties(context, flow_file);
  if (!common_properties.has_value()) {
    session->transfer(flow_file, Failure);
    return;
  }
  ageOffMultipartUploads(*common_properties);

  auto put_s3_request_params = buildPutS3RequestParams(context, flow_file, *common_properties);
  if (!put_s3_request_params.has_value()) {
    session->transfer(flow_file, Failure);
    return;
  }

  std::optional<minifi::aws::s3::PutObjectResult> result;
  session->read(flow_file, [this, &flow_file, &put_s3_request_params, &result](const std::shared_ptr<io::InputStream>& stream) -> int64_t {
    auto& request = *put_s3_request_params;
    const auto flow_size = flow_file->getSize();
    try {
      if (flow_size > multipart_threshold_) {
        logger_->log_info("S3 Object '%s' passes the multipart threshold, uploading it in multiple parts", request.object_key);
        result = s3_wrapper_.putObjectMultipart(request, stream, flow_size, multipart_size_);
      } else {
        logger_->log_info("Uploading S3 Object '%s' in a single upload", request.object_key);
        result = s3_wrapper_.putObject(request, stream, flow_size);
      }
    } catch (const aws::s3::StreamReadException& ex) {
      logger_->log_error("Error occurred while uploading to S3: %s", ex.what());
      return -1;
    }
    return gsl::narrow<int64_t>(flow_size);
  });

  if (result) {
    setAttributes(session, flow_file, *put_s3_request_params, *result);
    logger_->log_debug("Successfully uploaded S3 object '%s' to bucket '%s'", put_s3_request_params->object_key, put_s3_request_params->bucket);
    session->transfer(flow_file, Success);
    return;
  }
  logger_->log_error("Failed to upload S3 object to bucket '%s'", put_s3_request_params->bucket);
  session->transfer(flow_file, Failure);
}
// Registers PutS3Object under the Processor resource type. This is presumably what lets flow
// configurations instantiate it by class name; confirm against core/Resource.h.
REGISTER_RESOURCE(PutS3Object, Processor);
}  // namespace org::apache::nifi::minifi::aws::processors