cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp (596 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <IO/BoundedReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3Common.h>
#include <IO/SeekableReadBuffer.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <sys/stat.h>
#include <Poco/URI.h>
#include "IO/ReadSettings.h"
#include <hdfs/hdfs.h>
#include <Poco/Logger.h>
#include <Common/FileCacheConcurrentMap.h>
#include <Common/Throttler.h>
#include <Common/logger_useful.h>
#include <Common/safe_cast.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <Common/CHUtil.h>
#include <shared_mutex>
#include <thread>
#include <boost/compute/detail/lru_cache.hpp>
/// Error codes thrown from this file. They are only declared here (extern);
/// the definitions live outside this translation unit.
namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_OPEN_FILE;
extern const int UNKNOWN_FILE_SIZE;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
}
namespace local_engine
{
/// Thread-safe wrapper around boost::compute::detail::lru_cache.
///
/// Every operation, including get(), takes the lock exclusively. The wrapped
/// lru_cache::get() is not a read-only operation: on a hit it moves the key to the
/// front of its internal recency list. Serving lookups under a shared lock would
/// therefore let several threads mutate the cache at once.
template <class key_type, class value_type>
class ConcurrentLRU
{
public:
    /// @param size forwarded to the underlying lru_cache constructor.
    ConcurrentLRU(size_t size) : cache(size) { }

    /// Looks up `key`; returns an empty optional when the key is not cached.
    boost::optional<value_type> get(const key_type & key)
    {
        // Exclusive lock on purpose: a cache hit updates the recency list.
        std::unique_lock<std::shared_mutex> lock(rwLock);
        return cache.get(key);
    }

    /// Stores `value` under `key`.
    void insert(const key_type & key, const value_type & value)
    {
        std::unique_lock<std::shared_mutex> lock(rwLock);
        cache.insert(key, value);
    }

    /// Drops every cached entry.
    void clear()
    {
        std::unique_lock<std::shared_mutex> lock(rwLock);
        cache.clear();
    }

private:
    boost::compute::detail::lru_cache<key_type, value_type> cache;
    std::shared_mutex rwLock;
};
/// Moves a requested [read_start_pos, read_end_pos) byte range forward so that both
/// ends sit right after a line break ("\r", "\n" or "\r\n").
///
/// A position of 0 is returned unchanged. For any other position the buffer is
/// seeked there and scanned forward until the first line break has been consumed,
/// or until end of file is reached.
///
/// @return {adjusted start, adjusted end}. The buffer is left wherever the last scan stopped.
std::pair<size_t, size_t> adjustFileReadPosition(DB::ReadBufferFromFileBase & buffer, size_t read_start_pos, size_t read_end_pos)
{
    /// Consumes bytes from the current position up to and including the next line
    /// break and reports the resulting file position.
    auto skip_past_line_break = [&buffer]() -> size_t
    {
        for (; !buffer.eof(); ++buffer.position())
        {
            const char current = *buffer.position();
            if (current == '\r')
            {
                ++buffer.position();
                // Treat "\r\n" as a single line break.
                if (!buffer.eof() && *buffer.position() == '\n')
                    ++buffer.position();
                break;
            }
            if (current == '\n')
            {
                ++buffer.position();
                break;
            }
        }
        return buffer.getPosition();
    };

    /// Position 0 needs no adjustment; everything else is pushed to the next line start.
    auto align_to_line_start = [&](size_t requested_pos) -> size_t
    {
        if (requested_pos == 0)
            return requested_pos;
        buffer.seek(requested_pos, SEEK_SET);
        return skip_past_line_break();
    };

    // The start is resolved before the end, so the buffer ends up positioned after the end scan.
    const size_t adjusted_start = align_to_line_start(read_start_pos);
    const size_t adjusted_end = align_to_line_start(read_end_pos);
    return std::pair<size_t, size_t>(adjusted_start, adjusted_end);
}
/// Builds read buffers for files addressed by a local-filesystem URI.
class LocalFileReadBufferBuilder : public ReadBufferBuilder
{
public:
    explicit LocalFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) { }
    ~LocalFileReadBufferBuilder() override = default;

