/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <memory>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <IO/BoundedReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3Common.h>
#include <IO/SeekableReadBuffer.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <aws/core/client/DefaultRetryStrategy.h>

#include <sys/stat.h>
#include <Poco/URI.h>
#include "IO/ReadSettings.h"

#include <hdfs/hdfs.h>
#include <Poco/Logger.h>
#include <Common/FileCacheConcurrentMap.h>
#include <Common/Throttler.h>
#include <Common/logger_useful.h>
#include <Common/safe_cast.h>

#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCacheSettings.h>

#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <Common/CHUtil.h>

#include <shared_mutex>
#include <thread>
#include <boost/compute/detail/lru_cache.hpp>

/// Declarations of the ClickHouse error codes thrown by the builders in this file.
/// They are only declared here (`extern`); the definitions live elsewhere.
namespace DB
{
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_OPEN_FILE;
    extern const int UNKNOWN_FILE_SIZE;
    extern const int CANNOT_SEEK_THROUGH_FILE;
    extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
}
}

namespace local_engine
{
/// Mutex-protected wrapper around boost::compute::detail::lru_cache.
///
/// Every operation, including get(), takes the lock exclusively. A hit in the boost
/// LRU cache moves the entry to the front of its recency list, so a lookup mutates the
/// container; running lookups under a shared lock would let several threads modify the
/// list at the same time.
template <class key_type, class value_type>
class ConcurrentLRU
{
public:
    /// @param size capacity forwarded to the underlying lru_cache.
    ConcurrentLRU(size_t size) : cache(size) { }

    /// Returns the value cached for `key`, or an empty optional when there is none.
    boost::optional<value_type> get(const key_type & key)
    {
        // Exclusive on purpose: lru_cache::get() reorders its internal list on a hit.
        std::unique_lock<std::shared_mutex> lock(rwLock);
        return cache.get(key);
    }

    /// Stores `value` under `key`.
    void insert(const key_type & key, const value_type & value)
    {
        std::unique_lock<std::shared_mutex> lock(rwLock);
        cache.insert(key, value);
    }

    /// Drops every cached entry.
    void clear()
    {
        std::unique_lock<std::shared_mutex> lock(rwLock);
        cache.clear();
    }

private:
    boost::compute::detail::lru_cache<key_type, value_type> cache;
    std::shared_mutex rwLock;
};

/// Moves both ends of a byte range forward to the beginning of the next line.
///
/// A position of 0 is returned unchanged and causes no seek. Any other position is
/// seeked to, and the buffer is consumed up to and including the first line terminator
/// found ("\n", "\r" or "\r\n"); if the end of the buffer is reached first, the position
/// at that point is used.
///
/// @param buffer         seekable buffer to scan; its position is modified.
/// @param read_start_pos requested start offset.
/// @param read_end_pos   requested end offset.
/// @return {adjusted start, adjusted end}.
std::pair<size_t, size_t> adjustFileReadPosition(DB::ReadBufferFromFileBase & buffer, size_t read_start_pos, size_t read_end_pos)
{
    const auto align_to_next_line = [&buffer](size_t offset) -> size_t
    {
        if (offset == 0)
            return offset;

        buffer.seek(offset, SEEK_SET);
        while (!buffer.eof())
        {
            const char current = *buffer.position();
            ++buffer.position();

            if (current == '\n')
                break;

            if (current == '\r')
            {
                // Treat "\r\n" as a single terminator.
                if (!buffer.eof() && *buffer.position() == '\n')
                    ++buffer.position();
                break;
            }
        }

        return buffer.getPosition();
    };

    // The start must be resolved before the end: both calls move the same buffer.
    const size_t adjusted_start = align_to_next_line(read_start_pos);
    const size_t adjusted_end = align_to_next_line(read_end_pos);
    return {adjusted_start, adjusted_end};
}

/// Builds read buffers for paths on the local filesystem.
class LocalFileReadBufferBuilder : public ReadBufferBuilder
{
public:
    explicit LocalFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) { }
    ~LocalFileReadBufferBuilder() override = default;

