// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "recycler/s3_accessor.h"
#include <aws/core/auth/AWSAuthSigner.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
#include <aws/s3/S3Client.h>
#include <aws/sts/STSClient.h>
#include <bvar/reducer.h>
#include <gen_cpp/cloud.pb.h>
#include <algorithm>
#ifdef USE_AZURE
#include <azure/storage/blobs/blob_container_client.hpp>
#include <azure/storage/common/storage_credential.hpp>
#endif
#include <execution>
#include <memory>
#include <utility>
#include "common/config.h"
#include "common/encryption_util.h"
#include "common/logging.h"
#include "common/simple_thread_pool.h"
#include "common/string_util.h"
#include "common/util.h"
#include "cpp/aws_logger.h"
#include "cpp/obj_retry_strategy.h"
#include "cpp/s3_rate_limiter.h"
#include "cpp/sync_point.h"
#ifdef USE_AZURE
#include "recycler/azure_obj_client.h"
#endif
#include "recycler/obj_storage_client.h"
#include "recycler/s3_obj_client.h"
#include "recycler/storage_vault_accessor.h"
namespace doris::cloud {
namespace s3_bvar {
bvar::LatencyRecorder s3_get_latency("s3_get");
bvar::LatencyRecorder s3_put_latency("s3_put");
bvar::LatencyRecorder s3_delete_object_latency("s3_delete_object");
bvar::LatencyRecorder s3_delete_objects_latency("s3_delete_objects");
bvar::LatencyRecorder s3_head_latency("s3_head");
bvar::LatencyRecorder s3_multi_part_upload_latency("s3_multi_part_upload");
bvar::LatencyRecorder s3_list_latency("s3_list");
bvar::LatencyRecorder s3_list_object_versions_latency("s3_list_object_versions");
bvar::LatencyRecorder s3_get_bucket_version_latency("s3_get_bucket_version");
bvar::LatencyRecorder s3_copy_object_latency("s3_copy_object");
} // namespace s3_bvar
bvar::Adder<int64_t> get_rate_limit_ns("get_rate_limit_ns");
bvar::Adder<int64_t> get_rate_limit_exceed_req_num("get_rate_limit_exceed_req_num");
bvar::Adder<int64_t> put_rate_limit_ns("put_rate_limit_ns");
bvar::Adder<int64_t> put_rate_limit_exceed_req_num("put_rate_limit_exceed_req_num");
AccessorRateLimiter::AccessorRateLimiter()
: _rate_limiters(
{std::make_unique<S3RateLimiterHolder>(
config::s3_get_token_per_second, config::s3_get_bucket_tokens,
config::s3_get_token_limit,
metric_func_factory(get_rate_limit_ns, get_rate_limit_exceed_req_num)),
std::make_unique<S3RateLimiterHolder>(
config::s3_put_token_per_second, config::s3_put_bucket_tokens,
config::s3_put_token_limit,
metric_func_factory(put_rate_limit_ns,
put_rate_limit_exceed_req_num))}) {}
S3RateLimiterHolder* AccessorRateLimiter::rate_limiter(S3RateLimitType type) {
CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type);
return _rate_limiters[static_cast<size_t>(type)].get();
}
AccessorRateLimiter& AccessorRateLimiter::instance() {
static AccessorRateLimiter instance;
return instance;
}
int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit) {
if (type == S3RateLimitType::UNKNOWN) {
return -1;
}
if (type == S3RateLimitType::GET) {
max_speed = (max_speed == 0) ? config::s3_get_token_per_second : max_speed;
max_burst = (max_burst == 0) ? config::s3_get_bucket_tokens : max_burst;
limit = (limit == 0) ? config::s3_get_token_limit : limit;
} else {
max_speed = (max_speed == 0) ? config::s3_put_token_per_second : max_speed;
max_burst = (max_burst == 0) ? config::s3_put_bucket_tokens : max_burst;
limit = (limit == 0) ? config::s3_put_token_limit : limit;
}
return AccessorRateLimiter::instance().rate_limiter(type)->reset(max_speed, max_burst, limit);
}
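// A minimal usage sketch of reset_s3_rate_limiter (illustrative only): a zero
// for any of the three values falls back to the corresponding config default,
// so a caller can tune one knob at a time, e.g.
//
//   // double the GET rate, keep burst and limit at their configured defaults
//   int ret = reset_s3_rate_limiter(S3RateLimitType::GET,
//                                   2 * config::s3_get_token_per_second, 0, 0);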
class S3Environment {
public:
S3Environment() {
aws_options_ = Aws::SDKOptions {};
auto logLevel = static_cast<Aws::Utils::Logging::LogLevel>(config::aws_log_level);
aws_options_.loggingOptions.logLevel = logLevel;
aws_options_.loggingOptions.logger_create_fn = [logLevel] {
return std::make_shared<DorisAWSLogger>(logLevel);
};
Aws::InitAPI(aws_options_);
}
~S3Environment() { Aws::ShutdownAPI(aws_options_); }
private:
Aws::SDKOptions aws_options_;
};
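// S3Environment is instantiated as a function-local static in S3Accessor::init()
// below, so Aws::InitAPI runs exactly once on first use and Aws::ShutdownAPI
// runs at process exit.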
class S3ListIterator final : public ListIterator {
public:
S3ListIterator(std::unique_ptr<ObjectListIterator> iter, size_t prefix_length)
: iter_(std::move(iter)), prefix_length_(prefix_length) {}
~S3ListIterator() override = default;
bool is_valid() override { return iter_->is_valid(); }
bool has_next() override { return iter_->has_next(); }
std::optional<FileMeta> next() override {
std::optional<FileMeta> ret;
if (auto obj = iter_->next(); obj.has_value()) {
ret = FileMeta {
.path = get_relative_path(obj->key),
.size = obj->size,
.mtime_s = obj->mtime_s,
};
}
return ret;
}
private:
std::string get_relative_path(const std::string& key) const {
return key.substr(prefix_length_);
}
std::unique_ptr<ObjectListIterator> iter_;
size_t prefix_length_;
};
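// A minimal drain-loop sketch for a ListIterator (illustrative only; error
// handling is up to the caller):
//
//   std::unique_ptr<ListIterator> iter;
//   if (accessor->list_all(&iter) == 0) {
//       while (iter->has_next()) {
//           auto file = iter->next(); // paths are relative to conf_.prefix
//           // ... process file->path, file->size, file->mtime_s ...
//       }
//       if (!iter->is_valid()) { /* the underlying listing failed mid-way */ }
//   }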
std::optional<S3Conf> S3Conf::from_obj_store_info(const ObjectStoreInfoPB& obj_info,
bool skip_aksk) {
S3Conf s3_conf;
switch (obj_info.provider()) {
case ObjectStoreInfoPB_Provider_OSS:
case ObjectStoreInfoPB_Provider_S3:
case ObjectStoreInfoPB_Provider_COS:
case ObjectStoreInfoPB_Provider_OBS:
case ObjectStoreInfoPB_Provider_BOS:
s3_conf.provider = S3Conf::S3;
break;
case ObjectStoreInfoPB_Provider_GCP:
s3_conf.provider = S3Conf::GCS;
break;
case ObjectStoreInfoPB_Provider_AZURE:
s3_conf.provider = S3Conf::AZURE;
break;
default:
LOG_WARNING("unknown provider type {}").tag("obj_info", proto_to_json(obj_info));
return std::nullopt;
}
if (!skip_aksk) {
if (!obj_info.ak().empty() && !obj_info.sk().empty()) {
if (obj_info.has_encryption_info()) {
AkSkPair plain_ak_sk_pair;
int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(),
obj_info.encryption_info(), &plain_ak_sk_pair);
if (ret != 0) {
LOG_WARNING("fail to decrypt ak sk").tag("obj_info", proto_to_json(obj_info));
return std::nullopt;
} else {
s3_conf.ak = std::move(plain_ak_sk_pair.first);
s3_conf.sk = std::move(plain_ak_sk_pair.second);
}
} else {
s3_conf.ak = obj_info.ak();
s3_conf.sk = obj_info.sk();
}
}
if (obj_info.has_role_arn() && !obj_info.role_arn().empty()) {
s3_conf.role_arn = obj_info.role_arn();
s3_conf.external_id = obj_info.external_id();
s3_conf.cred_provider_type = CredProviderType::InstanceProfile;
}
}
s3_conf.endpoint = obj_info.endpoint();
s3_conf.region = obj_info.region();
s3_conf.bucket = obj_info.bucket();
s3_conf.prefix = obj_info.prefix();
s3_conf.use_virtual_addressing = !obj_info.use_path_style();
return s3_conf;
}
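// A minimal usage sketch (illustrative only; `obj_info` is a hypothetical
// ObjectStoreInfoPB instance):
//
//   auto s3_conf = S3Conf::from_obj_store_info(obj_info);
//   if (!s3_conf) return -1; // unknown provider or undecryptable AK/SK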
S3Accessor::S3Accessor(S3Conf conf)
: StorageVaultAccessor(AccessorType::S3), conf_(std::move(conf)) {}
S3Accessor::~S3Accessor() = default;
std::string S3Accessor::get_key(const std::string& relative_path) const {
return conf_.prefix.empty() ? relative_path : conf_.prefix + '/' + relative_path;
}
std::string S3Accessor::to_uri(const std::string& relative_path) const {
return uri_ + '/' + relative_path;
}
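// Illustrative example of the two helpers above, assuming a hypothetical
// prefix "inst1" and uri_ "https://endpoint/bucket/inst1":
//   get_key("data/1.dat") -> "inst1/data/1.dat"  (object key sent to storage)
//   to_uri("data/1.dat")  -> "https://endpoint/bucket/inst1/data/1.dat"  (logging)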
int S3Accessor::create(S3Conf conf, std::shared_ptr<S3Accessor>* accessor) {
TEST_SYNC_POINT_RETURN_WITH_VALUE("S3Accessor::init.s3_init_failed", (int)-1);
switch (conf.provider) {
case S3Conf::GCS:
*accessor = std::make_shared<GcsAccessor>(conf);
break;
default:
*accessor = std::make_shared<S3Accessor>(conf);
break;
}
return (*accessor)->init();
}
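// A minimal creation sketch (illustrative only), continuing from the
// from_obj_store_info() example above:
//
//   std::shared_ptr<S3Accessor> accessor;
//   if (int ret = S3Accessor::create(std::move(*s3_conf), &accessor); ret != 0) {
//       LOG_WARNING("failed to init s3 accessor, ret={}", ret);
//   }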
static std::shared_ptr<SimpleThreadPool> worker_pool;
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> S3Accessor::get_aws_credentials_provider(
const S3Conf& s3_conf) {
if (!s3_conf.ak.empty() && !s3_conf.sk.empty()) {
Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk);
DCHECK(!aws_cred.IsExpiredOrEmpty());
return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(std::move(aws_cred));
}
if (s3_conf.cred_provider_type == CredProviderType::InstanceProfile) {
if (s3_conf.role_arn.empty()) {
return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>();
}
auto stsClient = std::make_shared<Aws::STS::STSClient>(
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>());
return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
s3_conf.role_arn, Aws::String(), s3_conf.external_id,
Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, stsClient);
}
return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
}
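// Resolution order of the provider above: explicit AK/SK wins; otherwise, with
// InstanceProfile configured, a bare instance profile is used unless a role_arn
// is present, in which case STS AssumeRole is layered on top of the instance
// profile; the AWS default chain (env vars, config files, IMDS, ...) is the
// final fallback.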
int S3Accessor::init() {
    static std::once_flag worker_pool_init_once;
    std::call_once(worker_pool_init_once, [] {
        LOG_INFO("start s3 accessor parallel worker pool");
        worker_pool =
                std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism, "s3_accessor");
        worker_pool->start();
    });
switch (conf_.provider) {
case S3Conf::AZURE: {
#ifdef USE_AZURE
Azure::Storage::Blobs::BlobClientOptions options;
options.Retry.StatusCodes.insert(Azure::Core::Http::HttpStatusCode::TooManyRequests);
options.Retry.MaxRetries = config::max_s3_client_retry;
auto cred =
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(conf_.ak, conf_.sk);
if (config::force_azure_blob_global_endpoint) {
uri_ = fmt::format("https://{}.blob.core.windows.net/{}", conf_.ak, conf_.bucket);
} else {
uri_ = fmt::format("{}/{}", conf_.endpoint, conf_.bucket);
if (uri_.find("://") == std::string::npos) {
uri_ = "https://" + uri_;
}
}
        // Azure's HTTP pipeline invokes every policy in the vector in a chained manner.
        // Within the RetryPolicy, the next policy is invoked repeatedly inside the retry
        // loop, and all PerRetryPolicies sit downstream of the RetryPolicy. A single
        // per-retry policy that checks for a 429 response code is therefore enough to
        // record the retry count.
options.PerRetryPolicies.emplace_back(
std::make_unique<AzureRetryRecordPolicy>(config::max_s3_client_retry));
auto container_client = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(
uri_, cred, std::move(options));
// uri format for debug: ${scheme}://${ak}.blob.core.windows.net/${bucket}/${prefix}
uri_ = uri_ + '/' + conf_.prefix;
obj_client_ = std::make_shared<AzureObjClient>(std::move(container_client));
return 0;
#else
LOG_FATAL("BE is not compiled with azure support, export BUILD_AZURE=ON before building");
return 0;
#endif
}
default: {
if (conf_.prefix.empty()) {
uri_ = conf_.endpoint + '/' + conf_.bucket;
} else {
uri_ = conf_.endpoint + '/' + conf_.bucket + '/' + conf_.prefix;
}
static S3Environment s3_env;
// S3Conf::S3
Aws::Client::ClientConfiguration aws_config;
aws_config.endpointOverride = conf_.endpoint;
aws_config.region = conf_.region;
        // Aws::Http::CurlHandleContainer::AcquireCurlHandle() may block if connections become the bottleneck
aws_config.maxConnections = std::max((long)(config::recycle_pool_parallelism +
config::instance_recycler_worker_pool_size),
(long)aws_config.maxConnections);
if (config::s3_client_http_scheme == "http") {
aws_config.scheme = Aws::Http::Scheme::HTTP;
}
aws_config.retryStrategy = std::make_shared<S3CustomRetryStrategy>(
config::max_s3_client_retry /*scaleFactor = 25*/);
auto s3_client = std::make_shared<Aws::S3::S3Client>(
get_aws_credentials_provider(conf_), std::move(aws_config),
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
conf_.use_virtual_addressing /* useVirtualAddressing */);
obj_client_ = std::make_shared<S3ObjClient>(std::move(s3_client), conf_.endpoint);
return 0;
}
}
}
int S3Accessor::delete_prefix_impl(const std::string& path_prefix, int64_t expiration_time) {
LOG_INFO("delete prefix").tag("uri", to_uri(path_prefix));
return obj_client_
->delete_objects_recursively({.bucket = conf_.bucket, .key = get_key(path_prefix)},
{.executor = worker_pool}, expiration_time)
.ret;
}
int S3Accessor::delete_prefix(const std::string& path_prefix, int64_t expiration_time) {
auto norm_path_prefix = path_prefix;
strip_leading(norm_path_prefix, "/");
if (norm_path_prefix.empty()) {
LOG_WARNING("invalid path_prefix {}", path_prefix);
return -1;
}
return delete_prefix_impl(norm_path_prefix, expiration_time);
}
int S3Accessor::delete_directory(const std::string& dir_path) {
auto norm_dir_path = dir_path;
strip_leading(norm_dir_path, "/");
if (norm_dir_path.empty()) {
LOG_WARNING("invalid dir_path {}", dir_path);
return -1;
}
return delete_prefix_impl(!norm_dir_path.ends_with('/') ? norm_dir_path + '/' : norm_dir_path);
}
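// Note the trailing '/' appended above: deleting directory "data" must match
// only keys under "data/", not sibling prefixes such as "data_old".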
int S3Accessor::delete_all(int64_t expiration_time) {
return delete_prefix_impl("", expiration_time);
}
int S3Accessor::delete_files(const std::vector<std::string>& paths) {
if (paths.empty()) {
return 0;
}
std::vector<std::string> keys;
keys.reserve(paths.size());
for (auto&& path : paths) {
LOG_INFO("delete file").tag("uri", to_uri(path));
keys.emplace_back(get_key(path));
}
return obj_client_->delete_objects(conf_.bucket, std::move(keys), {.executor = worker_pool})
.ret;
}
int S3Accessor::delete_file(const std::string& path) {
LOG_INFO("delete file").tag("uri", to_uri(path));
int ret = obj_client_->delete_object({.bucket = conf_.bucket, .key = get_key(path)}).ret;
static_assert(ObjectStorageResponse::OK == 0);
if (ret == ObjectStorageResponse::OK || ret == ObjectStorageResponse::NOT_FOUND) {
return 0;
}
return ret;
}
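// NOT_FOUND is folded into success above to keep deletion idempotent: a retried
// recycle task should not fail just because an earlier attempt already removed
// the object.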
int S3Accessor::put_file(const std::string& path, const std::string& content) {
return obj_client_->put_object({.bucket = conf_.bucket, .key = get_key(path)}, content).ret;
}
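// Note: the `+ 1` below accounts for the '/' that get_key() inserts between the
// prefix and the relative path; this presumes conf_.prefix is non-empty for
// accessors that list objects.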
int S3Accessor::list_prefix(const std::string& path_prefix, std::unique_ptr<ListIterator>* res) {
*res = std::make_unique<S3ListIterator>(
obj_client_->list_objects({conf_.bucket, get_key(path_prefix)}),
conf_.prefix.length() + 1 /* {prefix}/ */);
return 0;
}
int S3Accessor::list_directory(const std::string& dir_path, std::unique_ptr<ListIterator>* res) {
auto norm_dir_path = dir_path;
strip_leading(norm_dir_path, "/");
if (norm_dir_path.empty()) {
LOG_WARNING("invalid dir_path {}", dir_path);
return -1;
}
return list_prefix(!norm_dir_path.ends_with('/') ? norm_dir_path + '/' : norm_dir_path, res);
}
int S3Accessor::list_all(std::unique_ptr<ListIterator>* res) {
return list_prefix("", res);
}
int S3Accessor::exists(const std::string& path) {
ObjectMeta obj_meta;
return obj_client_->head_object({.bucket = conf_.bucket, .key = get_key(path)}, &obj_meta).ret;
}
int S3Accessor::get_life_cycle(int64_t* expiration_days) {
return obj_client_->get_life_cycle(conf_.bucket, expiration_days).ret;
}
int S3Accessor::check_versioning() {
return obj_client_->check_versioning(conf_.bucket).ret;
}
int GcsAccessor::delete_prefix_impl(const std::string& path_prefix, int64_t expiration_time) {
LOG_INFO("begin delete prefix").tag("uri", to_uri(path_prefix));
int ret = 0;
int cnt = 0;
int skip = 0;
int64_t del_nonexisted = 0;
int del = 0;
auto iter = obj_client_->list_objects({conf_.bucket, get_key(path_prefix)});
for (auto obj = iter->next(); obj.has_value(); obj = iter->next()) {
if (!(++cnt % 100)) {
LOG_INFO("loop delete prefix")
.tag("uri", to_uri(path_prefix))
.tag("total_obj_cnt", cnt)
.tag("deleted", del)
.tag("del_nonexisted", del_nonexisted)
.tag("skipped", skip);
}
if (expiration_time > 0 && obj->mtime_s > expiration_time) {
skip++;
continue;
}
del++;
// FIXME(plat1ko): Delete objects by batch with genuine GCS client
int del_ret = obj_client_->delete_object({conf_.bucket, obj->key}).ret;
del_nonexisted += (del_ret == ObjectStorageResponse::NOT_FOUND);
static_assert(ObjectStorageResponse::OK == 0);
if (del_ret != ObjectStorageResponse::OK && del_ret != ObjectStorageResponse::NOT_FOUND) {
ret = del_ret;
}
}
LOG_INFO("finish delete prefix")
.tag("uri", to_uri(path_prefix))
.tag("total_obj_cnt", cnt)
.tag("deleted", del)
.tag("del_nonexisted", del_nonexisted)
.tag("skipped", skip);
if (!iter->is_valid()) {
return -1;
}
return ret;
}
int GcsAccessor::delete_files(const std::vector<std::string>& paths) {
std::vector<int> delete_rets(paths.size());
#ifdef USE_LIBCPP
std::transform(paths.begin(), paths.end(), delete_rets.begin(),
#else
std::transform(std::execution::par, paths.begin(), paths.end(), delete_rets.begin(),
#endif
[this](const std::string& path) {
LOG_INFO("delete file").tag("uri", to_uri(path));
return delete_file(path);
});
int ret = 0;
for (int delete_ret : delete_rets) {
if (delete_ret != 0) {
ret = delete_ret;
break;
}
}
return ret;
}
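// GCS objects are deleted one at a time here (see the FIXME in delete_prefix_impl
// above), so the parallel std::transform compensates by issuing the per-object
// deletes concurrently; libc++ builds fall back to the sequential overload since
// libc++ does not ship the parallel algorithms.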
} // namespace doris::cloud