    /// Opens the file named by `file_info.uri_file()`.
    ///
    /// When `set_read_util_position` is true the buffer is wrapped in a BoundedReadBuffer
    /// and restricted to the line-aligned range derived from `file_info.start()` and
    /// `file_info.length()`.
    ///
    /// Throws DB::Exception(BAD_ARGUMENTS) when stat() on the path fails.
    std::unique_ptr<DB::ReadBuffer>
    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override
    {
        Poco::URI uri(file_info.uri_file());
        const String & local_path = uri.getPath();

        struct stat path_stat;
        if (stat(local_path.c_str(), &path_stat) != 0)
            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "file stat failed for {}", local_path);

        // Regular files are read with the pread-based buffer, everything else with the plain one.
        std::unique_ptr<DB::ReadBufferFromFileBase> file_buffer;
        if (S_ISREG(path_stat.st_mode))
            file_buffer = std::make_unique<DB::ReadBufferFromFilePRead>(local_path);
        else
            file_buffer = std::make_unique<DB::ReadBufferFromFile>(local_path);

        if (!set_read_util_position)
            return file_buffer;

        // Remember the raw buffer: after the move below it is owned by the BoundedReadBuffer.
        auto * inner_buffer = file_buffer.get();
        file_buffer = std::make_unique<DB::BoundedReadBuffer>(std::move(file_buffer));
        // Workaround for https://github.com/ClickHouse/ClickHouse/pull/58886
        // The ReadBufferFromFileDecorator constructor calls swap; without swapping again here
        // the BoundedReadBuffer can't work.
        file_buffer->swap(*inner_buffer);

        const auto requested_start = file_info.start();
        const auto requested_end = file_info.start() + file_info.length();
        const auto adjusted_range = adjustFileReadPosition(*file_buffer, requested_start, requested_end);
        LOG_DEBUG(
            &Poco::Logger::get("ReadBufferBuilder"),
            "File read start and end position adjusted from {},{} to {},{}",
            requested_start,
            requested_end,
            adjusted_range.first,
            adjusted_range.second);