    /// Opens the path component of `file_info.uri_file()`.
    ///
    /// Regular files are read with ReadBufferFromFilePRead, anything else with
    /// ReadBufferFromFile. When `set_read_util_position` is true the buffer is wrapped in
    /// a BoundedReadBuffer limited to the split [start, start + length), with both ends
    /// moved forward to line boundaries by adjustFileReadPosition().
    ///
    /// Throws DB::Exception(BAD_ARGUMENTS) when stat() fails on the path.
    std::unique_ptr<DB::ReadBuffer>
    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override
    {
        Poco::URI uri(file_info.uri_file());
        const String & local_path = uri.getPath();

        struct stat path_stat;
        if (stat(local_path.c_str(), &path_stat) != 0)
            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "file stat failed for {}", local_path);

        std::unique_ptr<DB::ReadBufferFromFileBase> file_buffer;
        if (S_ISREG(path_stat.st_mode))
            file_buffer = std::make_unique<DB::ReadBufferFromFilePRead>(local_path);
        else
            file_buffer = std::make_unique<DB::ReadBufferFromFile>(local_path);

        if (!set_read_util_position)
            return file_buffer;

        // Workaround for https://github.com/ClickHouse/ClickHouse/pull/58886:
        // the ReadBufferFromFileDecorator constructor calls swap, so the wrapper has to be
        // swapped with the wrapped buffer again, otherwise BoundedReadBuffer can't work.
        DB::ReadBufferFromFileBase * wrapped = file_buffer.get();
        file_buffer = std::make_unique<DB::BoundedReadBuffer>(std::move(file_buffer));
        file_buffer->swap(*wrapped);

        const auto adjusted = adjustFileReadPosition(*file_buffer, file_info.start(), file_info.start() + file_info.length());
        LOG_DEBUG(
            &Poco::Logger::get("ReadBufferBuilder"),
            "File read start and end position adjusted from {},{} to {},{}",
            file_info.start(),
            file_info.start() + file_info.length(),
            adjusted.first,
            adjusted.second);

        file_buffer->seek(adjusted.first, SEEK_SET);
        file_buffer->setReadUntilPosition(adjusted.second);
        return file_buffer;
    }
};

