cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "config.h"
#include <memory>
#include <shared_mutex>
#include <Core/Settings.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <IO/BoundedReadBuffer.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadSettings.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3Common.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/SharedThreadPools.h>
#include <IO/SplittableBzip2ReadBuffer.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <Interpreters/Context.h>
#include <Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h>
#include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/compute/detail/lru_cache.hpp>
#include <sys/stat.h>
#include <Poco/Logger.h>
#include <Poco/URI.h>
#include <Common/CHUtil.h>
#include <Common/GlutenConfig.h>
#include <Common/GlutenSettings.h>
#include <Common/logger_useful.h>
#include <Common/safe_cast.h>
#if USE_AZURE_BLOB_STORAGE
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h>
#endif
#if USE_AWS_S3
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#endif
#if USE_HDFS
#include <hdfs/hdfs.h>
#endif
namespace DB
{
namespace Setting
{
extern const SettingsUInt64 s3_max_redirects;
extern const SettingsBool s3_disable_checksum;
extern const SettingsUInt64 s3_retry_attempts;
extern const SettingsMaxThreads max_download_threads;
extern const SettingsUInt64 max_download_buffer_size;
extern const SettingsBool input_format_allow_seeks;
extern const SettingsUInt64 max_read_buffer_size;
}
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_OPEN_FILE;
extern const int UNKNOWN_FILE_SIZE;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
extern const int LOGICAL_ERROR;
}
namespace FileCacheSetting
{
extern const FileCacheSettingsUInt64 max_size;
extern const FileCacheSettingsString path;
}
}
namespace local_engine
{
using namespace DB;
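/// Tracks (modification time, file size) per cached file key so stale file-cache entries can be detected and refreshed in updateCaches().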
FileCacheConcurrentMap ReadBufferBuilder::files_cache_time_map;
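/// A small mutex-guarded LRU cache; used below to share S3 clients across builder instances (see per_bucket_clients).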
template <class key_type, class value_type>
class ConcurrentLRU
{
public:
explicit ConcurrentLRU(size_t size) : cache(size) { }
boost::optional<value_type> get(const key_type & key)
{
/// boost's lru_cache::get() updates the recency order, so even lookups need an exclusive lock.
std::unique_lock<std::shared_mutex> lock(rwLock);
return cache.get(key);
}
void insert(const key_type & key, const value_type & value)
{
std::unique_lock<std::shared_mutex> lock(rwLock);
cache.insert(key, value);
}
void clear()
{
std::unique_lock<std::shared_mutex> lock(rwLock);
cache.clear();
}
private:
boost::compute::detail::lru_cache<key_type, value_type> cache;
std::shared_mutex rwLock;
};
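/// Align the [start, end) read range of a text/json split to line boundaries: a non-zero start is advanced
/// to the beginning of the next line, and a non-zero end is extended past the line terminator that follows it,
/// so every row is processed by exactly one split.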
static std::pair<size_t, size_t> getAdjustedReadRange(SeekableReadBuffer & buffer, const std::pair<size_t, size_t> & start_end)
{
auto get_next_line_pos = [&](SeekableReadBuffer & buf) -> size_t
{
while (!buf.eof())
{
/// Search for \n or \r\n or \n\r or \r in buffer.
if (*buf.position() == '\r')
{
++buf.position();
if (!buf.eof() && *buf.position() == '\n')
{
++buf.position();
}
return buf.getPosition();
}
else if (*buf.position() == '\n')
{
++buf.position();
if (!buf.eof() && *buf.position() == '\r')
{
++buf.position();
}
return buf.getPosition();
}
++buf.position();
}
return buf.getPosition();
};
size_t read_start_pos = start_end.first;
size_t read_end_pos = start_end.second;
std::pair<size_t, size_t> result;
if (read_start_pos == 0)
result.first = read_start_pos;
else
{
buffer.seek(read_start_pos, SEEK_SET);
result.first = get_next_line_pos(buffer);
}
if (read_end_pos == 0)
result.second = read_end_pos;
else
{
buffer.seek(read_end_pos, SEEK_SET);
result.second = get_next_line_pos(buffer);
}
return result;
}
static std::unique_ptr<SeekableReadBuffer>
adjustReadRangeIfNeeded(std::unique_ptr<SeekableReadBuffer> read_buffer, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
{
/// Skip formats in which rows are not separated by newline characters.
if (!(file_info.has_text() || file_info.has_json()))
return std::move(read_buffer);
/// Skip text/json files with compression.
/// When the file is compressed, its read range is adjusted in [[buildWithCompressionWrapper]]
Poco::URI file_uri(file_info.uri_file());
DB::CompressionMethod compression = DB::chooseCompressionMethod(file_uri.getPath(), "auto");
if (compression != CompressionMethod::None)
return std::move(read_buffer);
std::pair<size_t, size_t> start_end{file_info.start(), file_info.start() + file_info.length()};
start_end = getAdjustedReadRange(*read_buffer, start_end);
LOG_DEBUG(
&Poco::Logger::get("ReadBufferBuilder"),
"File read start and end position adjusted from {},{} to {},{}",
file_info.start(),
file_info.start() + file_info.length(),
start_end.first,
start_end.second);
#if USE_HDFS
/// If the read buffer doesn't support right-bounded reads, wrap it with BoundedReadBuffer so that setReadUntilPosition takes effect.
if (dynamic_cast<DB::ReadBufferFromHDFS *>(read_buffer.get()) || dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(read_buffer.get())
|| dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
read_buffer = std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
#else
if (dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
read_buffer = std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
#endif
read_buffer->seek(start_end.first, SEEK_SET);
read_buffer->setReadUntilPosition(start_end.second);
return std::move(read_buffer);
}
class LocalFileReadBufferBuilder : public ReadBufferBuilder
{
public:
explicit LocalFileReadBufferBuilder(const DB::ContextPtr & context_) : ReadBufferBuilder(context_) { }
~LocalFileReadBufferBuilder() override = default;
bool isRemote() const override { return false; }
std::unique_ptr<DB::ReadBuffer> build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
{
Poco::URI file_uri(file_info.uri_file());
std::unique_ptr<DB::ReadBufferFromFileBase> read_buffer;
const String & file_path = file_uri.getPath();
struct stat file_stat;
if (stat(file_path.c_str(), &file_stat))
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "file stat failed for {}", file_path);
if (S_ISREG(file_stat.st_mode))
read_buffer = std::make_unique<DB::ReadBufferFromFilePRead>(file_path);
else
read_buffer = std::make_unique<DB::ReadBufferFromFile>(file_path);
return adjustReadRangeIfNeeded(std::move(read_buffer), file_info);
}
};
#if USE_HDFS
class HDFSFileReadBufferBuilder : public ReadBufferBuilder
{
using ReadBufferCreator
= std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool restricted_seek, const DB::StoredObject & object)>;
public:
explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_), context(context_) { }
~HDFSFileReadBufferBuilder() override = default;
std::unique_ptr<DB::ReadBuffer> build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
{
DB::ReadSettings read_settings = getReadSettings();
const auto & config = context->getConfigRef();
/// Get hdfs_uri
Poco::URI uri(file_info.uri_file());
auto hdfs_file_path = uri.getPath();
std::string new_file_uri = uri.toString();
if (uri.getUserInfo().empty() && BackendInitializerUtil::spark_user.has_value())
{
uri.setUserInfo(*BackendInitializerUtil::spark_user);
new_file_uri = uri.toString();
}
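/// Split the full URI into the scheme + authority part (e.g. hdfs://host:port), kept as hdfs_uri; the path part is already in hdfs_file_path.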
auto begin_of_path = new_file_uri.find('/', new_file_uri.find("//") + 2);
auto hdfs_uri = new_file_uri.substr(0, begin_of_path);
std::optional<size_t> file_size;
std::optional<size_t> modified_time;
if (file_info.has_properties())
{
if (file_info.properties().filesize() > 0)
{
/// filesize may be zero; in that case we should not set file_size
file_size = file_info.properties().filesize();
}
modified_time = file_info.properties().modificationtime();
}
std::unique_ptr<SeekableReadBuffer> read_buffer;
if (!read_settings.enable_filesystem_cache)
{
bool thread_pool_read = read_settings.remote_fs_method == DB::RemoteFSReadMethod::threadpool;
/// The ORC and Parquet readers already implement async prefetch themselves, so they don't rely on AsynchronousReadBufferFromHDFS
bool use_async_prefetch
= read_settings.remote_fs_prefetch && thread_pool_read && (file_info.has_text() || file_info.has_json());
auto raw_read_buffer = std::make_unique<ReadBufferFromHDFS>(
hdfs_uri,
hdfs_file_path,
config,
read_settings,
/* read_until_position */ 0,
/* use_external_buffer */ false,
file_size);
if (use_async_prefetch)
read_buffer = std::make_unique<AsynchronousReadBufferFromHDFS>(
getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, std::move(raw_read_buffer));
else
read_buffer = std::move(raw_read_buffer);
}
else
{
if (!file_size.has_value())
{
// Only for Spark 3.2: the file partition does not contain the file size,
// so we have to compute it here first
auto tmp_read_buffer = std::make_unique<DB::ReadBufferFromHDFS>(
hdfs_uri,
hdfs_file_path,
config,
read_settings,
/* read_until_position */ 0);
file_size = tmp_read_buffer->getFileSize();
}
if (!modified_time.has_value())
modified_time = 0;
ReadBufferCreator read_buffer_creator
= [this, hdfs_uri = hdfs_uri, hdfs_file_path = hdfs_file_path, read_settings, &config](
bool /* restricted_seek */, const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromHDFS>
{
return std::make_unique<DB::ReadBufferFromHDFS>(
hdfs_uri, hdfs_file_path, config, read_settings, 0, true, object.bytes_size);
};
auto remote_path = uri.getPath().substr(1);
DB::StoredObjects stored_objects{DB::StoredObject{remote_path, "", *file_size}};
auto cache_creator = wrapWithCache(read_buffer_creator, read_settings, remote_path, *modified_time, *file_size);
size_t buffer_size = std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
if (*file_size > 0)
buffer_size = std::min(*file_size, buffer_size);
auto cache_hdfs_read = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
std::move(cache_creator), stored_objects, read_settings, /* use_external_buffer */ false, buffer_size);
read_buffer = std::move(cache_hdfs_read);
}
return adjustReadRangeIfNeeded(std::move(read_buffer), file_info);
}
private:
DB::ContextPtr context;
};
#endif
#if USE_AWS_S3
class S3FileReadBufferBuilder : public ReadBufferBuilder
{
friend void registerReadBufferBuilders();
public:
explicit S3FileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_)
{
auto config = S3Config::loadFromContext(context);
// The Gluten cache config takes first priority
if (!file_cache && config.s3_local_cache_enabled)
{
DB::FileCacheSettings file_cache_settings;
file_cache_settings[FileCacheSetting::max_size] = config.s3_local_cache_max_size;
auto cache_base_path = config.s3_local_cache_cache_path;
if (!std::filesystem::exists(cache_base_path))
std::filesystem::create_directories(cache_base_path);
file_cache_settings[FileCacheSetting::path] = cache_base_path;
file_cache = DB::FileCacheFactory::instance().getOrCreate("s3_local_cache", file_cache_settings, "");
file_cache->initialize();
}
}
~S3FileReadBufferBuilder() override = default;
std::unique_ptr<DB::ReadBuffer> build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
{
DB::ReadSettings read_settings = getReadSettings();
Poco::URI file_uri(file_info.uri_file());
// file uri looks like: s3a://my-dev-bucket/tpch100/part/0001.parquet
const std::string & bucket = file_uri.getHost();
const auto client = getClient(bucket);
std::string pathKey = file_uri.getPath().substr(1);
size_t object_size = 0;
size_t object_modified_time = 0;
if (file_info.has_properties())
{
object_size = file_info.properties().filesize();
object_modified_time = file_info.properties().modificationtime();
}
else
{
DB::S3::ObjectInfo object_info = DB::S3::getObjectInfo(*client, bucket, pathKey, "");
object_size = object_info.size;
object_modified_time = object_info.last_modification_time;
}
auto read_buffer_creator = [bucket, client, read_settings, this](
bool restricted_seek, const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
{
return std::make_unique<DB::ReadBufferFromS3>(
client,
bucket,
object.remote_path,
"",
DB::S3::S3RequestSettings(),
read_settings,
/* use_external_buffer */ true,
/* offset */ 0,
/* read_until_position */ 0,
restricted_seek);
};
auto cache_creator = wrapWithCache(read_buffer_creator, read_settings, pathKey, object_modified_time, object_size);
DB::StoredObjects stored_objects{DB::StoredObject{pathKey, "", object_size}};
auto s3_impl = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
std::move(cache_creator), stored_objects, read_settings, /* use_external_buffer */ true, 0);
auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
size_t buffer_size = std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
if (object_size > 0)
buffer_size = std::min(object_size, buffer_size);
auto async_reader = std::make_unique<DB::AsynchronousBoundedReadBuffer>(
std::move(s3_impl), pool_reader, read_settings, buffer_size, read_settings.remote_read_min_bytes_for_seek);
if (read_settings.remote_fs_prefetch)
async_reader->prefetch(Priority{});
return adjustReadRangeIfNeeded(std::move(async_reader), file_info);
}
private:
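/// Cached S3 clients: per-bucket clients are keyed by bucket name, while the shared client is stored under SHARED_CLIENT_KEY.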
static const std::string SHARED_CLIENT_KEY;
static ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> per_bucket_clients;
static std::string toBucketNameSetting(const std::string & bucket_name, const std::string & config_name)
{
if (!config_name.starts_with(BackendInitializerUtil::S3A_PREFIX))
{
// Currently, per-bucket settings only support fs.s3a.xxx
return config_name;
}
// like: fs.s3a.bucket.bucket_name.assumed.role.externalId
return BackendInitializerUtil::S3A_PREFIX + "bucket." + bucket_name + "."
+ config_name.substr(BackendInitializerUtil::S3A_PREFIX.size());
}
static std::string getSetting(
const DB::Settings & settings,
const std::string & bucket_name,
const std::string & config_name,
const std::string & default_value = "",
const bool require_per_bucket = false)
{
std::string ret;
// If there is a bucket-specific config, prefer it over the non-per-bucket config
if (tryGetString(settings, toBucketNameSetting(bucket_name, config_name), ret))
return ret;
if (!require_per_bucket && tryGetString(settings, config_name, ret))
return ret;
return default_value;
}
void cacheClient(const std::string & bucket_name, const bool is_per_bucket, std::shared_ptr<DB::S3::Client> client)
{
if (is_per_bucket)
{
per_bucket_clients.insert(bucket_name, client);
}
else
{
per_bucket_clients.insert(SHARED_CLIENT_KEY, client);
}
}
std::shared_ptr<DB::S3::Client> getClient(const std::string & bucket_name)
{
const auto & config = context->getConfigRef();
const auto & settings = context->getSettingsRef();
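/// Prefer a cached client (per-bucket when a per-bucket assumed role is configured, otherwise the shared one)
/// unless client caching is explicitly disabled; fall through to building a new client below.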
bool use_assumed_role = false;
bool is_per_bucket = false;
if (!getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE).empty())
use_assumed_role = true;
if (!getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE, "", true).empty())
is_per_bucket = true;
if (is_per_bucket && "true" != getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_CLIENT_CACHE_IGNORE))
{
auto client = per_bucket_clients.get(bucket_name);
if (client.has_value())
{
return client.get();
}
}
if (!is_per_bucket && "true" != getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_CLIENT_CACHE_IGNORE))
{
auto client = per_bucket_clients.get(SHARED_CLIENT_KEY);
if (client.has_value())
return client.get();
}
String config_prefix = "s3";
auto endpoint = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ENDPOINT, "https://s3.us-west-2.amazonaws.com");
bool end_point_start_with_http_or_https = endpoint.starts_with("https://") || endpoint.starts_with("http://");
if (!end_point_start_with_http_or_https)
{
if (endpoint.starts_with("s3"))
// Per https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/bk_cloud-data-access/content/s3-config-parameters.html,
// fs.s3a.endpoint may omit the https:// prefix
endpoint = "https://" + endpoint;
else
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "S3 Endpoint format not right: {}", endpoint);
}
String region_name;
const char * amazon_suffix = ".amazonaws.com";
const char * amazon_prefix = "https://s3.";
auto pos = endpoint.find(amazon_suffix);
if (pos != std::string::npos)
{
assert(endpoint.starts_with(amazon_prefix));
region_name = endpoint.substr(strlen(amazon_prefix), pos - strlen(amazon_prefix));
assert(region_name.find('.') == std::string::npos);
}
// For AWS CN, the endpoint looks like https://s3.cn-north-1.amazonaws.com.cn and still works here
DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
region_name,
context->getRemoteHostFilter(),
static_cast<unsigned>(context->getSettingsRef()[DB::Setting::s3_max_redirects]),
static_cast<unsigned>(context->getSettingsRef()[DB::Setting::s3_retry_attempts]),
false,
false,
nullptr,
nullptr);
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
client_configuration.endpointOverride = endpoint;
client_configuration.retryStrategy
= std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".retry_attempts", 10));
std::string ak;
std::string sk;
std::string path_style_access;
bool use_virtual_addressing = false;
tryGetString(settings, BackendInitializerUtil::HADOOP_S3_ACCESS_KEY, ak);
tryGetString(settings, BackendInitializerUtil::HADOOP_S3_SECRET_KEY, sk);
tryGetString(settings, BackendInitializerUtil::HADOOP_S3_PATH_STYLE_ACCESS, path_style_access);
const DB::Settings & global_settings = context->getGlobalContext()->getSettingsRef();
const DB::Settings & local_settings = context->getSettingsRef();
boost::algorithm::to_lower(path_style_access);
// Virtual-hosted-style addressing is used unless path-style access is explicitly enabled
use_virtual_addressing = (path_style_access == "false");
DB::S3::ClientSettings client_settings{
.use_virtual_addressing = use_virtual_addressing,
.disable_checksum = local_settings[DB::Setting::s3_disable_checksum],
.gcs_issue_compose_request = context->getConfigRef().getBool("s3.gcs_issue_compose_request", false),
};
if (use_assumed_role)
{
auto new_client = DB::S3::ClientFactory::instance().create(
client_configuration,
client_settings,
ak, // access_key_id
sk, // secret_access_key
"", // server_side_encryption_customer_key_base64
{}, // sse_kms_config
{}, // headers
DB::S3::CredentialsConfiguration{
.use_environment_credentials = true,
.use_insecure_imds_request = false,
.role_arn = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE),
.session_name = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_SESSION_NAME),
.external_id = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_EXTERNAL_ID)});
// TODO: support online config changes for cached per_bucket_clients
std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
cacheClient(bucket_name, is_per_bucket, ret);
return ret;
}
else
{
auto new_client = DB::S3::ClientFactory::instance().create(
client_configuration,
client_settings,
ak, // access_key_id
sk, // secret_access_key
"", // server_side_encryption_customer_key_base64
{}, // sse_kms_config
{}, // headers
DB::S3::CredentialsConfiguration{.use_environment_credentials = true, .use_insecure_imds_request = false});
std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
cacheClient(bucket_name, is_per_bucket, ret);
return ret;
}
}
};
const std::string S3FileReadBufferBuilder::SHARED_CLIENT_KEY = "___shared-client___";
ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> S3FileReadBufferBuilder::per_bucket_clients(100);
#endif
#if USE_AZURE_BLOB_STORAGE
class AzureBlobReadBuffer : public ReadBufferBuilder
{
public:
explicit AzureBlobReadBuffer(const DB::ContextPtr & context_) : ReadBufferBuilder(context_) { }
~AzureBlobReadBuffer() override = default;
std::unique_ptr<DB::ReadBuffer> build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
{
Poco::URI file_uri(file_info.uri_file());
return std::make_unique<DB::ReadBufferFromAzureBlobStorage>(getClient(), file_uri.getPath(), DB::ReadSettings(), 5, 5);
}
private:
std::shared_ptr<DB::AzureBlobStorage::ContainerClient> shared_client;
std::shared_ptr<DB::AzureBlobStorage::ContainerClient> getClient()
{
if (shared_client)
return shared_client;
const std::string config_prefix = "blob";
const Poco::Util::AbstractConfiguration & config = context->getConfigRef();
bool is_client_for_disk = false;
auto new_settings = DB::AzureBlobStorage::getRequestSettings(config, config_prefix, context);
DB::AzureBlobStorage::ConnectionParams params{
.endpoint = DB::AzureBlobStorage::processEndpoint(config, config_prefix),
.auth_method = DB::AzureBlobStorage::getAuthMethod(config, config_prefix),
.client_options = DB::AzureBlobStorage::getClientOptions(*new_settings, is_client_for_disk),
};
shared_client = DB::AzureBlobStorage::getContainerClient(params, true);
return shared_client;
}
};
#endif
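/// Typical usage sketch (variable names uri_scheme / file_info / context are illustrative):
///   auto builder = ReadBufferBuilderFactory::instance().createBuilder(uri_scheme, context);
///   auto read_buffer = builder->buildWithCompressionWrapper(file_info);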
void registerReadBufferBuilders()
{
auto & factory = ReadBufferBuilderFactory::instance();
factory.registerBuilder("file", [](DB::ContextPtr context_) { return std::make_shared<LocalFileReadBufferBuilder>(context_); });
#if USE_HDFS
factory.registerBuilder("hdfs", [](DB::ContextPtr context_) { return std::make_shared<HDFSFileReadBufferBuilder>(context_); });
#endif
#if USE_AWS_S3
factory.registerBuilder("s3", [](DB::ContextPtr context_) { return std::make_shared<S3FileReadBufferBuilder>(context_); });
factory.registerBuilder("s3a", [](DB::ContextPtr context_) { return std::make_shared<S3FileReadBufferBuilder>(context_); });
factory.registerCleaner([]() { S3FileReadBufferBuilder::per_bucket_clients.clear(); });
#endif
#if USE_AZURE_BLOB_STORAGE
factory.registerBuilder("wasb", [](DB::ContextPtr context_) { return std::make_shared<AzureBlobReadBuffer>(context_); });
factory.registerBuilder("wasbs", [](DB::ContextPtr context_) { return std::make_shared<AzureBlobReadBuffer>(context_); });
#endif
}
DB::ReadSettings ReadBufferBuilder::getReadSettings() const
{
DB::ReadSettings read_settings = context->getReadSettings();
const auto & config = context->getConfigRef();
/// Override enable_filesystem_cache with gluten config
read_settings.enable_filesystem_cache = config.getBool(GlutenCacheConfig::ENABLED, false);
/// Override remote_fs_prefetch with gluten config
read_settings.remote_fs_prefetch = config.getBool("hdfs.enable_async_io", false);
return read_settings;
}
ReadBufferBuilder::ReadBufferBuilder(const DB::ContextPtr & context_) : context(context_)
{
}
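/// Bzip2 is block-oriented and splittable: the split's end is extended past the following bzip2 block delimiters so the
/// block crossing the boundary is decompressed entirely by this split, while SplittableBzip2ReadBuffer itself handles
/// skipping to the first complete block after the start offset.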
std::unique_ptr<DB::ReadBuffer>
ReadBufferBuilder::wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) const
{
/// A bzip2-compressed file is splittable, so we need to adjust the read range for each split
auto * seekable = dynamic_cast<SeekableReadBuffer *>(in.release());
if (!seekable)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "ReadBuffer underlying BZIP2 decompressor must be seekable");
std::unique_ptr<SeekableReadBuffer> seekable_in(seekable);
size_t file_size = getFileSizeFromReadBuffer(*seekable_in);
size_t start = file_info.start();
size_t end = file_info.start() + file_info.length();
/// No need to adjust start because it is already processed inside SplittableBzip2ReadBuffer
size_t new_start = start;
/// Extend end to the end of next block.
size_t new_end = end;
if (end < file_size)
{
Int64 bs_buff = 0;
Int64 bs_live = 0;
/// From end position skip to the second block delimiter.
seekable_in->seek(end, SEEK_SET);
for (size_t i = 0; i < 2; ++i)
{
size_t pos = seekable_in->getPosition();
bool ok = SplittableBzip2ReadBuffer::skipToNextMarker(
SplittableBzip2ReadBuffer::BLOCK_DELIMITER,
SplittableBzip2ReadBuffer::DELIMITER_BIT_LENGTH,
*seekable_in,
bs_buff,
bs_live);
if (seekable_in->eof())
break;
if (!ok)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find next block delimiter after offset: {}", pos);
}
new_end = seekable_in->eof() ? file_size : seekable_in->getPosition() - SplittableBzip2ReadBuffer::DELIMITER_BIT_LENGTH / 8 + 1;
}
LOG_DEBUG(
&Poco::Logger::get("ReadBufferBuilder"),
"File read start and end position adjusted from {},{} to {},{}",
start,
end,
new_start,
new_end);
std::unique_ptr<SeekableReadBuffer> bounded_in;
#if USE_HDFS
if (dynamic_cast<DB::ReadBufferFromHDFS *>(seekable_in.get()) || dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(seekable_in.get())
|| dynamic_cast<DB::ReadBufferFromFile *>(seekable_in.get()))
bounded_in = std::make_unique<BoundedReadBuffer>(std::move(seekable_in));
else
bounded_in = std::move(seekable_in);
#else
if (dynamic_cast<DB::ReadBufferFromFile *>(seekable_in.get()))
bounded_in = std::make_unique<BoundedReadBuffer>(std::move(seekable_in));
else
bounded_in = std::move(seekable_in);
#endif
bounded_in->seek(new_start, SEEK_SET);
bounded_in->setReadUntilPosition(new_end);
bool first_block_need_special_process = (new_start > 0);
bool last_block_need_special_process = (new_end < file_size);
size_t buffer_size = context->getSettingsRef()[DB::Setting::max_read_buffer_size];
auto decompressed_in = std::make_unique<SplittableBzip2ReadBuffer>(
std::move(bounded_in), first_block_need_special_process, last_block_need_special_process, buffer_size);
return std::move(decompressed_in);
}
std::unique_ptr<DB::ReadBuffer>
ReadBufferBuilder::buildWithCompressionWrapper(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
{
auto in = build(file_info);
/// Wrap the read buffer with compression method if exists
Poco::URI file_uri(file_info.uri_file());
DB::CompressionMethod compression = DB::chooseCompressionMethod(file_uri.getPath(), "auto");
if (compression == CompressionMethod::Bzip2)
return wrapWithBzip2(std::move(in), file_info);
else
{
/// In this case the current split must cover the whole file, since bzip2 is the only splittable compression format
auto parallel = wrapWithParallelIfNeeded(std::move(in), file_info);
return wrapReadBufferWithCompressionMethod(std::move(parallel), compression);
}
}
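/// When the Gluten file cache is enabled, wrap the given read buffer creator so reads go through
/// CachedOnDiskReadBufferFromFile backed by the local file cache; otherwise return the creator unchanged.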
ReadBufferBuilder::ReadBufferCreator ReadBufferBuilder::wrapWithCache(
ReadBufferCreator read_buffer_creator,
DB::ReadSettings & read_settings,
const String & key,
size_t last_modified_time,
size_t file_size)
{
const auto & config = context->getConfigRef();
if (!config.getBool(GlutenCacheConfig::ENABLED, false))
return read_buffer_creator;
read_settings.enable_filesystem_cache = true;
if (!file_cache)
{
DB::FileCacheSettings file_cache_settings;
file_cache_settings.loadFromConfig(config, GlutenCacheConfig::PREFIX);
auto & base_path = file_cache_settings[FileCacheSetting::path].value;
if (std::filesystem::path(base_path).is_relative())
base_path = std::filesystem::path(context->getPath()) / "caches" / base_path;
if (!std::filesystem::exists(base_path))
std::filesystem::create_directories(base_path);
const auto name = config.getString(GlutenCacheConfig::PREFIX + ".name");
std::string config_prefix;
file_cache = DB::FileCacheFactory::instance().getOrCreate(name, file_cache_settings, config_prefix);
file_cache->initialize();
}
if (!file_cache->isInitialized())
{
file_cache->throwInitExceptionIfNeeded();
return read_buffer_creator;
}
updateCaches(key, last_modified_time, file_size);
return [read_buffer_creator, read_settings, this](
bool restricted_seek, const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
{
auto cache_key = DB::FileCacheKey::fromPath(object.remote_path);
auto modified_read_settings = read_settings.withNestedBuffer();
auto rbc = [=, this]() { return read_buffer_creator(restricted_seek, object); };
return std::make_unique<DB::CachedOnDiskReadBufferFromFile>(
object.remote_path,
cache_key,
file_cache,
DB::FileCache::getCommonUser(),
rbc,
modified_read_settings,
std::string(DB::CurrentThread::getQueryId()),
object.bytes_size,
/* allow_seeks */ !read_settings.remote_read_buffer_restrict_seek,
/* use_external_buffer */ true,
/* read_until_position */ std::nullopt,
context->getFilesystemCacheLog());
};
}
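/// Record the remote file's modification time and size; if an already-tracked file has changed,
/// its cache entries are refreshed through files_cache_time_map.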
void ReadBufferBuilder::updateCaches(const String & key, size_t last_modified_time, size_t file_size) const
{
if (!file_cache)
return;
auto file_cache_key = DB::FileCacheKey::fromPath(key);
auto last_cache_time = files_cache_time_map.get(file_cache_key);
// Quick check: refresh the cache entry only if the file was modified or its size changed
if (last_cache_time.has_value())
{
auto & [cached_modified_time, cached_file_size] = last_cache_time.value();
if (cached_modified_time < last_modified_time || cached_file_size != file_size)
files_cache_time_map.update_cache_time(file_cache_key, last_modified_time, file_size, file_cache);
}
else
{
// If the process restarts, the cache map will be empty;
// in that case keep using the existing cache instead of renewing it
files_cache_time_map.insert(file_cache_key, last_modified_time, file_size);
}
}
std::unique_ptr<DB::ReadBuffer> ReadBufferBuilder::wrapWithParallelIfNeeded(
std::unique_ptr<DB::ReadBuffer> in, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
{
/// Only use parallel downloading for text and json formats, because data is read serially in those formats.
if (!file_info.has_text() && !file_info.has_json())
return std::move(in);
const auto & settings = context->getSettingsRef();
auto max_download_threads = settings[DB::Setting::max_download_threads];
auto max_download_buffer_size = settings[DB::Setting::max_download_buffer_size];
bool parallel_read = isRemote() && max_download_threads > 1 && isBufferWithFileSize(*in);
if (!parallel_read)
return std::move(in);
size_t file_size = getFileSizeFromReadBuffer(*in);
if (file_size < 4 * max_download_buffer_size)
return std::move(in);
LOG_TRACE(
getLogger("ReadBufferBuilder"),
"Using ParallelReadBuffer with {} workers with chunks of {} bytes",
max_download_threads.value,
max_download_buffer_size.value);
return wrapInParallelReadBufferIfSupported(
{std::move(in)},
DB::threadPoolCallbackRunnerUnsafe<void>(DB::getIOThreadPool().get(), "ParallelRead"),
max_download_threads,
max_download_buffer_size,
file_size);
}
ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance()
{
static ReadBufferBuilderFactory instance;
return instance;
}
void ReadBufferBuilderFactory::registerBuilder(const String & schema, NewBuilder newer)
{
auto it = builders.find(schema);
if (it != builders.end())
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "readbuffer builder for {} has been registered", schema);
builders[schema] = newer;
}
ReadBufferBuilderPtr ReadBufferBuilderFactory::createBuilder(const String & schema, DB::ContextPtr context)
{
auto it = builders.find(schema);
if (it == builders.end())
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Not found read buffer builder for {}", schema);
return it->second(context);
}
void ReadBufferBuilderFactory::registerCleaner(Cleaner cleaner)
{
cleaners.push_back(cleaner);
}
void ReadBufferBuilderFactory::clean()
{
for (const auto & c : cleaners)
{
c();
}
}
}