extensions/aws/s3/S3ClientRequestSender.cpp (183 lines of code) (raw):
/**
* @file S3ClientRequestSender.cpp
* S3ClientRequestSender class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "S3ClientRequestSender.h"
#include <aws/s3/S3Client.h>
namespace org::apache::nifi::minifi::aws::s3 {
std::optional<Aws::S3::Model::PutObjectResult> S3ClientRequestSender::sendPutObjectRequest(
const Aws::S3::Model::PutObjectRequest& request,
const Aws::Auth::AWSCredentials& credentials,
const Aws::Client::ClientConfiguration& client_config,
bool use_virtual_addressing) {
Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
auto outcome = s3_client.PutObject(request);
if (outcome.IsSuccess()) {
logger_->log_debug("Added S3 object '%s' to bucket '%s'", request.GetKey(), request.GetBucket());
return outcome.GetResultWithOwnership();
} else {
logger_->log_error("PutS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
return std::nullopt;
}
}
bool S3ClientRequestSender::sendDeleteObjectRequest(
const Aws::S3::Model::DeleteObjectRequest& request,
const Aws::Auth::AWSCredentials& credentials,
const Aws::Client::ClientConfiguration& client_config) {
Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true);
Aws::S3::Model::DeleteObjectOutcome outcome = s3_client.DeleteObject(request);
if (outcome.IsSuccess()) {
logger_->log_debug("Deleted S3 object '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
return true;
} else if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) {
logger_->log_debug("S3 object '%s' was not found in bucket '%s'", request.GetKey(), request.GetBucket());
return true;
} else {
logger_->log_error("DeleteS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
return false;
}
}
std::optional<Aws::S3::Model::GetObjectResult> S3ClientRequestSender::sendGetObjectRequest(
const Aws::S3::Model::GetObjectRequest& request,
const Aws::Auth::AWSCredentials& credentials,
const Aws::Client::ClientConfiguration& client_config) {
Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true);
auto outcome = s3_client.GetObject(request);
if (outcome.IsSuccess()) {
logger_->log_debug("Fetched S3 object '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
return outcome.GetResultWithOwnership();
} else {
logger_->log_error("FetchS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
return std::nullopt;
}
}
std::optional<Aws::S3::Model::ListObjectsV2Result> S3ClientRequestSender::sendListObjectsRequest(
const Aws::S3::Model::ListObjectsV2Request& request,
const Aws::Auth::AWSCredentials& credentials,
const Aws::Client::ClientConfiguration& client_config) {
Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true);
auto outcome = s3_client.ListObjectsV2(request);
if (outcome.IsSuccess()) {
logger_->log_debug("ListObjectsV2 successful of bucket '%s'", request.GetBucket());
return outcome.GetResultWithOwnership();
} else {
logger_->log_error("ListObjectsV2 failed with the following: '%s'", outcome.GetError().GetMessage());
return std::nullopt;
}
}
std::optional<Aws::S3::Model::ListObjectVersionsResult> S3ClientRequestSender::sendListVersionsRequest(
const Aws::S3::Model::ListObjectVersionsRequest& request,
const Aws::Auth::AWSCredentials& credentials,
const Aws::Client::ClientConfiguration& client_config) {
Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true);
auto outcome = s3_client.ListObjectVersions(request);
if (outcome.IsSuccess()) {
logger_->log_debug("ListObjectVersions successful of bucket '%s'", request.GetBucket());
return outcome.GetResultWithOwnership();
} else {
logger_->log_error("ListObjectVersions failed with the following: '%s'", outcome.GetError().GetMessage());
return std::nullopt;
}
}
std::optional<Aws::S3::Model::GetObjectTaggingResult> S3ClientRequestSender::sendGetObjectTaggingRequest(
const Aws::S3::Model::GetObjectTaggingRequest& request,
const Aws::Auth::AWSCredentials& credentials,
const Aws::Client::ClientConfiguration& client_config) {
Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true);
auto outcome = s3_client.GetObjectTagging(request);
if (outcome.IsSuccess()) {
logger_->log_debug("Got tags for S3 object '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
return outcome.GetResultWithOwnership();
} else {
logger_->log_error("GetObjectTagging failed with the following: '%s'", outcome.GetError().GetMessage());
return std::nullopt;
}
}
std::optional<Aws::S3::Model::HeadObjectResult> S3ClientRequestSender::sendHeadObjectRequest(
const Aws::S3::Model::HeadObjectRequest& request,
const Aws::Auth::AWSCredentials& credentials,
const Aws::Client::ClientConfiguration& client_config) {
Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true);
auto outcome = s3_client.HeadObject(request);
if (outcome.IsSuccess()) {
logger_->log_debug("HeadS3Object successful for key '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
return outcome.GetResultWithOwnership();
} else {
logger_->log_error("HeadS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
return std::nullopt;
}
}
std::optional<Aws::S3::Model::CreateMultipartUploadResult> S3ClientRequestSender::sendCreateMultipartUploadRequest(
const Aws::S3::Model::CreateMultipartUploadRequest& request,
const Aws::Auth::AWSCredentials& credentials,
const Aws::Client::ClientConfiguration& client_config,
bool use_virtual_addressing) {
Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
auto outcome = s3_client.CreateMultipartUpload(request);
if (outcome.IsSuccess()) {
logger_->log_debug("CreateMultipartUpload successful for key '%s' and bucket '%s'", request.GetKey(), request.GetBucket());
return outcome.GetResultWithOwnership();
} else {
logger_->log_error("CreateMultipartUpload failed for key '%s' and bucket '%s' with the following: '%s'", request.GetKey(), request.GetBucket(), outcome.GetError().GetMessage());
return std::nullopt;
}
}
std::optional<Aws::S3::Model::UploadPartResult> S3ClientRequestSender::sendUploadPartRequest(
const Aws::S3::Model::UploadPartRequest& request,
const Aws::Auth::AWSCredentials& credentials,
const Aws::Client::ClientConfiguration& client_config,
bool use_virtual_addressing) {
Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
auto outcome = s3_client.UploadPart(request);
if (outcome.IsSuccess()) {
logger_->log_debug("UploadPart successful for key '%s' from bucket '%s' with part number %d", request.GetKey(), request.GetBucket(), request.GetPartNumber());
return outcome.GetResultWithOwnership();
} else {
logger_->log_error("UploadPart failed for key '%s' from bucket '%s' with part number %d with the following: '%s'",
request.GetKey(), request.GetBucket(), request.GetPartNumber(), outcome.GetError().GetMessage());
return std::nullopt;
}
}
std::optional<Aws::S3::Model::CompleteMultipartUploadResult> S3ClientRequestSender::sendCompleteMultipartUploadRequest(
const Aws::S3::Model::CompleteMultipartUploadRequest& request,
const Aws::Auth::AWSCredentials& credentials,
const Aws::Client::ClientConfiguration& client_config,
bool use_virtual_addressing) {
Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
auto outcome = s3_client.CompleteMultipartUpload(request);
if (outcome.IsSuccess()) {
logger_->log_debug("CompleteMultipartUpload successful for key '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
return outcome.GetResultWithOwnership();
} else {
logger_->log_error("CompleteMultipartUpload failed for key '%s' from bucket '%s' with the following: '%s'", request.GetKey(), request.GetBucket(), outcome.GetError().GetMessage());
return std::nullopt;
}
}
std::optional<Aws::S3::Model::ListMultipartUploadsResult> S3ClientRequestSender::sendListMultipartUploadsRequest(
const Aws::S3::Model::ListMultipartUploadsRequest& request,
const Aws::Auth::AWSCredentials& credentials,
const Aws::Client::ClientConfiguration& client_config,
bool use_virtual_addressing) {
Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
auto outcome = s3_client.ListMultipartUploads(request);
if (outcome.IsSuccess()) {
logger_->log_debug("ListMultipartUploads successful for bucket '%s'", request.GetBucket());
return outcome.GetResultWithOwnership();
} else {
logger_->log_error("ListMultipartUploads failed for bucket '%s' with the following: '%s'", request.GetBucket(), outcome.GetError().GetMessage());
return std::nullopt;
}
}
bool S3ClientRequestSender::sendAbortMultipartUploadRequest(
const Aws::S3::Model::AbortMultipartUploadRequest& request,
const Aws::Auth::AWSCredentials& credentials,
const Aws::Client::ClientConfiguration& client_config,
bool use_virtual_addressing) {
Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
auto outcome = s3_client.AbortMultipartUpload(request);
if (outcome.IsSuccess()) {
logger_->log_debug("AbortMultipartUpload successful for bucket '%s', key '%s', upload id '%s'", request.GetBucket(), request.GetKey(), request.GetUploadId());
return true;
} else {
logger_->log_error("AbortMultipartUpload failed for bucket '%s', key '%s', upload id '%s' with the following: '%s'",
request.GetBucket(), request.GetKey(), request.GetUploadId(), outcome.GetError().GetMessage());
return false;
}
}
} // namespace org::apache::nifi::minifi::aws::s3