// cloud/src/recycler/s3_accessor.cpp

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "recycler/s3_accessor.h"

#include <aws/core/auth/AWSAuthSigner.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
#include <aws/s3/S3Client.h>
#include <aws/sts/STSClient.h>
#include <bvar/reducer.h>
#include <gen_cpp/cloud.pb.h>

#include <algorithm>
#ifdef USE_AZURE
#include <azure/storage/blobs/blob_container_client.hpp>
#include <azure/storage/common/storage_credential.hpp>
#endif
#include <execution>
#include <memory>
#include <utility>

#include "common/config.h"
#include "common/encryption_util.h"
#include "common/logging.h"
#include "common/simple_thread_pool.h"
#include "common/string_util.h"
#include "common/util.h"
#include "cpp/aws_logger.h"
#include "cpp/obj_retry_strategy.h"
#include "cpp/s3_rate_limiter.h"
#include "cpp/sync_point.h"
#ifdef USE_AZURE
#include "recycler/azure_obj_client.h"
#endif
#include "recycler/obj_storage_client.h"
#include "recycler/s3_obj_client.h"
#include "recycler/storage_vault_accessor.h"

namespace doris::cloud {

namespace s3_bvar {
bvar::LatencyRecorder s3_get_latency("s3_get");
bvar::LatencyRecorder s3_put_latency("s3_put");
bvar::LatencyRecorder s3_delete_object_latency("s3_delete_object");
bvar::LatencyRecorder s3_delete_objects_latency("s3_delete_objects");
bvar::LatencyRecorder s3_head_latency("s3_head");
bvar::LatencyRecorder s3_multi_part_upload_latency("s3_multi_part_upload");
bvar::LatencyRecorder s3_list_latency("s3_list");
bvar::LatencyRecorder s3_list_object_versions_latency("s3_list_object_versions");
bvar::LatencyRecorder s3_get_bucket_version_latency("s3_get_bucket_version");
bvar::LatencyRecorder s3_copy_object_latency("s3_copy_object");
} // namespace s3_bvar

bvar::Adder<int64_t> get_rate_limit_ns("get_rate_limit_ns");
bvar::Adder<int64_t> get_rate_limit_exceed_req_num("get_rate_limit_exceed_req_num");
bvar::Adder<int64_t> put_rate_limit_ns("put_rate_limit_ns");
bvar::Adder<int64_t> put_rate_limit_exceed_req_num("put_rate_limit_exceed_req_num");

AccessorRateLimiter::AccessorRateLimiter()
        : _rate_limiters({std::make_unique<S3RateLimiterHolder>(
                                  config::s3_get_token_per_second, config::s3_get_bucket_tokens,
                                  config::s3_get_token_limit,
                                  metric_func_factory(get_rate_limit_ns,
                                                      get_rate_limit_exceed_req_num)),
                          std::make_unique<S3RateLimiterHolder>(
                                  config::s3_put_token_per_second, config::s3_put_bucket_tokens,
                                  config::s3_put_token_limit,
                                  metric_func_factory(put_rate_limit_ns,
                                                      put_rate_limit_exceed_req_num))}) {}

S3RateLimiterHolder* AccessorRateLimiter::rate_limiter(S3RateLimitType type) {
    CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type);
    return _rate_limiters[static_cast<size_t>(type)].get();
}
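
// Illustrative use (hypothetical caller, not part of this file): request paths are
// expected to charge the per-type token bucket before issuing the S3 call, roughly:
//
//   auto* limiter = AccessorRateLimiter::instance().rate_limiter(S3RateLimitType::GET);
//   limiter->add(1); // may sleep until a token is available
//
// `add(1)` is assumed here from the S3RateLimiterHolder interface in cpp/s3_rate_limiter.h.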

AccessorRateLimiter& AccessorRateLimiter::instance() {
    static AccessorRateLimiter instance;
    return instance;
}

int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit) {
    if (type == S3RateLimitType::UNKNOWN) {
        return -1;
    }
    // A zero argument means "fall back to the configured default" for that field.
    if (type == S3RateLimitType::GET) {
        max_speed = (max_speed == 0) ? config::s3_get_token_per_second : max_speed;
        max_burst = (max_burst == 0) ? config::s3_get_bucket_tokens : max_burst;
        limit = (limit == 0) ? config::s3_get_token_limit : limit;
    } else {
        max_speed = (max_speed == 0) ? config::s3_put_token_per_second : max_speed;
        max_burst = (max_burst == 0) ? config::s3_put_bucket_tokens : max_burst;
        limit = (limit == 0) ? config::s3_put_token_limit : limit;
    }
    return AccessorRateLimiter::instance().rate_limiter(type)->reset(max_speed, max_burst, limit);
}

class S3Environment {
public:
    S3Environment() {
        aws_options_ = Aws::SDKOptions {};
        auto logLevel = static_cast<Aws::Utils::Logging::LogLevel>(config::aws_log_level);
        aws_options_.loggingOptions.logLevel = logLevel;
        aws_options_.loggingOptions.logger_create_fn = [logLevel] {
            return std::make_shared<DorisAWSLogger>(logLevel);
        };
        Aws::InitAPI(aws_options_);
    }

    ~S3Environment() { Aws::ShutdownAPI(aws_options_); }

private:
    Aws::SDKOptions aws_options_;
};

class S3ListIterator final : public ListIterator {
public:
    S3ListIterator(std::unique_ptr<ObjectListIterator> iter, size_t prefix_length)
            : iter_(std::move(iter)), prefix_length_(prefix_length) {}

    ~S3ListIterator() override = default;

    bool is_valid() override { return iter_->is_valid(); }

    bool has_next() override { return iter_->has_next(); }

    std::optional<FileMeta> next() override {
        std::optional<FileMeta> ret;
        if (auto obj = iter_->next(); obj.has_value()) {
            ret = FileMeta {
                    .path = get_relative_path(obj->key),
                    .size = obj->size,
                    .mtime_s = obj->mtime_s,
            };
        }
        return ret;
    }

private:
    // Strip the vault prefix so callers see paths relative to the accessor root.
    std::string get_relative_path(const std::string& key) const {
        return key.substr(prefix_length_);
    }

    std::unique_ptr<ObjectListIterator> iter_;
    size_t prefix_length_;
};

std::optional<S3Conf> S3Conf::from_obj_store_info(const ObjectStoreInfoPB& obj_info,
                                                  bool skip_aksk) {
    S3Conf s3_conf;
    switch (obj_info.provider()) {
    case ObjectStoreInfoPB_Provider_OSS:
    case ObjectStoreInfoPB_Provider_S3:
    case ObjectStoreInfoPB_Provider_COS:
    case ObjectStoreInfoPB_Provider_OBS:
    case ObjectStoreInfoPB_Provider_BOS:
        s3_conf.provider = S3Conf::S3;
        break;
    case ObjectStoreInfoPB_Provider_GCP:
        s3_conf.provider = S3Conf::GCS;
        break;
    case ObjectStoreInfoPB_Provider_AZURE:
        s3_conf.provider = S3Conf::AZURE;
        break;
    default:
        LOG_WARNING("unknown provider type").tag("obj_info", proto_to_json(obj_info));
        return std::nullopt;
    }

    if (!skip_aksk) {
        if (!obj_info.ak().empty() && !obj_info.sk().empty()) {
            if (obj_info.has_encryption_info()) {
                AkSkPair plain_ak_sk_pair;
                int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(),
                                               obj_info.encryption_info(), &plain_ak_sk_pair);
                if (ret != 0) {
                    LOG_WARNING("fail to decrypt ak sk").tag("obj_info", proto_to_json(obj_info));
                    return std::nullopt;
                } else {
                    s3_conf.ak = std::move(plain_ak_sk_pair.first);
                    s3_conf.sk = std::move(plain_ak_sk_pair.second);
                }
            } else {
                s3_conf.ak = obj_info.ak();
                s3_conf.sk = obj_info.sk();
            }
        }

        if (obj_info.has_role_arn() && !obj_info.role_arn().empty()) {
            s3_conf.role_arn = obj_info.role_arn();
            s3_conf.external_id = obj_info.external_id();
            s3_conf.cred_provider_type = CredProviderType::InstanceProfile;
        }
    }

    s3_conf.endpoint = obj_info.endpoint();
    s3_conf.region = obj_info.region();
    s3_conf.bucket = obj_info.bucket();
    s3_conf.prefix = obj_info.prefix();
    s3_conf.use_virtual_addressing = !obj_info.use_path_style();
    return s3_conf;
}
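
// Typical construction flow (illustrative sketch; the caller code below is
// hypothetical):
//
//   auto conf = S3Conf::from_obj_store_info(obj_info, /*skip_aksk=*/false);
//   if (!conf) return -1;
//   std::shared_ptr<S3Accessor> accessor;
//   if (S3Accessor::create(std::move(*conf), &accessor) != 0) return -1;
//   accessor->delete_directory("data/10086"); // example path
//
// create() (defined below) dispatches on conf.provider, so GCP vaults transparently
// get the GcsAccessor specialization at the bottom of this file.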

S3Accessor::S3Accessor(S3Conf conf)
        : StorageVaultAccessor(AccessorType::S3), conf_(std::move(conf)) {}

S3Accessor::~S3Accessor() = default;

std::string S3Accessor::get_key(const std::string& relative_path) const {
    return conf_.prefix.empty() ? relative_path : conf_.prefix + '/' + relative_path;
}

std::string S3Accessor::to_uri(const std::string& relative_path) const {
    return uri_ + '/' + relative_path;
}

int S3Accessor::create(S3Conf conf, std::shared_ptr<S3Accessor>* accessor) {
    TEST_SYNC_POINT_RETURN_WITH_VALUE("S3Accessor::init.s3_init_failed", (int)-1);
    switch (conf.provider) {
    case S3Conf::GCS:
        *accessor = std::make_shared<GcsAccessor>(conf);
        break;
    default:
        *accessor = std::make_shared<S3Accessor>(conf);
        break;
    }

    return (*accessor)->init();
}

static std::shared_ptr<SimpleThreadPool> worker_pool;

std::shared_ptr<Aws::Auth::AWSCredentialsProvider> S3Accessor::get_aws_credentials_provider(
        const S3Conf& s3_conf) {
    if (!s3_conf.ak.empty() && !s3_conf.sk.empty()) {
        Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk);
        DCHECK(!aws_cred.IsExpiredOrEmpty());
        return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(std::move(aws_cred));
    }

    if (s3_conf.cred_provider_type == CredProviderType::InstanceProfile) {
        if (s3_conf.role_arn.empty()) {
            return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>();
        }

        auto stsClient = std::make_shared<Aws::STS::STSClient>(
                std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>());

        return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
                s3_conf.role_arn, Aws::String(), s3_conf.external_id,
                Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, stsClient);
    }

    return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
}
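
// Credential resolution above, summarized in order of precedence:
//   1. explicit AK/SK                 -> SimpleAWSCredentialsProvider
//   2. InstanceProfile without a role -> InstanceProfileCredentialsProvider
//   3. InstanceProfile + role_arn     -> STSAssumeRoleCredentialsProvider, whose STS
//                                        client itself authenticates via the instance
//                                        profile (role chaining with external_id)
//   4. otherwise                      -> DefaultAWSCredentialsProviderChain (env vars,
//                                        config/credentials files, IMDS, ...)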

int S3Accessor::init() {
    static std::once_flag log_annotated_tags_key_once;
    std::call_once(log_annotated_tags_key_once, [&]() {
        LOG_INFO("start s3 accessor parallel worker pool");
        worker_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism,
                                                         "s3_accessor");
        worker_pool->start();
    });
    switch (conf_.provider) {
    case S3Conf::AZURE: {
#ifdef USE_AZURE
        Azure::Storage::Blobs::BlobClientOptions options;
        options.Retry.StatusCodes.insert(Azure::Core::Http::HttpStatusCode::TooManyRequests);
        options.Retry.MaxRetries = config::max_s3_client_retry;
        auto cred =
                std::make_shared<Azure::Storage::StorageSharedKeyCredential>(conf_.ak, conf_.sk);
        if (config::force_azure_blob_global_endpoint) {
            uri_ = fmt::format("https://{}.blob.core.windows.net/{}", conf_.ak, conf_.bucket);
        } else {
            uri_ = fmt::format("{}/{}", conf_.endpoint, conf_.bucket);
            if (uri_.find("://") == std::string::npos) {
                uri_ = "https://" + uri_;
            }
        }
        // In Azure's HTTP requests, all policies in the vector are chained in the HTTP
        // pipeline fashion: the RetryPolicy invokes the next policy repeatedly inside a
        // loop, and every policy in PerRetryPolicies sits downstream of the RetryPolicy.
        // Hence a single added policy is enough to observe each attempt, check whether
        // the response code is 429, and record the retry count.
        options.PerRetryPolicies.emplace_back(
                std::make_unique<AzureRetryRecordPolicy>(config::max_s3_client_retry));
        auto container_client = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(
                uri_, cred, std::move(options));
        // uri format for debug: ${scheme}://${ak}.blob.core.windows.net/${bucket}/${prefix}
        uri_ = uri_ + '/' + conf_.prefix;
        obj_client_ = std::make_shared<AzureObjClient>(std::move(container_client));
        return 0;
#else
        LOG_FATAL("BE is not compiled with azure support, export BUILD_AZURE=ON before building");
        return 0;
#endif
    }
    default: {
        if (conf_.prefix.empty()) {
            uri_ = conf_.endpoint + '/' + conf_.bucket;
        } else {
            uri_ = conf_.endpoint + '/' + conf_.bucket + '/' + conf_.prefix;
        }

        static S3Environment s3_env;

        // S3Conf::S3
        Aws::Client::ClientConfiguration aws_config;
        aws_config.endpointOverride = conf_.endpoint;
        aws_config.region = conf_.region;
        // Aws::Http::CurlHandleContainer::AcquireCurlHandle() may block if the
        // connection pool becomes the bottleneck.
        aws_config.maxConnections =
                std::max((long)(config::recycle_pool_parallelism +
                                config::instance_recycler_worker_pool_size),
                         (long)aws_config.maxConnections);
        if (config::s3_client_http_scheme == "http") {
            aws_config.scheme = Aws::Http::Scheme::HTTP;
        }
        aws_config.retryStrategy = std::make_shared<S3CustomRetryStrategy>(
                config::max_s3_client_retry /*scaleFactor = 25*/);
        auto s3_client = std::make_shared<Aws::S3::S3Client>(
                get_aws_credentials_provider(conf_), std::move(aws_config),
                Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
                conf_.use_virtual_addressing /* useVirtualAddressing */);
        obj_client_ = std::make_shared<S3ObjClient>(std::move(s3_client), conf_.endpoint);
        return 0;
    }
    }
}

int S3Accessor::delete_prefix_impl(const std::string& path_prefix, int64_t expiration_time) {
    LOG_INFO("delete prefix").tag("uri", to_uri(path_prefix));
    return obj_client_
            ->delete_objects_recursively({.bucket = conf_.bucket, .key = get_key(path_prefix)},
                                         {.executor = worker_pool}, expiration_time)
            .ret;
}

int S3Accessor::delete_prefix(const std::string& path_prefix, int64_t expiration_time) {
    auto norm_path_prefix = path_prefix;
    strip_leading(norm_path_prefix, "/");
    if (norm_path_prefix.empty()) {
        LOG_WARNING("invalid path_prefix {}", path_prefix);
        return -1;
    }

    return delete_prefix_impl(norm_path_prefix, expiration_time);
}
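
// delete_directory below reduces a directory delete to a prefix delete: the path is
// normalized to end with '/', so deleting "data/10086" (an example path) removes every
// object whose key starts with "<conf_.prefix>/data/10086/". The one-argument call
// relies on the default expiration_time in the delete_prefix_impl declaration
// (presumably in recycler/s3_accessor.h), i.e. no mtime-based filtering.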

int S3Accessor::delete_directory(const std::string& dir_path) {
    auto norm_dir_path = dir_path;
    strip_leading(norm_dir_path, "/");
    if (norm_dir_path.empty()) {
        LOG_WARNING("invalid dir_path {}", dir_path);
        return -1;
    }

    return delete_prefix_impl(!norm_dir_path.ends_with('/') ? norm_dir_path + '/'
                                                            : norm_dir_path);
}

int S3Accessor::delete_all(int64_t expiration_time) {
    return delete_prefix_impl("", expiration_time);
}

int S3Accessor::delete_files(const std::vector<std::string>& paths) {
    if (paths.empty()) {
        return 0;
    }

    std::vector<std::string> keys;
    keys.reserve(paths.size());
    for (auto&& path : paths) {
        LOG_INFO("delete file").tag("uri", to_uri(path));
        keys.emplace_back(get_key(path));
    }

    return obj_client_->delete_objects(conf_.bucket, std::move(keys), {.executor = worker_pool})
            .ret;
}

int S3Accessor::delete_file(const std::string& path) {
    LOG_INFO("delete file").tag("uri", to_uri(path));
    int ret = obj_client_->delete_object({.bucket = conf_.bucket, .key = get_key(path)}).ret;
    static_assert(ObjectStorageResponse::OK == 0);
    if (ret == ObjectStorageResponse::OK || ret == ObjectStorageResponse::NOT_FOUND) {
        return 0;
    }
    return ret;
}

int S3Accessor::put_file(const std::string& path, const std::string& content) {
    return obj_client_->put_object({.bucket = conf_.bucket, .key = get_key(path)}, content).ret;
}

int S3Accessor::list_prefix(const std::string& path_prefix, std::unique_ptr<ListIterator>* res) {
    *res = std::make_unique<S3ListIterator>(
            obj_client_->list_objects({conf_.bucket, get_key(path_prefix)}),
            conf_.prefix.length() + 1 /* {prefix}/ */);
    return 0;
}

int S3Accessor::list_directory(const std::string& dir_path, std::unique_ptr<ListIterator>* res) {
    auto norm_dir_path = dir_path;
    strip_leading(norm_dir_path, "/");
    if (norm_dir_path.empty()) {
        LOG_WARNING("invalid dir_path {}", dir_path);
        return -1;
    }

    return list_prefix(!norm_dir_path.ends_with('/') ? norm_dir_path + '/' : norm_dir_path, res);
}

int S3Accessor::list_all(std::unique_ptr<ListIterator>* res) {
    return list_prefix("", res);
}

int S3Accessor::exists(const std::string& path) {
    ObjectMeta obj_meta;
    return obj_client_->head_object({.bucket = conf_.bucket, .key = get_key(path)}, &obj_meta).ret;
}

int S3Accessor::get_life_cycle(int64_t* expiration_days) {
    return obj_client_->get_life_cycle(conf_.bucket, expiration_days).ret;
}

int S3Accessor::check_versioning() {
    return obj_client_->check_versioning(conf_.bucket).ret;
}

int GcsAccessor::delete_prefix_impl(const std::string& path_prefix, int64_t expiration_time) {
    LOG_INFO("begin delete prefix").tag("uri", to_uri(path_prefix));
    int ret = 0;
    int cnt = 0;
    int skip = 0;
    int64_t del_nonexisted = 0;
    int del = 0;
    auto iter = obj_client_->list_objects({conf_.bucket, get_key(path_prefix)});
    for (auto obj = iter->next(); obj.has_value(); obj = iter->next()) {
        if (!(++cnt % 100)) {
            LOG_INFO("loop delete prefix")
                    .tag("uri", to_uri(path_prefix))
                    .tag("total_obj_cnt", cnt)
                    .tag("deleted", del)
                    .tag("del_nonexisted", del_nonexisted)
                    .tag("skipped", skip);
        }

        if (expiration_time > 0 && obj->mtime_s > expiration_time) {
            skip++;
            continue;
        }

        del++;
        // FIXME(plat1ko): Delete objects by batch with genuine GCS client
        int del_ret = obj_client_->delete_object({conf_.bucket, obj->key}).ret;
        del_nonexisted += (del_ret == ObjectStorageResponse::NOT_FOUND);
        static_assert(ObjectStorageResponse::OK == 0);
        if (del_ret != ObjectStorageResponse::OK && del_ret != ObjectStorageResponse::NOT_FOUND) {
            ret = del_ret;
        }
    }
    LOG_INFO("finish delete prefix")
            .tag("uri", to_uri(path_prefix))
            .tag("total_obj_cnt", cnt)
            .tag("deleted", del)
            .tag("del_nonexisted", del_nonexisted)
            .tag("skipped", skip);

    if (!iter->is_valid()) {
        return -1;
    }
    return ret;
}
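
// GcsAccessor::delete_files below fans out one delete_object call per path. libc++
// does not ship the C++17 parallel algorithms, so under USE_LIBCPP the sequential
// std::transform overload is used; otherwise std::execution::par parallelizes the
// deletes across paths.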

int GcsAccessor::delete_files(const std::vector<std::string>& paths) {
    std::vector<int> delete_rets(paths.size());
#ifdef USE_LIBCPP
    std::transform(paths.begin(), paths.end(), delete_rets.begin(),
#else
    std::transform(std::execution::par, paths.begin(), paths.end(), delete_rets.begin(),
#endif
                   [this](const std::string& path) {
                       LOG_INFO("delete file").tag("uri", to_uri(path));
                       return delete_file(path);
                   });

    int ret = 0;
    for (int delete_ret : delete_rets) {
        if (delete_ret != 0) {
            ret = delete_ret;
            break;
        }
    }
    return ret;
}

} // namespace doris::cloud