        file_buffer->seek(adjusted_range.first, SEEK_SET);
        file_buffer->setReadUntilPosition(adjusted_range.second);
        return file_buffer;
    }
};
#if USE_HDFS
/// Builds read buffers for files stored on HDFS.
class HDFSFileReadBufferBuilder : public ReadBufferBuilder
{
public:
    explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) { }
    ~HDFSFileReadBufferBuilder() override = default;

    /// Creates a ReadBufferFromHDFS for `file_info.uri_file()`.
    ///
    /// When `set_read_util_position` is true, the requested byte range
    /// [start, start + length) is first moved to line boundaries; the buffer is then
    /// created with the adjusted end as its read limit and seeked to the adjusted start.
    std::unique_ptr<DB::ReadBuffer>
    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override
    {
        Poco::URI file_uri(file_info.uri_file());
        // Rebuild "hdfs://host[:port]"; the port is appended only when the URI carries one.
        std::string uri_path = "hdfs://" + file_uri.getHost();
        if (file_uri.getPort())
            uri_path += ":" + std::to_string(file_uri.getPort());

        DB::ReadSettings read_settings;
        std::unique_ptr<DB::ReadBuffer> read_buffer;
        if (set_read_util_position)
        {
            std::pair<size_t, size_t> start_end_pos
                = adjustFileReadStartAndEndPos(file_info.start(), file_info.start() + file_info.length(), uri_path, file_uri.getPath());
            LOG_DEBUG(
                &Poco::Logger::get("ReadBufferBuilder"),
                "File read start and end position adjusted from {},{} to {},{}",
                file_info.start(),
                file_info.start() + file_info.length(),
                start_end_pos.first,
                start_end_pos.second);

            read_buffer = std::make_unique<DB::ReadBufferFromHDFS>(
                uri_path, file_uri.getPath(), context->getConfigRef(), read_settings, start_end_pos.second);
            if (auto * seekable_in = dynamic_cast<DB::SeekableReadBuffer *>(read_buffer.get()))
                if (start_end_pos.first)
                    seekable_in->seek(start_end_pos.first, SEEK_SET);
        }
        else
        {
            read_buffer = std::make_unique<DB::ReadBufferFromHDFS>(uri_path, file_uri.getPath(), context->getConfigRef(), read_settings);
        }
        return read_buffer;
    }

    /// Moves `read_start_pos` and `read_end_pos` forward to the start of the next line.
    ///
    /// Positions equal to 0 or to the file size are returned unchanged. Otherwise the file
    /// is scanned from the position for the first line break ("\n", "\r", "\r\n" or "\n\r")
    /// and the offset right after it is returned; if no line break is found, the offset
    /// reached at end of file is returned.
    ///
    /// Throws DB::Exception with CANNOT_OPEN_FILE, UNKNOWN_FILE_SIZE, CANNOT_SEEK_THROUGH_FILE
    /// or CANNOT_READ_FROM_FILE_DESCRIPTOR when the corresponding HDFS call fails.
    std::pair<size_t, size_t>
    adjustFileReadStartAndEndPos(size_t read_start_pos, size_t read_end_pos, const std::string & uri_path, const std::string & file_path)
    {
        auto builder = DB::createHDFSBuilder(uri_path, context->getConfigRef());
        auto fs = DB::createHDFSFS(builder.get());
        hdfsFile fin = hdfsOpenFile(fs.get(), file_path.c_str(), O_RDONLY, 0, 0, 0);
        std::string hdfs_file_path = uri_path + file_path;
        if (!fin)
            throw DB::Exception(
                DB::ErrorCodes::CANNOT_OPEN_FILE, "Cannot open hdfs file:{}, error: {}", hdfs_file_path, std::string(hdfsGetLastError()));

        /// Always close hdfs file before exit function.
        SCOPE_EXIT({ hdfsCloseFile(fs.get(), fin); });

        auto * hdfs_file_info = hdfsGetPathInfo(fs.get(), file_path.c_str());
        if (!hdfs_file_info)
            throw DB::Exception(
                DB::ErrorCodes::UNKNOWN_FILE_SIZE,
                "Cannot find out file size for :{}, error: {}",
                hdfs_file_path,
                std::string(hdfsGetLastError()));
        const size_t hdfs_file_size = hdfs_file_info->mSize;
        // Only the size is needed; release the info structure right away so it is not leaked.
        hdfsFreeFileInfo(hdfs_file_info, 1);

        /// initial_pos maybe in the middle of a row, so we need to find the next row start position.
        auto get_next_line_pos = [&](hdfsFS hdfs_fs, hdfsFile file, size_t initial_pos, size_t file_size) -> size_t
        {
            if (initial_pos == 0 || initial_pos == file_size)
                return initial_pos;

            int seek_ret = hdfsSeek(hdfs_fs, file, initial_pos);
            if (seek_ret < 0)
                throw DB::Exception(
                    DB::ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
                    "Fail to seek HDFS file: {}, error: {}",
                    file_path,
                    std::string(hdfsGetLastError()));

            static constexpr size_t buf_size = 1024;
            char buf[buf_size];

            /// Reads the next chunk into buf; returns the byte count (0 at end of file).
            auto do_read = [&]() -> int
            {
                auto n = hdfsRead(hdfs_fs, file, buf, buf_size);
                if (n < 0)
                    throw DB::Exception(
                        DB::ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR,
                        "Fail to read HDFS file: {}, error: {}",
                        file_path,
                        std::string(hdfsGetLastError()));
                return n;
            };

            /// File offset of buf[0] for the chunk currently being scanned.
            auto pos = initial_pos;
            while (true)
            {
                auto n = do_read();

                /// If read to the end of file, return directly.
                if (n == 0)
                    return pos;

                /// Search for \n or \r\n or \n\r in buffer.
                int i = 0;
                while (i < n)
                {
                    if (buf[i] == '\n')
                    {
                        if (i + 1 < n)
                            return buf[i + 1] == '\r' ? pos + i + 2 : pos + i + 1;

                        /// read again if buffer is not enough.
                        auto m = do_read();
                        if (m == 0)
                            return pos + i + 1;
                        // buf now holds the next chunk, so buf[0] is the byte following the line break.
                        return buf[0] == '\r' ? pos + i + 2 : pos + i + 1;
                    }
                    else if (buf[i] == '\r')
                    {
                        if (i + 1 < n)
                            return buf[i + 1] == '\n' ? pos + i + 2 : pos + i + 1;

                        /// read again if buffer is not enough.
                        auto m = do_read();
                        if (m == 0)
                            return pos + i + 1;
                        // buf now holds the next chunk, so buf[0] is the byte following the line break.
                        return buf[0] == '\n' ? pos + i + 2 : pos + i + 1;
                    }
                    else
                        ++i;
                }

                /// Can't find \n or \r\n or \n\r in current buffer, read again.
                pos += n;
            }
        };

        std::pair<size_t, size_t> result;
        result.first = get_next_line_pos(fs.get(), fin, read_start_pos, hdfs_file_size);
        result.second = get_next_line_pos(fs.get(), fin, read_end_pos, hdfs_file_size);
        return result;
    }
};
#endif
#if USE_AWS_S3
/// Builds asynchronous read buffers for objects stored in S3.
///
/// S3 clients are cached in a static LRU shared by all instances: buckets that configure their
/// own assumed role get a dedicated client, all other buckets share one client.
class S3FileReadBufferBuilder : public ReadBufferBuilder
{
    // registerReadBufferBuilders() registers a cleaner that clears the private client cache.
    friend void registerReadBufferBuilders();

public:
    /// Takes the read settings from the context and, when "s3.local_cache.enabled" is set,
    /// creates and initializes the local file cache used for S3 reads.
    explicit S3FileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_)
    {
        new_settings = context->getReadSettings();
        new_settings.enable_filesystem_cache = context->getConfigRef().getBool("s3.local_cache.enabled", false);
        if (new_settings.enable_filesystem_cache)
        {
            DB::FileCacheSettings file_cache_settings;
            file_cache_settings.max_size = static_cast<size_t>(context->getConfigRef().getUInt64("s3.local_cache.max_size", 100L << 30));
            auto cache_base_path = context->getConfigRef().getString("s3.local_cache.cache_path", "/tmp/gluten/local_cache");
            if (!fs::exists(cache_base_path))
                fs::create_directories(cache_base_path);
            file_cache_settings.base_path = cache_base_path;
            file_cache = DB::FileCacheFactory::instance().getOrCreate("s3_local_cache", file_cache_settings, "");
            file_cache->initialize();
            new_settings.remote_fs_cache = file_cache;
        }
    }

    ~S3FileReadBufferBuilder() override = default;

    /// Creates an AsynchronousBoundedReadBuffer over the object named by `file_info.uri_file()`.
    ///
    /// When `set_read_util_position` is true the buffer is limited to the line-aligned range
    /// derived from `file_info.start()` / `file_info.length()`; otherwise it reads to the end.
    ///
    /// Throws DB::Exception(BAD_ARGUMENTS) when the URI has an empty path.
    std::unique_ptr<DB::ReadBuffer>
    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override
    {
        Poco::URI file_uri(file_info.uri_file());
        // file uri looks like: s3a://my-dev-bucket/tpch100/part/0001.parquet
        std::string bucket = file_uri.getHost();
        const auto client = getClient(bucket);

        // The object key is the path without its leading '/'. Reject an empty path explicitly:
        // substr(1) on an empty string would throw a bare std::out_of_range.
        const std::string & uri_path = file_uri.getPath();
        if (uri_path.empty())
            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "S3 uri has no object key: {}", file_info.uri_file());
        std::string key = uri_path.substr(1);

        DB::S3::ObjectInfo object_info = DB::S3::getObjectInfo(*client, bucket, key, "");
        size_t object_size = object_info.size;
        Int64 object_modified_time = object_info.last_modification_time;

        if (new_settings.enable_filesystem_cache)
        {
            auto file_cache_key = DB::FileCacheKey(key);
            auto last_cache_time = files_cache_time_map.get(file_cache_key);
            const auto object_modified_time_ms = object_modified_time * 1000l; // second to milli second
            // Record the new time when nothing is recorded yet or the object changed since then.
            if (!last_cache_time.has_value() || last_cache_time.value() < object_modified_time_ms)
                files_cache_time_map.update_cache_time(file_cache_key, key, object_modified_time_ms, file_cache);
        }

        // The creator is stored inside the returned buffer, so it must not refer back to this
        // builder: capture a copy of the read settings instead of `this`.
        auto read_buffer_creator = [bucket, client, read_settings = new_settings](
                                       const std::string & path, size_t read_until_position) -> std::unique_ptr<DB::ReadBufferFromFileBase>
        {
            return std::make_unique<DB::ReadBufferFromS3>(
                client,
                bucket,
                path,
                "",
                DB::S3Settings::RequestSettings(),
                read_settings,
                /* use_external_buffer */ true,
                /* offset */ 0,
                read_until_position,
                /* restricted_seek */ true);
        };

        DB::StoredObjects stored_objects{DB::StoredObject{key, "", object_size}};
        auto s3_impl = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
            std::move(read_buffer_creator), stored_objects, new_settings, /* cache_log */ nullptr, /* use_external_buffer */ true);

        auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
        auto async_reader
            = std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl), pool_reader, new_settings, nullptr, nullptr);

        if (set_read_util_position)
        {
            auto start_end_pos = adjustFileReadPosition(*async_reader, file_info.start(), file_info.start() + file_info.length());
            LOG_DEBUG(
                &Poco::Logger::get("ReadBufferBuilder"),
                "File read start and end position adjusted from {},{} to {},{}",
                file_info.start(),
                file_info.start() + file_info.length(),
                start_end_pos.first,
                start_end_pos.second);
            async_reader->seek(start_end_pos.first, SEEK_SET);
            async_reader->setReadUntilPosition(start_end_pos.second);
        }
        else
        {
            async_reader->setReadUntilEnd();
        }

        if (new_settings.remote_fs_prefetch)
            async_reader->prefetch(Priority{});
        return async_reader;
    }