#if USE_HDFS
/// Builds read buffers for files stored on HDFS.
class HDFSFileReadBufferBuilder : public ReadBufferBuilder
{
public:
    explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) { }
    ~HDFSFileReadBufferBuilder() override = default;

    /// Opens `file_info.uri_file()` through ReadBufferFromHDFS.
    ///
    /// When `set_read_util_position` is true, the split [start, start + length) is first
    /// moved forward to line boundaries (see adjustFileReadStartAndEndPos); the buffer is
    /// then created with the adjusted end as its read limit and seeked to the adjusted
    /// start. Otherwise the whole file is read.
    std::unique_ptr<DB::ReadBuffer>
    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override
    {
        Poco::URI file_uri(file_info.uri_file());
        std::string uri_path = "hdfs://" + file_uri.getHost();
        if (file_uri.getPort())
            uri_path += ":" + std::to_string(file_uri.getPort());

        DB::ReadSettings read_settings;
        std::unique_ptr<DB::ReadBuffer> read_buffer;
        if (set_read_util_position)
        {
            std::pair<size_t, size_t> start_end_pos
                = adjustFileReadStartAndEndPos(file_info.start(), file_info.start() + file_info.length(), uri_path, file_uri.getPath());
            LOG_DEBUG(
                &Poco::Logger::get("ReadBufferBuilder"),
                "File read start and end position adjusted from {},{} to {},{}",
                file_info.start(),
                file_info.start() + file_info.length(),
                start_end_pos.first,
                start_end_pos.second);
            read_buffer = std::make_unique<DB::ReadBufferFromHDFS>(
                uri_path, file_uri.getPath(), context->getConfigRef(), read_settings, start_end_pos.second);

            // A start of 0 needs no seek.
            if (auto * seekable_in = dynamic_cast<DB::SeekableReadBuffer *>(read_buffer.get()))
                if (start_end_pos.first)
                    seekable_in->seek(start_end_pos.first, SEEK_SET);
        }
        else
        {
            read_buffer = std::make_unique<DB::ReadBufferFromHDFS>(uri_path, file_uri.getPath(), context->getConfigRef(), read_settings);
        }
        return read_buffer;
    }

    /// Moves both ends of a byte range forward to the beginning of the next line by
    /// reading the HDFS file directly.
    ///
    /// Positions equal to 0 or to the file size are returned unchanged.
    ///
    /// @param read_start_pos requested start offset.
    /// @param read_end_pos   requested end offset.
    /// @param uri_path       "hdfs://host[:port]" prefix used to connect.
    /// @param file_path      path of the file inside that filesystem.
    /// @return {adjusted start, adjusted end}.
    /// @throws DB::Exception CANNOT_OPEN_FILE, UNKNOWN_FILE_SIZE, CANNOT_SEEK_THROUGH_FILE or
    ///         CANNOT_READ_FROM_FILE_DESCRIPTOR when the corresponding libhdfs call fails.
    std::pair<size_t, size_t>
    adjustFileReadStartAndEndPos(size_t read_start_pos, size_t read_end_pos, const std::string & uri_path, const std::string & file_path)
    {
        auto builder = DB::createHDFSBuilder(uri_path, context->getConfigRef());
        auto fs = DB::createHDFSFS(builder.get());
        hdfsFile fin = hdfsOpenFile(fs.get(), file_path.c_str(), O_RDONLY, 0, 0, 0);
        std::string hdfs_file_path = uri_path + file_path;
        if (!fin)
            throw DB::Exception(
                DB::ErrorCodes::CANNOT_OPEN_FILE, "Cannot open hdfs file:{}, error: {}", hdfs_file_path, std::string(hdfsGetLastError()));

        /// Always close hdfs file before exit function.
        SCOPE_EXIT({ hdfsCloseFile(fs.get(), fin); });

        auto * hdfs_file_info = hdfsGetPathInfo(fs.get(), file_path.c_str());
        if (!hdfs_file_info)
            throw DB::Exception(
                DB::ErrorCodes::UNKNOWN_FILE_SIZE,
                "Cannot find out file size for :{}, error: {}",
                hdfs_file_path,
                std::string(hdfsGetLastError()));
        const size_t hdfs_file_size = hdfs_file_info->mSize;
        /// The info struct is allocated by libhdfs and must be released by the caller.
        hdfsFreeFileInfo(hdfs_file_info, 1);

        /// initial_pos maybe in the middle of a row, so we need to find the next row start position.
        auto get_next_line_pos = [&](hdfsFS hdfs_fs, hdfsFile file, size_t initial_pos, size_t file_size) -> size_t
        {
            if (initial_pos == 0 || initial_pos == file_size)
                return initial_pos;

            int seek_ret = hdfsSeek(hdfs_fs, file, initial_pos);
            if (seek_ret < 0)
                throw DB::Exception(
                    DB::ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
                    "Fail to seek HDFS file: {}, error: {}",
                    file_path,
                    std::string(hdfsGetLastError()));

            static constexpr size_t buf_size = 1024;
            char buf[buf_size];

            /// Reads the next chunk into `buf`; returns the byte count (0 at end of file).
            auto do_read = [&]() -> int
            {
                auto n = hdfsRead(hdfs_fs, file, buf, buf_size);
                if (n < 0)
                    throw DB::Exception(
                        DB::ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR,
                        "Fail to read HDFS file: {}, error: {}",
                        file_path,
                        std::string(hdfsGetLastError()));

                return n;
            };

            /// `pos` is the file offset of buf[0] for the chunk currently being scanned.
            auto pos = initial_pos;
            while (true)
            {
                auto n = do_read();

                /// If read to the end of file, return directly.
                if (n == 0)
                    return pos;

                /// Search for \n or \r\n or \n\r in buffer.
                int i = 0;
                while (i < n)
                {
                    if (buf[i] == '\n')
                    {
                        if (i + 1 < n)
                            return buf[i + 1] == '\r' ? pos + i + 2 : pos + i + 1;

                        /// The terminator is the last byte of the chunk: read one more chunk
                        /// to see whether a paired '\r' follows.
                        auto m = do_read();
                        if (m == 0)
                            return pos + i + 1;

                        return buf[0] == '\r' ? pos + i + 2 : pos + i + 1;
                    }
                    else if (buf[i] == '\r')
                    {
                        if (i + 1 < n)
                            return buf[i + 1] == '\n' ? pos + i + 2 : pos + i + 1;

                        /// Same look-ahead as above, for a paired '\n'.
                        auto m = do_read();
                        if (m == 0)
                            return pos + i + 1;

                        return buf[0] == '\n' ? pos + i + 2 : pos + i + 1;
                    }
                    else
                        ++i;
                }

                /// Can't find \n or \r\n or \n\r in current buffer, read again.
                pos += n;
            }
        };

        std::pair<size_t, size_t> result;
        result.first = get_next_line_pos(fs.get(), fin, read_start_pos, hdfs_file_size);
        result.second = get_next_line_pos(fs.get(), fin, read_end_pos, hdfs_file_size);
        return result;
    }
};
#endif

