std::shared_ptr getClient()

in cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp [497:622]


    std::shared_ptr<DB::S3::Client> getClient(const std::string & bucket_name)
    {
        const auto & config = context->getConfigRef();
        const auto & settings = context->getSettingsRef();
        bool use_assumed_role = false;
        bool is_per_bucket = false;

        if (!getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE).empty())
            use_assumed_role = true;

        if (!getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE, "", true).empty())
            is_per_bucket = true;

        if (is_per_bucket && "true" != getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_CLIENT_CACHE_IGNORE))
        {
            auto client = per_bucket_clients.get(bucket_name);
            if (client.has_value())
            {
                return client.get();
            }
        }

        if (!is_per_bucket && "true" != getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_CLIENT_CACHE_IGNORE))
        {
            auto client = per_bucket_clients.get(SHARED_CLIENT_KEY);
            if (client.has_value())
                return client.get();
        }

        String config_prefix = "s3";
        auto endpoint = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ENDPOINT, "https://s3.us-west-2.amazonaws.com");
        bool end_point_start_with_http_or_https = endpoint.starts_with("https://") || endpoint.starts_with("http://");
        if (!end_point_start_with_http_or_https)
        {
            if (endpoint.starts_with("s3"))
                // as https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/bk_cloud-data-access/content/s3-config-parameters.html
                // the fs.s3a.endpoint does not contain https:// prefix
                endpoint = "https://" + endpoint;
            else
                throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "S3 Endpoint format not right: {}", endpoint);
        }
        String region_name;
        const char * amazon_suffix = ".amazonaws.com";
        const char * amazon_prefix = "https://s3.";
        auto pos = endpoint.find(amazon_suffix);
        if (pos != std::string::npos)
        {
            assert(endpoint.starts_with(amazon_prefix));
            region_name = endpoint.substr(strlen(amazon_prefix), pos - strlen(amazon_prefix));
            assert(region_name.find('.') == std::string::npos);
        }
        // for AWS CN, the endpoint is like: https://s3.cn-north-1.amazonaws.com.cn, can still work

        DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
            region_name,
            context->getRemoteHostFilter(),
            static_cast<unsigned>(context->getSettingsRef()[DB::Setting::s3_max_redirects]),
            static_cast<unsigned>(context->getSettingsRef()[DB::Setting::s3_retry_attempts]),
            false,
            false,
            nullptr,
            nullptr);

        client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
        client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
        client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
        client_configuration.endpointOverride = endpoint;

        client_configuration.retryStrategy
            = std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".retry_attempts", 10));

        std::string ak;
        std::string sk;
        std::string path_style_access;
        bool addressing_type = false;
        tryGetString(settings, BackendInitializerUtil::HADOOP_S3_ACCESS_KEY, ak);
        tryGetString(settings, BackendInitializerUtil::HADOOP_S3_SECRET_KEY, sk);
        tryGetString(settings, BackendInitializerUtil::HADOOP_S3_PATH_STYLE_ACCESS, path_style_access);
        const DB::Settings & global_settings = context->getGlobalContext()->getSettingsRef();
        const DB::Settings & local_settings = context->getSettingsRef();
        boost::algorithm::to_lower(path_style_access);
        addressing_type = (path_style_access == "false");
        DB::S3::ClientSettings client_settings{
            .use_virtual_addressing = addressing_type,
            .disable_checksum = local_settings[DB::Setting::s3_disable_checksum],
            .gcs_issue_compose_request = context->getConfigRef().getBool("s3.gcs_issue_compose_request", false),
        };
        if (use_assumed_role)
        {
            auto new_client = DB::S3::ClientFactory::instance().create(
                client_configuration,
                client_settings,
                ak, // access_key_id
                sk, // secret_access_key
                "", // server_side_encryption_customer_key_base64
                {}, // sse_kms_config
                {}, // headers
                DB::S3::CredentialsConfiguration{
                    .use_environment_credentials = true,
                    .use_insecure_imds_request = false,
                    .role_arn = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE),
                    .session_name = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_SESSION_NAME),
                    .external_id = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_EXTERNAL_ID)});

            //TODO: support online change config for cached per_bucket_clients
            std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
            cacheClient(bucket_name, is_per_bucket, ret);
            return ret;
        }
        else
        {
            auto new_client = DB::S3::ClientFactory::instance().create(
                client_configuration,
                client_settings,
                ak, // access_key_id
                sk, // secret_access_key
                "", // server_side_encryption_customer_key_base64
                {}, // sse_kms_config
                {}, // headers
                DB::S3::CredentialsConfiguration{.use_environment_credentials = true, .use_insecure_imds_request = false});

            std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
            cacheClient(bucket_name, is_per_bucket, ret);
            return ret;
        }
    }