extensions/aws/s3/S3Wrapper.h (297 lines of code) (raw):
/**
* @file S3Wrapper.h
* S3Wrapper class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <map>
#include <memory>
#include <optional>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "aws/s3/model/StorageClass.h"
#include "aws/s3/model/ServerSideEncryption.h"
#include "aws/s3/model/ObjectCannedACL.h"
#include "aws/s3/model/ChecksumAlgorithm.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerFactory.h"
#include "utils/AWSInitializer.h"
#include "utils/ConfigurationUtils.h"
#include "utils/OptionalUtils.h"
#include "utils/StringUtils.h"
#include "utils/ListingStateManager.h"
#include "utils/gsl.h"
#include "S3RequestSender.h"
#include "Exception.h"
#include "MultipartUploadStateStorage.h"
#include "range/v3/algorithm/find.hpp"
#include "utils/Literals.h"
#include "io/InputStream.h"
#include "io/OutputStream.h"
namespace org::apache::nifi::minifi::aws::s3 {
inline constexpr std::array<std::pair<std::string_view, Aws::S3::Model::StorageClass>, 11> STORAGE_CLASS_MAP {{
{"Standard", Aws::S3::Model::StorageClass::STANDARD},
{"ReducedRedundancy", Aws::S3::Model::StorageClass::REDUCED_REDUNDANCY},
{"StandardIA", Aws::S3::Model::StorageClass::STANDARD_IA},
{"OnezoneIA", Aws::S3::Model::StorageClass::ONEZONE_IA},
{"IntelligentTiering", Aws::S3::Model::StorageClass::INTELLIGENT_TIERING},
{"Glacier", Aws::S3::Model::StorageClass::GLACIER},
{"DeepArchive", Aws::S3::Model::StorageClass::DEEP_ARCHIVE},
{"Outposts", Aws::S3::Model::StorageClass::OUTPOSTS},
{"GlacierIR", Aws::S3::Model::StorageClass::GLACIER_IR},
{"Snow", Aws::S3::Model::StorageClass::SNOW},
{"ExpressOneZone", Aws::S3::Model::StorageClass::EXPRESS_ONEZONE}
}};
inline constexpr std::array<std::pair<Aws::S3::Model::ObjectStorageClass, std::string_view>, 11> OBJECT_STORAGE_CLASS_MAP {{
{Aws::S3::Model::ObjectStorageClass::STANDARD, "Standard"},
{Aws::S3::Model::ObjectStorageClass::REDUCED_REDUNDANCY, "ReducedRedundancy"},
{Aws::S3::Model::ObjectStorageClass::GLACIER, "Glacier"},
{Aws::S3::Model::ObjectStorageClass::STANDARD_IA, "StandardIA"},
{Aws::S3::Model::ObjectStorageClass::ONEZONE_IA, "OnezoneIA"},
{Aws::S3::Model::ObjectStorageClass::INTELLIGENT_TIERING, "IntelligentTiering"},
{Aws::S3::Model::ObjectStorageClass::DEEP_ARCHIVE, "DeepArchive"},
{Aws::S3::Model::ObjectStorageClass::OUTPOSTS, "Outposts"},
{Aws::S3::Model::ObjectStorageClass::GLACIER_IR, "GlacierIR"},
{Aws::S3::Model::ObjectStorageClass::SNOW, "Snow"},
{Aws::S3::Model::ObjectStorageClass::EXPRESS_ONEZONE, "ExpressOneZone"}
}};
inline constexpr std::array<std::pair<Aws::S3::Model::ObjectVersionStorageClass, std::string_view>, 1> VERSION_STORAGE_CLASS_MAP {{
{Aws::S3::Model::ObjectVersionStorageClass::STANDARD, "Standard"}
}};
inline constexpr std::array<std::pair<std::string_view, Aws::S3::Model::ServerSideEncryption>, 4> SERVER_SIDE_ENCRYPTION_MAP {{
{"None", Aws::S3::Model::ServerSideEncryption::NOT_SET},
{"AES256", Aws::S3::Model::ServerSideEncryption::AES256},
{"aws_kms", Aws::S3::Model::ServerSideEncryption::aws_kms},
{"aws_kms_dsse", Aws::S3::Model::ServerSideEncryption::aws_kms_dsse}
}};
inline constexpr std::array<std::pair<std::string_view, Aws::S3::Model::ObjectCannedACL>, 7> CANNED_ACL_MAP {{
{"BucketOwnerFullControl", Aws::S3::Model::ObjectCannedACL::bucket_owner_full_control},
{"BucketOwnerRead", Aws::S3::Model::ObjectCannedACL::bucket_owner_read},
{"AuthenticatedRead", Aws::S3::Model::ObjectCannedACL::authenticated_read},
{"PublicReadWrite", Aws::S3::Model::ObjectCannedACL::public_read_write},
{"PublicRead", Aws::S3::Model::ObjectCannedACL::public_read},
{"Private", Aws::S3::Model::ObjectCannedACL::private_},
{"AwsExecRead", Aws::S3::Model::ObjectCannedACL::aws_exec_read},
}};
inline constexpr std::array<std::pair<std::string_view, Aws::S3::Model::ChecksumAlgorithm>, 5> CHECKSUM_ALGORITHM_MAP {{
{"CRC32", Aws::S3::Model::ChecksumAlgorithm::CRC32},
{"CRC32C", Aws::S3::Model::ChecksumAlgorithm::CRC32C},
{"SHA1", Aws::S3::Model::ChecksumAlgorithm::SHA1},
{"SHA256", Aws::S3::Model::ChecksumAlgorithm::SHA256},
{"CRC64NVME", Aws::S3::Model::ChecksumAlgorithm::CRC64NVME}
}};
struct Expiration {
std::string expiration_time;
std::string expiration_time_rule_id;
};
struct PutObjectResult {
std::string version;
std::string etag;
std::string expiration;
std::string ssealgorithm;
};
struct RequestParameters {
RequestParameters(Aws::Auth::AWSCredentials creds, Aws::Client::ClientConfiguration config)
: credentials(std::move(creds)),
client_config(std::move(config)) {}
Aws::Auth::AWSCredentials credentials;
Aws::Client::ClientConfiguration client_config;
void setClientConfig(const aws::s3::ProxyOptions& proxy, const std::string& endpoint_override_url) {
client_config.proxyHost = proxy.host;
client_config.proxyPort = proxy.port;
client_config.proxyUserName = proxy.username;
client_config.proxyPassword = proxy.password;
client_config.endpointOverride = endpoint_override_url;
}
};
struct PutObjectRequestParameters : public RequestParameters {
PutObjectRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
: RequestParameters(creds, config) {}
std::string bucket;
std::string object_key;
std::string storage_class;
std::string server_side_encryption;
std::string content_type;
std::map<std::string, std::string> user_metadata_map;
std::string fullcontrol_user_list;
std::string read_permission_user_list;
std::string read_acl_user_list;
std::string write_acl_user_list;
std::string canned_acl;
Aws::S3::Model::ChecksumAlgorithm checksum_algorithm;
bool use_virtual_addressing = true;
};
struct DeleteObjectRequestParameters : public RequestParameters {
DeleteObjectRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
: RequestParameters(creds, config) {}
std::string bucket;
std::string object_key;
std::string version;
};
struct GetObjectRequestParameters : public RequestParameters {
GetObjectRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
: RequestParameters(creds, config) {}
std::string bucket;
std::string object_key;
std::string version;
bool requester_pays = false;
};
struct HeadObjectResult {
std::filesystem::path path;
std::filesystem::path absolute_path;
std::filesystem::path filename;
std::string mime_type;
std::string etag;
Expiration expiration;
std::string ssealgorithm;
std::string version;
std::map<std::string, std::string> user_metadata_map;
void setFilePaths(const std::string& key);
};
struct GetObjectResult : public HeadObjectResult {
int64_t write_size = 0;
};
struct ListRequestParameters : public RequestParameters {
ListRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
: RequestParameters(creds, config) {}
std::string bucket;
std::string delimiter;
std::string prefix;
bool use_versions = false;
uint64_t min_object_age = 0;
};
struct ListedObjectAttributes : public minifi::utils::ListedObject {
[[nodiscard]] std::chrono::time_point<std::chrono::system_clock> getLastModified() const override {
return last_modified;
}
[[nodiscard]] std::string getKey() const override {
return filename;
}
std::string filename;
std::string etag;
bool is_latest = false;
std::chrono::time_point<std::chrono::system_clock> last_modified;
int64_t length = 0;
std::string store_class;
std::string version;
};
using HeadObjectRequestParameters = GetObjectRequestParameters;
using GetObjectTagsParameters = DeleteObjectRequestParameters;
struct ListMultipartUploadsRequestParameters : public RequestParameters {
ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
: RequestParameters(creds, config) {}
std::string bucket;
std::optional<std::chrono::milliseconds> age_off_limit; // if set, only list the aged off uploads
bool use_virtual_addressing = true;
};
struct MultipartUpload {
std::string key;
std::string upload_id;
};
struct AbortMultipartUploadRequestParameters : public RequestParameters {
AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
: RequestParameters(creds, config) {}
std::string bucket;
std::string key;
std::string upload_id;
bool use_virtual_addressing = true;
};
class StreamReadException : public Exception {
public:
explicit StreamReadException(const std::string& error) : Exception(GENERAL_EXCEPTION, error) {}
};
class S3Wrapper {
public:
static constexpr auto BUFFER_SIZE = minifi::utils::configuration::DEFAULT_BUFFER_SIZE;
S3Wrapper();
explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size);
std::optional<PutObjectResult> putObjectMultipart(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t multipart_size);
bool deleteObject(const DeleteObjectRequestParameters& params);
std::optional<GetObjectResult> getObject(const GetObjectRequestParameters& get_object_params, io::OutputStream& out_body);
std::optional<std::vector<ListedObjectAttributes>> listBucket(const ListRequestParameters& params);
std::optional<std::map<std::string, std::string>> getObjectTags(const GetObjectTagsParameters& params);
std::optional<HeadObjectResult> headObject(const HeadObjectRequestParameters& head_object_params);
std::optional<std::vector<MultipartUpload>> listMultipartUploads(const ListMultipartUploadsRequestParameters& params);
bool abortMultipartUpload(const AbortMultipartUploadRequestParameters& params);
void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds multipart_upload_max_age_threshold);
void initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*> state_manager);
virtual ~S3Wrapper() = default;
private:
struct UploadPartsResult {
std::string upload_id;
std::vector<std::string> part_etags;
};
static Expiration getExpiration(const std::string& expiration);
template<typename RequestType>
void setCannedAcl(RequestType& request, const std::string& canned_acl) const {
if (canned_acl.empty()) return;
const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto& kv) { return kv.first; });
if (it == CANNED_ACL_MAP.end()) return;
logger_->log_debug("Setting AWS canned ACL [{}]", canned_acl);
request.SetACL(it->second);
}
template<typename RequestType>
RequestType createPutObjectRequest(const PutObjectRequestParameters& put_object_params) {
auto request = RequestType{}
.WithBucket(put_object_params.bucket)
.WithKey(put_object_params.object_key)
.WithStorageClass(minifi::utils::at(STORAGE_CLASS_MAP, put_object_params.storage_class))
.WithChecksumAlgorithm(put_object_params.checksum_algorithm);
if (!put_object_params.server_side_encryption.empty() && put_object_params.server_side_encryption != "None") {
request.SetServerSideEncryption(minifi::utils::at(SERVER_SIDE_ENCRYPTION_MAP, put_object_params.server_side_encryption));
}
if (!put_object_params.content_type.empty()) {
request.SetContentType(put_object_params.content_type);
}
if (!put_object_params.user_metadata_map.empty()) {
request.SetMetadata(put_object_params.user_metadata_map);
}
if (!put_object_params.fullcontrol_user_list.empty()) {
request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
}
if (!put_object_params.read_permission_user_list.empty()) {
request.SetGrantRead(put_object_params.read_permission_user_list);
}
if (!put_object_params.read_acl_user_list.empty()) {
request.SetGrantReadACP(put_object_params.read_acl_user_list);
}
if (!put_object_params.write_acl_user_list.empty()) {
request.SetGrantWriteACP(put_object_params.write_acl_user_list);
}
setCannedAcl<RequestType>(request, put_object_params.canned_acl);
return request;
}
template<typename ResultType>
PutObjectResult createPutObjectResult(const ResultType& upload_result) {
return {
.version = upload_result.GetVersionId(),
// Etags are returned by AWS in quoted form that should be removed
.etag = minifi::utils::string::removeFramingCharacters(upload_result.GetETag(), '"'),
// GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
// s3.expiration only needs the date member of this pair
.expiration = getExpiration(upload_result.GetExpiration()).expiration_time,
.ssealgorithm = getEncryptionString(upload_result.GetServerSideEncryption())
};
}
static int64_t writeFetchedBody(Aws::IOStream& source, int64_t data_size, io::OutputStream& output);
static std::string getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption);
static std::shared_ptr<Aws::StringStream> readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out);
std::optional<std::vector<ListedObjectAttributes>> listVersions(const ListRequestParameters& params);
std::optional<std::vector<ListedObjectAttributes>> listObjects(const ListRequestParameters& params);
void addListResults(const Aws::Vector<Aws::S3::Model::ObjectVersion>& content, uint64_t min_object_age, std::vector<ListedObjectAttributes>& listed_objects);
void addListResults(const Aws::Vector<Aws::S3::Model::Object>& content, uint64_t min_object_age, std::vector<ListedObjectAttributes>& listed_objects);
void addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, std::optional<std::chrono::milliseconds> age_off_limit,
std::vector<MultipartUpload>& filtered_uploads);
std::optional<UploadPartsResult> uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, MultipartUploadState upload_state);
std::optional<Aws::S3::Model::CompleteMultipartUploadResult> completeMultipartUpload(const PutObjectRequestParameters& put_object_params, const UploadPartsResult& upload_parts_result);
bool multipartUploadExistsInS3(const PutObjectRequestParameters& put_object_params);
std::optional<MultipartUploadState> getMultipartUploadState(const PutObjectRequestParameters& put_object_params);
template<typename ListRequest>
ListRequest createListRequest(const ListRequestParameters& params);
template<typename FetchObjectRequest>
FetchObjectRequest createFetchObjectRequest(const GetObjectRequestParameters& get_object_params);
template<typename AwsResult, typename FetchObjectResult>
FetchObjectResult fillFetchObjectResult(const GetObjectRequestParameters& get_object_params, const AwsResult& fetch_object_result);
const utils::AWSInitializer& AWS_INITIALIZER = utils::AWSInitializer::get();
std::shared_ptr<minifi::core::logging::Logger> logger_{minifi::core::logging::LoggerFactory<S3Wrapper>::getLogger()};
std::unique_ptr<S3RequestSender> request_sender_;
uint64_t last_bucket_list_timestamp_ = 0;
std::unique_ptr<MultipartUploadStateStorage> multipart_upload_storage_;
};
} // namespace org::apache::nifi::minifi::aws::s3