#if USE_AWS_S3
/// Builds asynchronous read buffers for objects stored on S3.
///
/// S3 clients are cached in a process-wide LRU (one shared client plus one per bucket
/// that has a bucket-specific assumed role). An optional local file cache is enabled
/// through the `s3.local_cache.*` configuration keys.
class S3FileReadBufferBuilder : public ReadBufferBuilder
{
    /// registerReadBufferBuilders() clears the private client cache from its cleaner.
    friend void registerReadBufferBuilders();


public:
    /// Copies the context read settings and, when `s3.local_cache.enabled` is true,
    /// creates/initializes the "s3_local_cache" file cache under `s3.local_cache.cache_path`.
    explicit S3FileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_)
    {
        new_settings = context->getReadSettings();
        new_settings.enable_filesystem_cache = context->getConfigRef().getBool("s3.local_cache.enabled", false);

        if (new_settings.enable_filesystem_cache)
        {
            DB::FileCacheSettings file_cache_settings;
            file_cache_settings.max_size = static_cast<size_t>(context->getConfigRef().getUInt64("s3.local_cache.max_size", 100L << 30));
            auto cache_base_path = context->getConfigRef().getString("s3.local_cache.cache_path", "/tmp/gluten/local_cache");

            if (!fs::exists(cache_base_path))
                fs::create_directories(cache_base_path);

            file_cache_settings.base_path = cache_base_path;
            file_cache = DB::FileCacheFactory::instance().getOrCreate("s3_local_cache", file_cache_settings, "");
            file_cache->initialize();

            new_settings.remote_fs_cache = file_cache;
        }
    }

    ~S3FileReadBufferBuilder() override = default;

    /// Opens the object named by `file_info.uri_file()` (host = bucket, path = key).
    ///
    /// When `set_read_util_position` is true the split [start, start + length) is moved
    /// forward to line boundaries and the buffer is limited to it; otherwise the buffer
    /// reads until the end of the object.
    std::unique_ptr<DB::ReadBuffer>
    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override
    {
        Poco::URI file_uri(file_info.uri_file());
        // file uri looks like: s3a://my-dev-bucket/tpch100/part/0001.parquet
        std::string bucket = file_uri.getHost();
        const auto client = getClient(bucket);
        std::string key = file_uri.getPath().substr(1);
        DB::S3::ObjectInfo object_info = DB::S3::getObjectInfo(*client, bucket, key, "");
        size_t object_size = object_info.size;
        Int64 object_modified_time = object_info.last_modification_time;

        if (new_settings.enable_filesystem_cache)
        {
            auto file_cache_key = DB::FileCacheKey(key);
            auto last_cache_time = files_cache_time_map.get(file_cache_key);
            // second to milli second
            const Int64 object_modified_time_ms = object_modified_time * 1000L;
            // Record the object when it is unknown or newer than the recorded time.
            if (!last_cache_time.has_value() || last_cache_time.value() < object_modified_time_ms)
                files_cache_time_map.update_cache_time(file_cache_key, key, object_modified_time_ms, file_cache);
        }

        // The creator is stored inside the returned buffer, so it captures a copy of the
        // settings instead of `this`: the buffer must stay usable independently of the builder.
        auto read_buffer_creator = [bucket, client, read_settings = new_settings](
                                       const std::string & path, size_t read_until_position) -> std::unique_ptr<DB::ReadBufferFromFileBase>
        {
            return std::make_unique<DB::ReadBufferFromS3>(
                client,
                bucket,
                path,
                "",
                DB::S3Settings::RequestSettings(),
                read_settings,
                /* use_external_buffer */ true,
                /* offset */ 0,
                read_until_position,
                /* restricted_seek */ true);
        };

        DB::StoredObjects stored_objects{DB::StoredObject{key, "", object_size}};
        auto s3_impl = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
            std::move(read_buffer_creator), stored_objects, new_settings, /* cache_log */ nullptr, /* use_external_buffer */ true);

        auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
        auto async_reader
            = std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl), pool_reader, new_settings, nullptr, nullptr);

        if (set_read_util_position)
        {
            auto start_end_pos = adjustFileReadPosition(*async_reader, file_info.start(), file_info.start() + file_info.length());
            LOG_DEBUG(
                &Poco::Logger::get("ReadBufferBuilder"),
                "File read start and end position adjusted from {},{} to {},{}",
                file_info.start(),
                file_info.start() + file_info.length(),
                start_end_pos.first,
                start_end_pos.second);

            async_reader->seek(start_end_pos.first, SEEK_SET);
            async_reader->setReadUntilPosition(start_end_pos.second);
        }
        else
        {
            async_reader->setReadUntilEnd();
        }

        if (new_settings.remote_fs_prefetch)
            async_reader->prefetch(Priority{});

        return async_reader;
    }