private:
    static const std::string SHARED_CLIENT_KEY;
    static ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> per_bucket_clients;
    static FileCacheConcurrentMap files_cache_time_map;
    DB::ReadSettings new_settings;
    DB::FileCachePtr file_cache;

    /// Removes every single-quote character from `s` in place and returns `s`.
    std::string & stripQuote(std::string & s)
    {
        s.erase(std::remove(s.begin(), s.end(), '\''), s.end());
        return s;
    }

    /// Maps a setting name to its per-bucket variant by inserting "bucket.<bucket_name>."
    /// after the S3A prefix. Names without that prefix are returned unchanged.
    std::string toBucketNameSetting(const std::string & bucket_name, const std::string & config_name)
    {
        if (!config_name.starts_with(BackendInitializerUtil::S3A_PREFIX))
        {
            // Currently per bucket only support fs.s3a.xxx
            return config_name;
        }
        // like: fs.s3a.bucket.bucket_name.assumed.role.externalId
        return BackendInitializerUtil::S3A_PREFIX + "bucket." + bucket_name + "."
            + config_name.substr(BackendInitializerUtil::S3A_PREFIX.size());
    }

    /// Reads a string setting with single quotes stripped.
    ///
    /// The per-bucket variant wins over the plain setting. With `require_per_bucket` the plain
    /// setting is ignored. Returns `default_value` when nothing applicable is set.
    std::string getSetting(
        const DB::Settings & settings,
        const std::string & bucket_name,
        const std::string & config_name,
        const std::string & default_value = "",
        const bool require_per_bucket = false)
    {
        std::string ret;
        // if there's a bucket specific config, prefer it to non per bucket config
        if (settings.tryGetString(toBucketNameSetting(bucket_name, config_name), ret))
            return stripQuote(ret);
        if (!require_per_bucket && settings.tryGetString(config_name, ret))
            return stripQuote(ret);
        return default_value;
    }

    /// Stores `client` under the bucket name for per-bucket clients, otherwise under the shared key.
    void cacheClient(const std::string & bucket_name, const bool is_per_bucket, std::shared_ptr<DB::S3::Client> client)
    {
        per_bucket_clients.insert(is_per_bucket ? bucket_name : SHARED_CLIENT_KEY, client);
    }

    /// Returns the S3 client to use for `bucket_name`, creating and caching it when needed.
    ///
    /// A cached client is reused unless the client-cache-ignore setting is "true".
    /// Throws DB::Exception(BAD_ARGUMENTS) when the configured endpoint neither starts with
    /// "https://" nor with "s3".
    std::shared_ptr<DB::S3::Client> getClient(const std::string & bucket_name)
    {
        const auto & config = context->getConfigRef();
        const auto & settings = context->getSettingsRef();

        // An assumed role may come from the global or the per-bucket setting; only a per-bucket
        // role gives the bucket its own cached client.
        const bool use_assumed_role = !getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE).empty();
        const bool is_per_bucket = !getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE, "", true).empty();

        const bool ignore_client_cache
            = "true" == getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_CLIENT_CACHE_IGNORE);
        if (!ignore_client_cache)
        {
            auto client = per_bucket_clients.get(is_per_bucket ? bucket_name : SHARED_CLIENT_KEY);
            if (client.has_value())
                return client.get();
        }

        String config_prefix = "s3";
        auto endpoint = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ENDPOINT, "https://s3.us-west-2.amazonaws.com");
        if (!endpoint.starts_with("https://"))
        {
            if (endpoint.starts_with("s3"))
                // as https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/bk_cloud-data-access/content/s3-config-parameters.html
                // the fs.s3a.endpoint does not contain https:// prefix
                endpoint = "https://" + endpoint;
            else
                throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "S3 Endpoint format not right: {}", endpoint);
        }

        // For Amazon endpoints the region is the part between "https://s3." and ".amazonaws.com".
        String region_name;
        const char * amazon_suffix = ".amazonaws.com";
        const char * amazon_prefix = "https://s3.";
        auto pos = endpoint.find(amazon_suffix);
        if (pos != std::string::npos)
        {
            assert(endpoint.starts_with(amazon_prefix));
            region_name = endpoint.substr(strlen(amazon_prefix), pos - strlen(amazon_prefix));
            assert(region_name.find('.') == std::string::npos);
        }
        // for AWS CN, the endpoint is like: https://s3.cn-north-1.amazonaws.com.cn, can still work

        DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
            region_name,
            context->getRemoteHostFilter(),
            static_cast<unsigned>(context->getSettingsRef().s3_max_redirects),
            static_cast<unsigned>(context->getSettingsRef().s3_retry_attempts),
            false,
            false,
            nullptr,
            nullptr);

        client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
        client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
        client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
        client_configuration.endpointOverride = endpoint;
        client_configuration.retryStrategy
            = std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".retry_attempts", 10));

        std::string ak;
        std::string sk;
        settings.tryGetString(BackendInitializerUtil::HADOOP_S3_ACCESS_KEY, ak);
        settings.tryGetString(BackendInitializerUtil::HADOOP_S3_SECRET_KEY, sk);
        stripQuote(ak);
        stripQuote(sk);

        DB::S3::ClientSettings client_settings{
            .use_virtual_addressing = false,
            .disable_checksum = settings.s3_disable_checksum,
            .gcs_issue_compose_request = config.getBool("s3.gcs_issue_compose_request", false),
        };

        // The two variants differ only in the credentials configuration.
        std::shared_ptr<DB::S3::Client> new_client;
        if (use_assumed_role)
        {
            new_client = DB::S3::ClientFactory::instance().create(
                client_configuration,
                client_settings,
                ak, // access_key_id
                sk, // secret_access_key
                "", // server_side_encryption_customer_key_base64
                {}, // sse_kms_config
                {}, // headers
                DB::S3::CredentialsConfiguration{
                    .use_environment_credentials = true,
                    .use_insecure_imds_request = false,
                    .role_arn = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE),
                    .session_name = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_SESSION_NAME),
                    .external_id = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_EXTERNAL_ID)
                });
        }
        else
        {
            new_client = DB::S3::ClientFactory::instance().create(
                client_configuration,
                client_settings,
                ak, // access_key_id
                sk, // secret_access_key
                "", // server_side_encryption_customer_key_base64
                {}, // sse_kms_config
                {}, // headers
                DB::S3::CredentialsConfiguration{
                    .use_environment_credentials = true,
                    .use_insecure_imds_request = false});
        }

        //TODO: support online change config for cached per_bucket_clients
        cacheClient(bucket_name, is_per_bucket, new_client);
        return new_client;
    }
};
/// Key under which cacheClient()/getClient() store the client that is not tied to a single bucket.
const std::string S3FileReadBufferBuilder::SHARED_CLIENT_KEY = "___shared-client___";
/// Client cache shared by all S3FileReadBufferBuilder instances; constructed with size 100.
ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> S3FileReadBufferBuilder::per_bucket_clients(100);
/// Per file-cache-key timestamps consulted and updated by build() when the filesystem cache is
/// enabled. NOTE(review): build() compares them against the object's modification time in
/// milliseconds; the exact semantics live in FileCacheConcurrentMap - confirm there.
FileCacheConcurrentMap S3FileReadBufferBuilder::files_cache_time_map;
#endif
#if USE_AZURE_BLOB_STORAGE
/// Builds read buffers for blobs stored in Azure Blob Storage.
class AzureBlobReadBuffer : public ReadBufferBuilder
{
public:
    explicit AzureBlobReadBuffer(DB::ContextPtr context_) : ReadBufferBuilder(context_) { }
    ~AzureBlobReadBuffer() override = default;

