extensions/aws/s3/S3ClientRequestSender.cpp (183 lines of code) (raw):
/**
 * @file S3ClientRequestSender.cpp
 * S3ClientRequestSender class implementation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "S3ClientRequestSender.h"
#include <aws/s3/S3Client.h>
namespace org::apache::nifi::minifi::aws::s3 {
std::optional<Aws::S3::Model::PutObjectResult> S3ClientRequestSender::sendPutObjectRequest(
    const Aws::S3::Model::PutObjectRequest& request,
    const Aws::Auth::AWSCredentials& credentials,
    const Aws::Client::ClientConfiguration& client_config,
    bool use_virtual_addressing) {
  Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
  auto outcome = s3_client.PutObject(request);
  if (outcome.IsSuccess()) {
      logger_->log_debug("Added S3 object '%s' to bucket '%s'", request.GetKey(), request.GetBucket());
      return outcome.GetResultWithOwnership();
  } else {
    logger_->log_error("PutS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
    return std::nullopt;
  }
}
bool S3ClientRequestSender::sendDeleteObjectRequest(
    const Aws::S3::Model::DeleteObjectRequest& request,
    const Aws::Auth::AWSCredentials& credentials,
    const Aws::Client::ClientConfiguration& client_config) {
  Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true);
  Aws::S3::Model::DeleteObjectOutcome outcome = s3_client.DeleteObject(request);
  if (outcome.IsSuccess()) {
    logger_->log_debug("Deleted S3 object '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
    return true;
  } else if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) {
    logger_->log_debug("S3 object '%s' was not found in bucket '%s'", request.GetKey(), request.GetBucket());
    return true;
  } else {
    logger_->log_error("DeleteS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
    return false;
  }
}
std::optional<Aws::S3::Model::GetObjectResult> S3ClientRequestSender::sendGetObjectRequest(
    const Aws::S3::Model::GetObjectRequest& request,
    const Aws::Auth::AWSCredentials& credentials,
    const Aws::Client::ClientConfiguration& client_config) {
  Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true);
  auto outcome = s3_client.GetObject(request);
  if (outcome.IsSuccess()) {
    logger_->log_debug("Fetched S3 object '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
    return outcome.GetResultWithOwnership();
  } else {
    logger_->log_error("FetchS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
    return std::nullopt;
  }
}
std::optional<Aws::S3::Model::ListObjectsV2Result> S3ClientRequestSender::sendListObjectsRequest(
    const Aws::S3::Model::ListObjectsV2Request& request,
    const Aws::Auth::AWSCredentials& credentials,
    const Aws::Client::ClientConfiguration& client_config) {
  Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true);
  auto outcome = s3_client.ListObjectsV2(request);
  if (outcome.IsSuccess()) {
    logger_->log_debug("ListObjectsV2 successful of bucket '%s'", request.GetBucket());
    return outcome.GetResultWithOwnership();
  } else {
    logger_->log_error("ListObjectsV2 failed with the following: '%s'", outcome.GetError().GetMessage());
    return std::nullopt;
  }
}
std::optional<Aws::S3::Model::ListObjectVersionsResult> S3ClientRequestSender::sendListVersionsRequest(
    const Aws::S3::Model::ListObjectVersionsRequest& request,
    const Aws::Auth::AWSCredentials& credentials,
    const Aws::Client::ClientConfiguration& client_config) {
  Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true);
  auto outcome = s3_client.ListObjectVersions(request);
  if (outcome.IsSuccess()) {
    logger_->log_debug("ListObjectVersions successful of bucket '%s'", request.GetBucket());
    return outcome.GetResultWithOwnership();
  } else {
    logger_->log_error("ListObjectVersions failed with the following: '%s'", outcome.GetError().GetMessage());
    return std::nullopt;
  }
}
std::optional<Aws::S3::Model::GetObjectTaggingResult> S3ClientRequestSender::sendGetObjectTaggingRequest(
    const Aws::S3::Model::GetObjectTaggingRequest& request,
    const Aws::Auth::AWSCredentials& credentials,
    const Aws::Client::ClientConfiguration& client_config) {
  Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true);
  auto outcome = s3_client.GetObjectTagging(request);
  if (outcome.IsSuccess()) {
    logger_->log_debug("Got tags for S3 object '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
    return outcome.GetResultWithOwnership();
  } else {
    logger_->log_error("GetObjectTagging failed with the following: '%s'", outcome.GetError().GetMessage());
    return std::nullopt;
  }
}
std::optional<Aws::S3::Model::HeadObjectResult> S3ClientRequestSender::sendHeadObjectRequest(
    const Aws::S3::Model::HeadObjectRequest& request,
    const Aws::Auth::AWSCredentials& credentials,
    const Aws::Client::ClientConfiguration& client_config) {
  Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true);
  auto outcome = s3_client.HeadObject(request);
  if (outcome.IsSuccess()) {
    logger_->log_debug("HeadS3Object successful for key '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
    return outcome.GetResultWithOwnership();
  } else {
    logger_->log_error("HeadS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
    return std::nullopt;
  }
}
std::optional<Aws::S3::Model::CreateMultipartUploadResult> S3ClientRequestSender::sendCreateMultipartUploadRequest(
    const Aws::S3::Model::CreateMultipartUploadRequest& request,
    const Aws::Auth::AWSCredentials& credentials,
    const Aws::Client::ClientConfiguration& client_config,
    bool use_virtual_addressing) {
  Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
  auto outcome = s3_client.CreateMultipartUpload(request);
  if (outcome.IsSuccess()) {
    logger_->log_debug("CreateMultipartUpload successful for key '%s' and bucket '%s'", request.GetKey(), request.GetBucket());
    return outcome.GetResultWithOwnership();
  } else {
    logger_->log_error("CreateMultipartUpload failed for key '%s' and bucket '%s' with the following: '%s'", request.GetKey(), request.GetBucket(), outcome.GetError().GetMessage());
    return std::nullopt;
  }
}
std::optional<Aws::S3::Model::UploadPartResult> S3ClientRequestSender::sendUploadPartRequest(
    const Aws::S3::Model::UploadPartRequest& request,
    const Aws::Auth::AWSCredentials& credentials,
    const Aws::Client::ClientConfiguration& client_config,
    bool use_virtual_addressing) {
  Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
  auto outcome = s3_client.UploadPart(request);
  if (outcome.IsSuccess()) {
    logger_->log_debug("UploadPart successful for key '%s' from bucket '%s' with part number %d", request.GetKey(), request.GetBucket(), request.GetPartNumber());
    return outcome.GetResultWithOwnership();
  } else {
    logger_->log_error("UploadPart failed for key '%s' from bucket '%s' with part number %d with the following: '%s'",
      request.GetKey(), request.GetBucket(), request.GetPartNumber(), outcome.GetError().GetMessage());
    return std::nullopt;
  }
}
std::optional<Aws::S3::Model::CompleteMultipartUploadResult> S3ClientRequestSender::sendCompleteMultipartUploadRequest(
    const Aws::S3::Model::CompleteMultipartUploadRequest& request,
    const Aws::Auth::AWSCredentials& credentials,
    const Aws::Client::ClientConfiguration& client_config,
    bool use_virtual_addressing) {
  Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
  auto outcome = s3_client.CompleteMultipartUpload(request);
  if (outcome.IsSuccess()) {
    logger_->log_debug("CompleteMultipartUpload successful for key '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
    return outcome.GetResultWithOwnership();
  } else {
    logger_->log_error("CompleteMultipartUpload failed for key '%s' from bucket '%s' with the following: '%s'", request.GetKey(), request.GetBucket(), outcome.GetError().GetMessage());
    return std::nullopt;
  }
}
std::optional<Aws::S3::Model::ListMultipartUploadsResult> S3ClientRequestSender::sendListMultipartUploadsRequest(
    const Aws::S3::Model::ListMultipartUploadsRequest& request,
    const Aws::Auth::AWSCredentials& credentials,
    const Aws::Client::ClientConfiguration& client_config,
    bool use_virtual_addressing) {
  Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
  auto outcome = s3_client.ListMultipartUploads(request);
  if (outcome.IsSuccess()) {
    logger_->log_debug("ListMultipartUploads successful for bucket '%s'", request.GetBucket());
    return outcome.GetResultWithOwnership();
  } else {
    logger_->log_error("ListMultipartUploads failed for bucket '%s' with the following: '%s'", request.GetBucket(), outcome.GetError().GetMessage());
    return std::nullopt;
  }
}
bool S3ClientRequestSender::sendAbortMultipartUploadRequest(
    const Aws::S3::Model::AbortMultipartUploadRequest& request,
    const Aws::Auth::AWSCredentials& credentials,
    const Aws::Client::ClientConfiguration& client_config,
    bool use_virtual_addressing) {
  Aws::S3::S3Client s3_client(credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
  auto outcome = s3_client.AbortMultipartUpload(request);
  if (outcome.IsSuccess()) {
    logger_->log_debug("AbortMultipartUpload successful for bucket '%s', key '%s', upload id '%s'", request.GetBucket(), request.GetKey(), request.GetUploadId());
    return true;
  } else {
    logger_->log_error("AbortMultipartUpload failed for bucket '%s', key '%s', upload id '%s' with the following: '%s'",
      request.GetBucket(), request.GetKey(), request.GetUploadId(), outcome.GetError().GetMessage());
    return false;
  }
}
}  // namespace org::apache::nifi::minifi::aws::s3