private:
    /// Cache key of the client used by buckets without a bucket-specific assumed role.
    static const std::string SHARED_CLIENT_KEY;
    /// Process-wide client cache, keyed by bucket name or SHARED_CLIENT_KEY.
    static ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> per_bucket_clients;
    static FileCacheConcurrentMap files_cache_time_map;
    DB::ReadSettings new_settings;
    DB::FileCachePtr file_cache;

    /// Removes every single-quote character from `s` in place and returns it.
    std::string & stripQuote(std::string & s)
    {
        s.erase(std::remove(s.begin(), s.end(), '\''), s.end());
        return s;
    }

    /// Maps a `fs.s3a.xxx` setting name to its per-bucket form `fs.s3a.bucket.<bucket>.xxx`.
    /// Names without the S3A prefix are returned unchanged.
    std::string toBucketNameSetting(const std::string & bucket_name, const std::string & config_name)
    {
        if (!config_name.starts_with(BackendInitializerUtil::S3A_PREFIX))
        {
            // Currently per bucket only support fs.s3a.xxx
            return config_name;
        }
        // like: fs.s3a.bucket.bucket_name.assumed.role.externalId
        return BackendInitializerUtil::S3A_PREFIX + "bucket." + bucket_name + "."
            + config_name.substr(BackendInitializerUtil::S3A_PREFIX.size());
    }

    /// Looks up a setting, preferring the bucket-specific name over the generic one.
    ///
    /// @param require_per_bucket when true, the generic name is not consulted.
    /// @return the value with single quotes stripped, or `default_value` when not found.
    std::string getSetting(
        const DB::Settings & settings,
        const std::string & bucket_name,
        const std::string & config_name,
        const std::string & default_value = "",
        const bool require_per_bucket = false)
    {
        std::string ret;
        // if there's a bucket specific config, prefer it to non per bucket config
        if (settings.tryGetString(toBucketNameSetting(bucket_name, config_name), ret))
            return stripQuote(ret);

        if (!require_per_bucket && settings.tryGetString(config_name, ret))
            return stripQuote(ret);

        return default_value;
    }

    /// Stores `client` under the bucket name (per-bucket client) or under SHARED_CLIENT_KEY.
    void cacheClient(const std::string & bucket_name, const bool is_per_bucket, std::shared_ptr<DB::S3::Client> client)
    {
        per_bucket_clients.insert(is_per_bucket ? bucket_name : SHARED_CLIENT_KEY, client);
    }

    /// Returns an S3 client for `bucket_name`, reusing a cached one unless the
    /// HADOOP_S3_CLIENT_CACHE_IGNORE setting is "true".
    ///
    /// A bucket with a bucket-specific assumed role gets its own cached client; every
    /// other bucket shares one client. Throws DB::Exception(BAD_ARGUMENTS) when the
    /// configured endpoint starts with neither "https://" nor "s3".
    std::shared_ptr<DB::S3::Client> getClient(const std::string & bucket_name)
    {
        const auto & config = context->getConfigRef();
        const auto & settings = context->getSettingsRef();

        const std::string assumed_role = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE);
        const bool use_assumed_role = !assumed_role.empty();
        const bool is_per_bucket = !getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE, "", true).empty();

        if ("true" != getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_CLIENT_CACHE_IGNORE))
        {
            auto client = per_bucket_clients.get(is_per_bucket ? bucket_name : SHARED_CLIENT_KEY);
            if (client.has_value())
                return client.get();
        }

        String config_prefix = "s3";
        auto endpoint = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ENDPOINT, "https://s3.us-west-2.amazonaws.com");
        if (!endpoint.starts_with("https://"))
        {
            if (endpoint.starts_with("s3"))
                // as https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/bk_cloud-data-access/content/s3-config-parameters.html
                // the fs.s3a.endpoint does not contain https:// prefix
                endpoint = "https://" + endpoint;
            else
                throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "S3 Endpoint format not right: {}", endpoint);
        }

        // For Amazon endpoints the region is the part between "https://s3." and ".amazonaws.com".
        String region_name;
        const char * amazon_suffix = ".amazonaws.com";
        const char * amazon_prefix = "https://s3.";
        auto pos = endpoint.find(amazon_suffix);
        if (pos != std::string::npos)
        {
            assert(endpoint.starts_with(amazon_prefix));
            region_name = endpoint.substr(strlen(amazon_prefix), pos - strlen(amazon_prefix));
            assert(region_name.find('.') == std::string::npos);
        }
        // for AWS CN, the endpoint is like: https://s3.cn-north-1.amazonaws.com.cn, can still work

        DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
            region_name,
            context->getRemoteHostFilter(),
            static_cast<unsigned>(settings.s3_max_redirects),
            static_cast<unsigned>(settings.s3_retry_attempts),
            false,
            false,
            nullptr,
            nullptr);

        client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
        client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
        client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
        client_configuration.endpointOverride = endpoint;

        client_configuration.retryStrategy
            = std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".retry_attempts", 10));

        std::string ak;
        std::string sk;
        settings.tryGetString(BackendInitializerUtil::HADOOP_S3_ACCESS_KEY, ak);
        settings.tryGetString(BackendInitializerUtil::HADOOP_S3_SECRET_KEY, sk);
        stripQuote(ak);
        stripQuote(sk);

        DB::S3::ClientSettings client_settings{
            .use_virtual_addressing = false,
            .disable_checksum = settings.s3_disable_checksum,
            .gcs_issue_compose_request = config.getBool("s3.gcs_issue_compose_request", false),
        };

        // The role fields are only filled in when an assumed role is configured.
        DB::S3::CredentialsConfiguration credentials{
            .use_environment_credentials = true,
            .use_insecure_imds_request = false};
        if (use_assumed_role)
        {
            credentials.role_arn = assumed_role;
            credentials.session_name = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_SESSION_NAME);
            credentials.external_id = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_EXTERNAL_ID);
        }

        auto new_client = DB::S3::ClientFactory::instance().create(
            client_configuration,
            client_settings,
            ak,     // access_key_id
            sk,     // secret_access_key
            "",     // server_side_encryption_customer_key_base64
            {},     // sse_kms_config
            {},     // headers
            std::move(credentials));

        //TODO: support online change config for cached per_bucket_clients
        std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
        cacheClient(bucket_name, is_per_bucket, ret);
        return ret;
    }
};
/// Out-of-class definitions of the S3FileReadBufferBuilder static members.
/// Cache key under which the client that is not bucket-specific is stored.
const std::string S3FileReadBufferBuilder::SHARED_CLIENT_KEY = "___shared-client___";
/// Client cache constructed with a size of 100.
ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> S3FileReadBufferBuilder::per_bucket_clients(100);
FileCacheConcurrentMap S3FileReadBufferBuilder::files_cache_time_map;

