in cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp [497:622]
std::shared_ptr<DB::S3::Client> getClient(const std::string & bucket_name)
{
const auto & config = context->getConfigRef();
const auto & settings = context->getSettingsRef();
bool use_assumed_role = false;
bool is_per_bucket = false;
if (!getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE).empty())
use_assumed_role = true;
if (!getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE, "", true).empty())
is_per_bucket = true;
if (is_per_bucket && "true" != getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_CLIENT_CACHE_IGNORE))
{
auto client = per_bucket_clients.get(bucket_name);
if (client.has_value())
{
return client.get();
}
}
if (!is_per_bucket && "true" != getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_CLIENT_CACHE_IGNORE))
{
auto client = per_bucket_clients.get(SHARED_CLIENT_KEY);
if (client.has_value())
return client.get();
}
String config_prefix = "s3";
auto endpoint = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ENDPOINT, "https://s3.us-west-2.amazonaws.com");
bool end_point_start_with_http_or_https = endpoint.starts_with("https://") || endpoint.starts_with("http://");
if (!end_point_start_with_http_or_https)
{
if (endpoint.starts_with("s3"))
// as https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/bk_cloud-data-access/content/s3-config-parameters.html
// the fs.s3a.endpoint does not contain https:// prefix
endpoint = "https://" + endpoint;
else
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "S3 Endpoint format not right: {}", endpoint);
}
String region_name;
const char * amazon_suffix = ".amazonaws.com";
const char * amazon_prefix = "https://s3.";
auto pos = endpoint.find(amazon_suffix);
if (pos != std::string::npos)
{
assert(endpoint.starts_with(amazon_prefix));
region_name = endpoint.substr(strlen(amazon_prefix), pos - strlen(amazon_prefix));
assert(region_name.find('.') == std::string::npos);
}
// for AWS CN, the endpoint is like: https://s3.cn-north-1.amazonaws.com.cn, can still work
DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
region_name,
context->getRemoteHostFilter(),
static_cast<unsigned>(context->getSettingsRef()[DB::Setting::s3_max_redirects]),
static_cast<unsigned>(context->getSettingsRef()[DB::Setting::s3_retry_attempts]),
false,
false,
nullptr,
nullptr);
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
client_configuration.endpointOverride = endpoint;
client_configuration.retryStrategy
= std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".retry_attempts", 10));
std::string ak;
std::string sk;
std::string path_style_access;
bool addressing_type = false;
tryGetString(settings, BackendInitializerUtil::HADOOP_S3_ACCESS_KEY, ak);
tryGetString(settings, BackendInitializerUtil::HADOOP_S3_SECRET_KEY, sk);
tryGetString(settings, BackendInitializerUtil::HADOOP_S3_PATH_STYLE_ACCESS, path_style_access);
const DB::Settings & global_settings = context->getGlobalContext()->getSettingsRef();
const DB::Settings & local_settings = context->getSettingsRef();
boost::algorithm::to_lower(path_style_access);
addressing_type = (path_style_access == "false");
DB::S3::ClientSettings client_settings{
.use_virtual_addressing = addressing_type,
.disable_checksum = local_settings[DB::Setting::s3_disable_checksum],
.gcs_issue_compose_request = context->getConfigRef().getBool("s3.gcs_issue_compose_request", false),
};
if (use_assumed_role)
{
auto new_client = DB::S3::ClientFactory::instance().create(
client_configuration,
client_settings,
ak, // access_key_id
sk, // secret_access_key
"", // server_side_encryption_customer_key_base64
{}, // sse_kms_config
{}, // headers
DB::S3::CredentialsConfiguration{
.use_environment_credentials = true,
.use_insecure_imds_request = false,
.role_arn = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE),
.session_name = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_SESSION_NAME),
.external_id = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_EXTERNAL_ID)});
//TODO: support online change config for cached per_bucket_clients
std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
cacheClient(bucket_name, is_per_bucket, ret);
return ret;
}
else
{
auto new_client = DB::S3::ClientFactory::instance().create(
client_configuration,
client_settings,
ak, // access_key_id
sk, // secret_access_key
"", // server_side_encryption_customer_key_base64
{}, // sse_kms_config
{}, // headers
DB::S3::CredentialsConfiguration{.use_environment_credentials = true, .use_insecure_imds_request = false});
std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
cacheClient(bucket_name, is_per_bucket, ret);
return ret;
}
}