    /// Creates a ReadBufferFromAzureBlobStorage for the path of `file_info.uri_file()`.
    /// The boolean argument is ignored: the whole blob is always exposed.
    std::unique_ptr<DB::ReadBuffer> build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool) override
    {
        Poco::URI blob_uri(file_info.uri_file());
        return std::make_unique<DB::ReadBufferFromAzureBlobStorage>(getClient(), blob_uri.getPath(), DB::ReadSettings(), 5, 5);
    }

private:
    /// Container client created on first use and reused by later build() calls on this instance.
    std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> shared_client;

    /// Returns the container client, creating it from the "blob" configuration section on first call.
    std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> getClient()
    {
        if (!shared_client)
            shared_client = DB::getAzureBlobContainerClient(context->getConfigRef(), "blob");
        return shared_client;
    }
};
#endif
void registerReadBufferBuilders()
{
auto & factory = ReadBufferBuilderFactory::instance();
factory.registerBuilder("file", [](DB::ContextPtr context_) { return std::make_shared<LocalFileReadBufferBuilder>(context_); });
#if USE_HDFS
factory.registerBuilder("hdfs", [](DB::ContextPtr context_) { return std::make_shared<HDFSFileReadBufferBuilder>(context_); });
#endif
#if USE_AWS_S3
factory.registerBuilder("s3", [](DB::ContextPtr context_) { return std::make_shared<S3FileReadBufferBuilder>(context_); });
factory.registerBuilder("s3a", [](DB::ContextPtr context_) { return std::make_shared<S3FileReadBufferBuilder>(context_); });
factory.registerCleaner([]() { S3FileReadBufferBuilder::per_bucket_clients.clear(); });
#endif
#if USE_AZURE_BLOB_STORAGE
factory.registerBuilder("wasb", [](DB::ContextPtr context_) { return std::make_shared<AzureBlobReadBuffer>(context_); });
factory.registerBuilder("wasbs", [](DB::ContextPtr context_) { return std::make_shared<AzureBlobReadBuffer>(context_); });
#endif
}
/// Returns the single factory object, constructed on first call.
ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance()
{
    static ReadBufferBuilderFactory the_factory;
    return the_factory;
}
/// Associates `schema` with the builder factory function `newer`.
/// Throws DB::Exception(BAD_ARGUMENTS) when the schema already has a builder.
void ReadBufferBuilderFactory::registerBuilder(const String & schema, NewBuilder newer)
{
    const bool already_registered = builders.find(schema) != builders.end();
    if (already_registered)
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "readbuffer builder for {} has been registered", schema);

    builders[schema] = newer;
}
/// Creates a builder for `schema` by invoking the factory function registered for it.
/// Throws DB::Exception(BAD_ARGUMENTS) when no builder is registered for the schema.
ReadBufferBuilderPtr ReadBufferBuilderFactory::createBuilder(const String & schema, DB::ContextPtr context)
{
    if (auto found = builders.find(schema); found != builders.end())
        return found->second(context);

    throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Not found read buffer builder for {}", schema);
}
/// Appends `cleaner` to the list of callbacks run by clean().
void ReadBufferBuilderFactory::registerCleaner(Cleaner cleaner)
{
    cleaners.emplace_back(cleaner);
}
void ReadBufferBuilderFactory::clean()
{
for (auto c : cleaners)
{
c();
}
}
}