#endif

#if USE_AZURE_BLOB_STORAGE
/// Builds read buffers for blobs in Azure Blob Storage.
class AzureBlobReadBuffer : public ReadBufferBuilder
{
public:
    explicit AzureBlobReadBuffer(DB::ContextPtr context_) : ReadBufferBuilder(context_) { }
    ~AzureBlobReadBuffer() override = default;

    /// Opens the blob at the path component of `file_info.uri_file()` with default
    /// ReadSettings. The boolean "read until position" flag is ignored by this builder.
    std::unique_ptr<DB::ReadBuffer> build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool) override
    {
        Poco::URI blob_uri(file_info.uri_file());
        // NOTE(review): the two trailing 5s are presumably retry limits — confirm against
        // the ReadBufferFromAzureBlobStorage constructor.
        return std::make_unique<DB::ReadBufferFromAzureBlobStorage>(getClient(), blob_uri.getPath(), DB::ReadSettings(), 5, 5);
    }

private:
    /// Container client, created on the first getClient() call and reused afterwards.
    std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> shared_client;

    /// Returns the container client, creating it from the "blob" configuration section
    /// when it does not exist yet.
    std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> getClient()
    {
        if (!shared_client)
            shared_client = DB::getAzureBlobContainerClient(context->getConfigRef(), "blob");
        return shared_client;
    }
};
#endif

void registerReadBufferBuilders()
{
    auto & factory = ReadBufferBuilderFactory::instance();
    factory.registerBuilder("file", [](DB::ContextPtr context_) { return std::make_shared<LocalFileReadBufferBuilder>(context_); });

#if USE_HDFS
    factory.registerBuilder("hdfs", [](DB::ContextPtr context_) { return std::make_shared<HDFSFileReadBufferBuilder>(context_); });
#endif

#if USE_AWS_S3
    factory.registerBuilder("s3", [](DB::ContextPtr context_) { return std::make_shared<S3FileReadBufferBuilder>(context_); });
    factory.registerBuilder("s3a", [](DB::ContextPtr context_) { return std::make_shared<S3FileReadBufferBuilder>(context_); });
    factory.registerCleaner([]() { S3FileReadBufferBuilder::per_bucket_clients.clear(); });
#endif

#if USE_AZURE_BLOB_STORAGE
    factory.registerBuilder("wasb", [](DB::ContextPtr context_) { return std::make_shared<AzureBlobReadBuffer>(context_); });
    factory.registerBuilder("wasbs", [](DB::ContextPtr context_) { return std::make_shared<AzureBlobReadBuffer>(context_); });
#endif
}

/// Returns the single factory object, which is a function-local static.
ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance()
{
    static ReadBufferBuilderFactory the_factory;
    return the_factory;
}

/// Associates `schema` with the builder constructor `newer`.
/// Throws DB::Exception(BAD_ARGUMENTS) when the schema already has a builder.
void ReadBufferBuilderFactory::registerBuilder(const String & schema, NewBuilder newer)
{
    if (const auto existing = builders.find(schema); existing != builders.end())
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "readbuffer builder for {} has been registered", schema);

    builders[schema] = std::move(newer);
}

/// Creates a builder for `schema` by invoking the constructor registered for it.
/// Throws DB::Exception(BAD_ARGUMENTS) when no builder is registered for the schema.
ReadBufferBuilderPtr ReadBufferBuilderFactory::createBuilder(const String & schema, DB::ContextPtr context)
{
    if (const auto found = builders.find(schema); found != builders.end())
        return found->second(context);

    throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Not found read buffer builder for {}", schema);
}

/// Appends `cleaner` to the list of callbacks run by clean().
void ReadBufferBuilderFactory::registerCleaner(Cleaner cleaner)
{
    cleaners.emplace_back(std::move(cleaner));
}

void ReadBufferBuilderFactory::clean()
{
    for (auto c : cleaners)
    {
        c();
    }
}
}
