cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
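/// Builders that turn a Substrait FileOrFiles split into a DB::ReadBuffer for the "file", "hdfs",
/// "s3"/"s3a" and "wasb"/"wasbs" URI schemes. Depending on the configuration, the produced buffer is
/// wrapped with a local file cache, asynchronous prefetch, parallel download, compression decoding,
/// and a newline-aligned read range for text/json splits.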

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "config.h"

#include <memory>
#include <shared_mutex>

#include <Core/Settings.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <IO/BoundedReadBuffer.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadSettings.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3Common.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/SharedThreadPools.h>
#include <IO/SplittableBzip2ReadBuffer.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <Interpreters/Context.h>
#include <Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h>
#include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/compute/detail/lru_cache.hpp>
#include <sys/stat.h>
#include <Poco/Logger.h>
#include <Poco/URI.h>
#include <Common/CHUtil.h>
#include <Common/GlutenConfig.h>
#include <Common/GlutenSettings.h>
#include <Common/logger_useful.h>
#include <Common/safe_cast.h>

#if USE_AZURE_BLOB_STORAGE
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h>
#endif

#if USE_AWS_S3
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#endif

#if USE_HDFS
#include <hdfs/hdfs.h>
#endif

namespace DB
{
namespace Setting
{
extern const SettingsUInt64 s3_max_redirects;
extern const SettingsBool s3_disable_checksum;
extern const SettingsUInt64 s3_retry_attempts;
extern const SettingsMaxThreads max_download_threads;
extern const SettingsUInt64 max_download_buffer_size;
extern const SettingsBool input_format_allow_seeks;
extern const SettingsUInt64 max_read_buffer_size;
}
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_OPEN_FILE;
extern const int UNKNOWN_FILE_SIZE;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
}
namespace FileCacheSetting
{
extern const FileCacheSettingsUInt64 max_size;
extern const FileCacheSettingsString path;
}
}

namespace local_engine
{
using namespace DB;

FileCacheConcurrentMap ReadBufferBuilder::files_cache_time_map;
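/// A small LRU cache guarded by a shared_mutex: lookups take a shared lock, inserts and clears take
/// an exclusive lock. It is used below to keep constructed S3 clients, either per bucket or under a
/// shared key.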
template <class key_type, class value_type>
class ConcurrentLRU
{
public:
    explicit ConcurrentLRU(size_t size) : cache(size) { }

    boost::optional<value_type> get(const key_type & key)
    {
        std::shared_lock<std::shared_mutex> lock(rwLock);
        return cache.get(key);
    }

    void insert(const key_type & key, const value_type & value)
    {
        std::unique_lock<std::shared_mutex> lock(rwLock);
        cache.insert(key, value);
    }

    void clear()
    {
        std::unique_lock<std::shared_mutex> lock(rwLock);
        cache.clear();
    }

private:
    boost::compute::detail::lru_cache<key_type, value_type> cache;
    std::shared_mutex rwLock;
};

static std::pair<size_t, size_t> getAdjustedReadRange(SeekableReadBuffer & buffer, const std::pair<size_t, size_t> & start_end)
{
    auto get_next_line_pos = [&](SeekableReadBuffer & buf) -> size_t
    {
        while (!buf.eof())
        {
            /// Search for \n or \r\n or \n\r or \r in buffer.
            if (*buf.position() == '\r')
            {
                ++buf.position();
                if (!buf.eof() && *buf.position() == '\n')
                {
                    ++buf.position();
                }
                return buf.getPosition();
            }
            else if (*buf.position() == '\n')
            {
                ++buf.position();
                if (!buf.eof() && *buf.position() == '\r')
                {
                    ++buf.position();
                }
                return buf.getPosition();
            }
            ++buf.position();
        }
        return buf.getPosition();
    };

    size_t read_start_pos = start_end.first;
    size_t read_end_pos = start_end.second;
    std::pair<size_t, size_t> result;

    if (read_start_pos == 0)
        result.first = read_start_pos;
    else
    {
        buffer.seek(read_start_pos, SEEK_SET);
        result.first = get_next_line_pos(buffer);
    }

    if (read_end_pos == 0)
        result.second = read_end_pos;
    else
    {
        buffer.seek(read_end_pos, SEEK_SET);
        result.second = get_next_line_pos(buffer);
    }
    return result;
}
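/// Why the range is adjusted: a Spark split only specifies a byte range [start, start + length), which
/// usually cuts a row in half at both ends. For uncompressed text/json, both the start and the end are
/// therefore moved forward to the position just past the next newline. For example, a split [100, 200)
/// whose next newlines end at bytes 112 and 230 is read as [112, 230): this split finishes the row that
/// crosses byte 200 while the next split begins at 230, so every row is read by exactly one split.
/// A start of 0 is kept as-is so the first row of the file is not skipped.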
static std::unique_ptr<SeekableReadBuffer>
adjustReadRangeIfNeeded(std::unique_ptr<SeekableReadBuffer> read_buffer, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
{
    /// Skip formats in which rows are not separated by newline characters.
    if (!(file_info.has_text() || file_info.has_json()))
        return std::move(read_buffer);

    /// Skip text/json files with compression.
    /// When the file is compressed, its read range is adjusted in [[buildWithCompressionWrapper]]
    Poco::URI file_uri(file_info.uri_file());
    DB::CompressionMethod compression = DB::chooseCompressionMethod(file_uri.getPath(), "auto");
    if (compression != CompressionMethod::None)
        return std::move(read_buffer);

    std::pair<size_t, size_t> start_end{file_info.start(), file_info.start() + file_info.length()};
    start_end = getAdjustedReadRange(*read_buffer, start_end);

    LOG_DEBUG(
        &Poco::Logger::get("ReadBufferBuilder"),
        "File read start and end position adjusted from {},{} to {},{}",
        file_info.start(),
        file_info.start() + file_info.length(),
        start_end.first,
        start_end.second);

#if USE_HDFS
    /// If the read buffer doesn't support right-bounded reads, wrap it with BoundedReadBuffer to enable them.
    if (dynamic_cast<DB::ReadBufferFromHDFS *>(read_buffer.get()) || dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(read_buffer.get())
        || dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
        read_buffer = std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
#else
    if (dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
        read_buffer = std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
#endif

    read_buffer->seek(start_end.first, SEEK_SET);
    read_buffer->setReadUntilPosition(start_end.second);
    return std::move(read_buffer);
}

class LocalFileReadBufferBuilder : public ReadBufferBuilder
{
public:
    explicit LocalFileReadBufferBuilder(const DB::ContextPtr & context_) : ReadBufferBuilder(context_) { }
    ~LocalFileReadBufferBuilder() override = default;

    bool isRemote() const override { return false; }

    std::unique_ptr<DB::ReadBuffer> build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
    {
        Poco::URI file_uri(file_info.uri_file());
        std::unique_ptr<DB::ReadBufferFromFileBase> read_buffer;
        const String & file_path = file_uri.getPath();

        struct stat file_stat;
        if (stat(file_path.c_str(), &file_stat))
            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "file stat failed for {}", file_path);

        if (S_ISREG(file_stat.st_mode))
            read_buffer = std::make_unique<DB::ReadBufferFromFilePRead>(file_path);
        else
            read_buffer = std::make_unique<DB::ReadBufferFromFile>(file_path);

        return adjustReadRangeIfNeeded(std::move(read_buffer), file_info);
    }
};
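/// The HDFS builder has two paths: without the local file cache it reads straight from HDFS and, for
/// text/json splits with threadpool reads and prefetch enabled, wraps the buffer in
/// AsynchronousReadBufferFromHDFS; with the cache enabled it goes through wrapWithCache() and
/// ReadBufferFromRemoteFSGather so that ranges can be served from the on-disk cache.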
#if USE_HDFS
class HDFSFileReadBufferBuilder : public ReadBufferBuilder
{
    using ReadBufferCreator = std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool restricted_seek, const DB::StoredObject & object)>;

public:
    explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_), context(context_) { }
    ~HDFSFileReadBufferBuilder() override = default;

    std::unique_ptr<DB::ReadBuffer> build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
    {
        DB::ReadSettings read_settings = getReadSettings();
        const auto & config = context->getConfigRef();

        /// Get hdfs_uri
        Poco::URI uri(file_info.uri_file());
        auto hdfs_file_path = uri.getPath();
        std::string new_file_uri = uri.toString();
        if (uri.getUserInfo().empty() && BackendInitializerUtil::spark_user.has_value())
        {
            uri.setUserInfo(*BackendInitializerUtil::spark_user);
            new_file_uri = uri.toString();
        }

        auto begin_of_path = new_file_uri.find('/', new_file_uri.find("//") + 2);
        auto hdfs_uri = new_file_uri.substr(0, begin_of_path);

        std::optional<size_t> file_size;
        std::optional<size_t> modified_time;
        if (file_info.has_properties())
        {
            if (file_info.properties().filesize() > 0)
            {
                /// filesize may be zero, under such condition we should not set file_size
                file_size = file_info.properties().filesize();
            }
            modified_time = file_info.properties().modificationtime();
        }

        std::unique_ptr<SeekableReadBuffer> read_buffer;
        if (!read_settings.enable_filesystem_cache)
        {
            bool thread_pool_read = read_settings.remote_fs_method == DB::RemoteFSReadMethod::threadpool;
            /// ORC and Parquet readers already implement async prefetch themselves; they don't rely on AsynchronousReadBufferFromHDFS.
            bool use_async_prefetch = read_settings.remote_fs_prefetch && thread_pool_read && (file_info.has_text() || file_info.has_json());

            auto raw_read_buffer = std::make_unique<ReadBufferFromHDFS>(
                hdfs_uri,
                hdfs_file_path,
                config,
                read_settings,
                /* read_until_position */ 0,
                /* use_external_buffer */ false,
                file_size);

            if (use_async_prefetch)
                read_buffer = std::make_unique<AsynchronousReadBufferFromHDFS>(
                    getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, std::move(raw_read_buffer));
            else
                read_buffer = std::move(raw_read_buffer);
        }
        else
        {
            if (!file_size.has_value())
            {
                // Only for Spark 3.2: the file partition does not contain the file size,
                // so compute it here first.
                auto tmp_read_buffer = std::make_unique<DB::ReadBufferFromHDFS>(
                    hdfs_uri, hdfs_file_path, config, read_settings, /* read_until_position */ 0);
                file_size = tmp_read_buffer->getFileSize();
            }
            if (!modified_time.has_value())
                modified_time = 0;

            ReadBufferCreator read_buffer_creator
                = [this, hdfs_uri = hdfs_uri, hdfs_file_path = hdfs_file_path, read_settings, &config](
                      bool /* restricted_seek */, const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromHDFS>
            {
                return std::make_unique<DB::ReadBufferFromHDFS>(
                    hdfs_uri, hdfs_file_path, config, read_settings, 0, true, object.bytes_size);
            };

            auto remote_path = uri.getPath().substr(1);
            DB::StoredObjects stored_objects{DB::StoredObject{remote_path, "", *file_size}};
            auto cache_creator = wrapWithCache(read_buffer_creator, read_settings, remote_path, *modified_time, *file_size);

            size_t buffer_size = std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
            if (*file_size > 0)
                buffer_size = std::min(*file_size, buffer_size);

            auto cache_hdfs_read = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
                std::move(cache_creator), stored_objects, read_settings, /* use_external_buffer */ false, buffer_size);
            read_buffer = std::move(cache_hdfs_read);
        }

        return adjustReadRangeIfNeeded(std::move(read_buffer), file_info);
    }

private:
    DB::ContextPtr context;
};
#endif
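/// S3 settings are resolved Hadoop-style: getSetting() first looks for a per-bucket key of the form
/// "fs.s3a.bucket.<bucket-name>.<suffix>" and then falls back to the global "fs.s3a.<suffix>" key.
/// Constructed clients are stored in a small LRU, under the bucket name when a per-bucket assumed
/// role is configured and under a shared key otherwise, and are reused unless client caching is
/// explicitly ignored.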
#if USE_AWS_S3
class S3FileReadBufferBuilder : public ReadBufferBuilder
{
    friend void registerReadBufferBuilders();

public:
    explicit S3FileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_)
    {
        auto config = S3Config::loadFromContext(context);
        // The gluten cache config takes first priority
        if (!file_cache && config.s3_local_cache_enabled)
        {
            DB::FileCacheSettings file_cache_settings;
            file_cache_settings[FileCacheSetting::max_size] = config.s3_local_cache_max_size;
            auto cache_base_path = config.s3_local_cache_cache_path;
            if (!std::filesystem::exists(cache_base_path))
                std::filesystem::create_directories(cache_base_path);
            file_cache_settings[FileCacheSetting::path] = cache_base_path;
            file_cache = DB::FileCacheFactory::instance().getOrCreate("s3_local_cache", file_cache_settings, "");
            file_cache->initialize();
        }
    }

    ~S3FileReadBufferBuilder() override = default;

    std::unique_ptr<DB::ReadBuffer> build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
    {
        DB::ReadSettings read_settings = getReadSettings();
        Poco::URI file_uri(file_info.uri_file());
        // file uri looks like: s3a://my-dev-bucket/tpch100/part/0001.parquet
        const std::string & bucket = file_uri.getHost();
        const auto client = getClient(bucket);
        std::string pathKey = file_uri.getPath().substr(1);

        size_t object_size = 0;
        size_t object_modified_time = 0;
        if (file_info.has_properties())
        {
            object_size = file_info.properties().filesize();
            object_modified_time = file_info.properties().modificationtime();
        }
        else
        {
            DB::S3::ObjectInfo object_info = DB::S3::getObjectInfo(*client, bucket, pathKey, "");
            object_size = object_info.size;
            object_modified_time = object_info.last_modification_time;
        }

        auto read_buffer_creator
            = [bucket, client, read_settings, this](bool restricted_seek, const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
        {
            return std::make_unique<DB::ReadBufferFromS3>(
                client,
                bucket,
                object.remote_path,
                "",
                DB::S3::S3RequestSettings(),
                read_settings,
                /* use_external_buffer */ true,
                /* offset */ 0,
                /* read_until_position */ 0,
                restricted_seek);
        };

        auto cache_creator = wrapWithCache(read_buffer_creator, read_settings, pathKey, object_modified_time, object_size);

        DB::StoredObjects stored_objects{DB::StoredObject{pathKey, "", object_size}};
        auto s3_impl = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
            std::move(cache_creator), stored_objects, read_settings, /* use_external_buffer */ true, 0);

        auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);

        size_t buffer_size = std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
        if (object_size > 0)
            buffer_size = std::min(object_size, buffer_size);

        auto async_reader = std::make_unique<DB::AsynchronousBoundedReadBuffer>(
            std::move(s3_impl), pool_reader, read_settings, buffer_size, read_settings.remote_read_min_bytes_for_seek);

        if (read_settings.remote_fs_prefetch)
            async_reader->prefetch(Priority{});

        return adjustReadRangeIfNeeded(std::move(async_reader), file_info);
    }

private:
    static const std::string SHARED_CLIENT_KEY;
    static ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> per_bucket_clients;

    static std::string toBucketNameSetting(const std::string & bucket_name, const std::string & config_name)
    {
        if (!config_name.starts_with(BackendInitializerUtil::S3A_PREFIX))
        {
            // Currently per-bucket settings only support fs.s3a.xxx
            return config_name;
        }
        // like: fs.s3a.bucket.bucket_name.assumed.role.externalId
        return BackendInitializerUtil::S3A_PREFIX + "bucket." + bucket_name + "."
            + config_name.substr(BackendInitializerUtil::S3A_PREFIX.size());
    }

    static std::string getSetting(
        const DB::Settings & settings,
        const std::string & bucket_name,
        const std::string & config_name,
        const std::string & default_value = "",
        const bool require_per_bucket = false)
    {
        std::string ret;
        // If there's a bucket-specific config, prefer it to the non-per-bucket config
        if (tryGetString(settings, toBucketNameSetting(bucket_name, config_name), ret))
            return ret;

        if (!require_per_bucket && tryGetString(settings, config_name, ret))
            return ret;

        return default_value;
    }

    void cacheClient(const std::string & bucket_name, const bool is_per_bucket, std::shared_ptr<DB::S3::Client> client)
    {
        if (is_per_bucket)
        {
            per_bucket_clients.insert(bucket_name, client);
        }
        else
        {
            per_bucket_clients.insert(SHARED_CLIENT_KEY, client);
        }
    }

    std::shared_ptr<DB::S3::Client> getClient(const std::string & bucket_name)
    {
        const auto & config = context->getConfigRef();
        const auto & settings = context->getSettingsRef();

        bool use_assumed_role = false;
        bool is_per_bucket = false;

        if (!getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE).empty())
            use_assumed_role = true;

        if (!getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE, "", true).empty())
            is_per_bucket = true;

        if (is_per_bucket && "true" != getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_CLIENT_CACHE_IGNORE))
        {
            auto client = per_bucket_clients.get(bucket_name);
            if (client.has_value())
            {
                return client.get();
            }
        }

        if (!is_per_bucket && "true" != getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_CLIENT_CACHE_IGNORE))
        {
            auto client = per_bucket_clients.get(SHARED_CLIENT_KEY);
            if (client.has_value())
                return client.get();
        }

        String config_prefix = "s3";
        auto endpoint = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ENDPOINT, "https://s3.us-west-2.amazonaws.com");
        bool end_point_start_with_http_or_https = endpoint.starts_with("https://") || endpoint.starts_with("http://");
        if (!end_point_start_with_http_or_https)
        {
            if (endpoint.starts_with("s3"))
                // as https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/bk_cloud-data-access/content/s3-config-parameters.html
                // the fs.s3a.endpoint does not contain the https:// prefix
                endpoint = "https://" + endpoint;
            else
                throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "S3 Endpoint format not right: {}", endpoint);
        }

        String region_name;
        const char * amazon_suffix = ".amazonaws.com";
        const char * amazon_prefix = "https://s3.";
        auto pos = endpoint.find(amazon_suffix);
        if (pos != std::string::npos)
        {
            assert(endpoint.starts_with(amazon_prefix));
            region_name = endpoint.substr(strlen(amazon_prefix), pos - strlen(amazon_prefix));
            assert(region_name.find('.') == std::string::npos);
        }
        // for AWS CN, the endpoint is like: https://s3.cn-north-1.amazonaws.com.cn, can still work

        DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
            region_name,
            context->getRemoteHostFilter(),
            static_cast<unsigned>(context->getSettingsRef()[DB::Setting::s3_max_redirects]),
            static_cast<unsigned>(context->getSettingsRef()[DB::Setting::s3_retry_attempts]),
            false,
            false,
            nullptr,
            nullptr);

        client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
        client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
        client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
        client_configuration.endpointOverride = endpoint;
        client_configuration.retryStrategy
            = std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".retry_attempts", 10));

        std::string ak;
        std::string sk;
        std::string path_style_access;
        bool addressing_type = false;

        tryGetString(settings, BackendInitializerUtil::HADOOP_S3_ACCESS_KEY, ak);
        tryGetString(settings, BackendInitializerUtil::HADOOP_S3_SECRET_KEY, sk);
        tryGetString(settings, BackendInitializerUtil::HADOOP_S3_PATH_STYLE_ACCESS, path_style_access);
        const DB::Settings & global_settings = context->getGlobalContext()->getSettingsRef();
        const DB::Settings & local_settings = context->getSettingsRef();

        boost::algorithm::to_lower(path_style_access);
        addressing_type = (path_style_access == "false");

        DB::S3::ClientSettings client_settings{
            .use_virtual_addressing = addressing_type,
            .disable_checksum = local_settings[DB::Setting::s3_disable_checksum],
            .gcs_issue_compose_request = context->getConfigRef().getBool("s3.gcs_issue_compose_request", false),
        };

        if (use_assumed_role)
        {
            auto new_client = DB::S3::ClientFactory::instance().create(
                client_configuration,
                client_settings,
                ak, // access_key_id
                sk, // secret_access_key
                "", // server_side_encryption_customer_key_base64
                {}, // sse_kms_config
                {}, // headers
                DB::S3::CredentialsConfiguration{
                    .use_environment_credentials = true,
                    .use_insecure_imds_request = false,
                    .role_arn = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE),
                    .session_name = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_SESSION_NAME),
                    .external_id = getSetting(settings, bucket_name, BackendInitializerUtil::HADOOP_S3_ASSUMED_EXTERNAL_ID)});

            //TODO: support online change config for cached per_bucket_clients
            std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
            cacheClient(bucket_name, is_per_bucket, ret);
            return ret;
        }
        else
        {
            auto new_client = DB::S3::ClientFactory::instance().create(
                client_configuration,
                client_settings,
                ak, // access_key_id
                sk, // secret_access_key
                "", // server_side_encryption_customer_key_base64
                {}, // sse_kms_config
                {}, // headers
                DB::S3::CredentialsConfiguration{.use_environment_credentials = true, .use_insecure_imds_request = false});

            std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
            cacheClient(bucket_name, is_per_bucket, ret);
            return ret;
        }
    }
};

const std::string S3FileReadBufferBuilder::SHARED_CLIENT_KEY = "___shared-client___";
ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> S3FileReadBufferBuilder::per_bucket_clients(100);
#endif
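/// The Azure builder lazily creates one shared container client per builder instance and reuses it
/// for all files; connection parameters are read from the "blob" prefix of the configuration.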
#if USE_AZURE_BLOB_STORAGE
class AzureBlobReadBuffer : public ReadBufferBuilder
{
public:
    explicit AzureBlobReadBuffer(const DB::ContextPtr & context_) : ReadBufferBuilder(context_) { }
    ~AzureBlobReadBuffer() override = default;

    std::unique_ptr<DB::ReadBuffer> build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
    {
        Poco::URI file_uri(file_info.uri_file());
        return std::make_unique<DB::ReadBufferFromAzureBlobStorage>(getClient(), file_uri.getPath(), DB::ReadSettings(), 5, 5);
    }

private:
    std::shared_ptr<DB::AzureBlobStorage::ContainerClient> shared_client;

    std::shared_ptr<DB::AzureBlobStorage::ContainerClient> getClient()
    {
        if (shared_client)
            return shared_client;

        const std::string config_prefix = "blob";
        const Poco::Util::AbstractConfiguration & config = context->getConfigRef();
        bool is_client_for_disk = false;
        auto new_settings = DB::AzureBlobStorage::getRequestSettings(config, config_prefix, context);
        DB::AzureBlobStorage::ConnectionParams params{
            .endpoint = DB::AzureBlobStorage::processEndpoint(config, config_prefix),
            .auth_method = DB::AzureBlobStorage::getAuthMethod(config, config_prefix),
            .client_options = DB::AzureBlobStorage::getClientOptions(*new_settings, is_client_for_disk),
        };
        shared_client = DB::AzureBlobStorage::getContainerClient(params, true);
        return shared_client;
    }
};
#endif

void registerReadBufferBuilders()
{
    auto & factory = ReadBufferBuilderFactory::instance();
    factory.registerBuilder("file", [](DB::ContextPtr context_) { return std::make_shared<LocalFileReadBufferBuilder>(context_); });

#if USE_HDFS
    factory.registerBuilder("hdfs", [](DB::ContextPtr context_) { return std::make_shared<HDFSFileReadBufferBuilder>(context_); });
#endif

#if USE_AWS_S3
    factory.registerBuilder("s3", [](DB::ContextPtr context_) { return std::make_shared<S3FileReadBufferBuilder>(context_); });
    factory.registerBuilder("s3a", [](DB::ContextPtr context_) { return std::make_shared<S3FileReadBufferBuilder>(context_); });
    factory.registerCleaner([]() { S3FileReadBufferBuilder::per_bucket_clients.clear(); });
#endif

#if USE_AZURE_BLOB_STORAGE
    factory.registerBuilder("wasb", [](DB::ContextPtr context_) { return std::make_shared<AzureBlobReadBuffer>(context_); });
    factory.registerBuilder("wasbs", [](DB::ContextPtr context_) { return std::make_shared<AzureBlobReadBuffer>(context_); });
#endif
}

DB::ReadSettings ReadBufferBuilder::getReadSettings() const
{
    DB::ReadSettings read_settings = context->getReadSettings();
    const auto & config = context->getConfigRef();

    /// Override enable_filesystem_cache with gluten config
    read_settings.enable_filesystem_cache = config.getBool(GlutenCacheConfig::ENABLED, false);

    /// Override remote_fs_prefetch with gluten config
    read_settings.remote_fs_prefetch = config.getBool("hdfs.enable_async_io", false);

    return read_settings;
}

ReadBufferBuilder::ReadBufferBuilder(const DB::ContextPtr & context_) : context(context_)
{
}
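/// Bzip2 is the only splittable compression handled here. wrapWithBzip2() keeps the split start as-is,
/// since SplittableBzip2ReadBuffer itself skips forward to the first block delimiter, and extends the
/// split end to the second block delimiter found after the nominal end (or to EOF). The bounded range
/// is then fed to SplittableBzip2ReadBuffer with flags telling it whether the first and last blocks
/// need special handling.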
std::unique_ptr<DB::ReadBuffer>
ReadBufferBuilder::wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) const
{
    /// A bzip2-compressed file is splittable, so we need to adjust the read range for each split
    auto * seekable = dynamic_cast<SeekableReadBuffer *>(in.release());
    if (!seekable)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "ReadBuffer underlying BZIP2 decompressor must be seekable");
    std::unique_ptr<SeekableReadBuffer> seekable_in(seekable);

    size_t file_size = getFileSizeFromReadBuffer(*seekable_in);
    size_t start = file_info.start();
    size_t end = file_info.start() + file_info.length();

    /// No need to adjust start because it is already processed inside SplittableBzip2ReadBuffer
    size_t new_start = start;

    /// Extend end to the end of the next block.
    size_t new_end = end;
    if (end < file_size)
    {
        Int64 bs_buff = 0;
        Int64 bs_live = 0;

        /// From the end position, skip to the second block delimiter.
        seekable_in->seek(end, SEEK_SET);
        for (size_t i = 0; i < 2; ++i)
        {
            size_t pos = seekable_in->getPosition();
            bool ok = SplittableBzip2ReadBuffer::skipToNextMarker(
                SplittableBzip2ReadBuffer::BLOCK_DELIMITER, SplittableBzip2ReadBuffer::DELIMITER_BIT_LENGTH, *seekable_in, bs_buff, bs_live);
            if (seekable_in->eof())
                break;
            if (!ok)
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find next block delimiter after offset: {}", pos);
        }

        new_end = seekable->eof() ? file_size : seekable_in->getPosition() - SplittableBzip2ReadBuffer::DELIMITER_BIT_LENGTH / 8 + 1;
    }

    LOG_DEBUG(
        &Poco::Logger::get("ReadBufferBuilder"),
        "File read start and end position adjusted from {},{} to {},{}",
        start,
        end,
        new_start,
        new_end);

    std::unique_ptr<SeekableReadBuffer> bounded_in;
#if USE_HDFS
    if (dynamic_cast<DB::ReadBufferFromHDFS *>(seekable_in.get()) || dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(seekable_in.get())
        || dynamic_cast<DB::ReadBufferFromFile *>(seekable_in.get()))
        bounded_in = std::make_unique<BoundedReadBuffer>(std::move(seekable_in));
    else
        bounded_in = std::move(seekable_in);
#else
    if (dynamic_cast<DB::ReadBufferFromFile *>(seekable_in.get()))
        bounded_in = std::make_unique<BoundedReadBuffer>(std::move(seekable_in));
    else
        bounded_in = std::move(seekable_in);
#endif

    bounded_in->seek(new_start, SEEK_SET);
    bounded_in->setReadUntilPosition(new_end);

    bool first_block_need_special_process = (new_start > 0);
    bool last_block_need_special_process = (new_end < file_size);
    size_t buffer_size = context->getSettingsRef()[DB::Setting::max_read_buffer_size];
    auto decompressed_in = std::make_unique<SplittableBzip2ReadBuffer>(
        std::move(bounded_in), first_block_need_special_process, last_block_need_special_process, buffer_size);
    return std::move(decompressed_in);
}

std::unique_ptr<DB::ReadBuffer> ReadBufferBuilder::buildWithCompressionWrapper(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
{
    auto in = build(file_info);

    /// Wrap the read buffer with the compression method, if any
    Poco::URI file_uri(file_info.uri_file());
    DB::CompressionMethod compression = DB::chooseCompressionMethod(file_uri.getPath(), "auto");

    if (compression == CompressionMethod::Bzip2)
        return wrapWithBzip2(std::move(in), file_info);
    else
    {
        /// In this case we are pretty sure that the current split covers the whole file, because only bzip2 compression is splittable
        auto parallel = wrapWithParallelIfNeeded(std::move(in), file_info);
        return wrapReadBufferWithCompressionMethod(std::move(parallel), compression);
    }
}
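/// wrapWithCache() is shared by the HDFS and S3 builders: when the gluten file cache is enabled, it
/// turns the plain remote-read creator into one that builds a CachedOnDiskReadBufferFromFile keyed by
/// the remote path, so repeated scans of the same object can be served from local disk. updateCaches()
/// tracks (modification time, size) per file and updates the entry through FileCacheConcurrentMap when
/// either value changes, so the cached data can be refreshed when the file changes.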
ReadBufferBuilder::ReadBufferCreator ReadBufferBuilder::wrapWithCache(
    ReadBufferCreator read_buffer_creator,
    DB::ReadSettings & read_settings,
    const String & key,
    size_t last_modified_time,
    size_t file_size)
{
    const auto & config = context->getConfigRef();
    if (!config.getBool(GlutenCacheConfig::ENABLED, false))
        return read_buffer_creator;

    read_settings.enable_filesystem_cache = true;
    if (!file_cache)
    {
        DB::FileCacheSettings file_cache_settings;
        file_cache_settings.loadFromConfig(config, GlutenCacheConfig::PREFIX);

        auto & base_path = file_cache_settings[FileCacheSetting::path].value;
        if (std::filesystem::path(base_path).is_relative())
            base_path = std::filesystem::path(context->getPath()) / "caches" / base_path;

        if (!std::filesystem::exists(base_path))
            std::filesystem::create_directories(base_path);

        const auto name = config.getString(GlutenCacheConfig::PREFIX + ".name");
        std::string config_prefix;
        file_cache = DB::FileCacheFactory::instance().getOrCreate(name, file_cache_settings, config_prefix);
        file_cache->initialize();
    }

    if (!file_cache->isInitialized())
    {
        file_cache->throwInitExceptionIfNeeded();
        return read_buffer_creator;
    }

    updateCaches(key, last_modified_time, file_size);

    return [read_buffer_creator, read_settings, this](
               bool restricted_seek, const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
    {
        auto cache_key = DB::FileCacheKey::fromPath(object.remote_path);
        auto modified_read_settings = read_settings.withNestedBuffer();
        auto rbc = [=, this]() { return read_buffer_creator(restricted_seek, object); };

        return std::make_unique<DB::CachedOnDiskReadBufferFromFile>(
            object.remote_path,
            cache_key,
            file_cache,
            DB::FileCache::getCommonUser(),
            rbc,
            modified_read_settings,
            std::string(DB::CurrentThread::getQueryId()),
            object.bytes_size,
            /* allow_seeks */ !read_settings.remote_read_buffer_restrict_seek,
            /* use_external_buffer */ true,
            /* read_until_position */ std::nullopt,
            context->getFilesystemCacheLog());
    };
}

void ReadBufferBuilder::updateCaches(const String & key, size_t last_modified_time, size_t file_size) const
{
    if (!file_cache)
        return;

    auto file_cache_key = DB::FileCacheKey::fromPath(key);
    auto last_cache_time = files_cache_time_map.get(file_cache_key);
    // quick check
    if (last_cache_time != std::nullopt && last_cache_time.has_value())
    {
        auto & [cached_modified_time, cached_file_size] = last_cache_time.value();
        if (cached_modified_time < last_modified_time || cached_file_size != file_size)
            files_cache_time_map.update_cache_time(file_cache_key, last_modified_time, file_size, file_cache);
    }
    else
    {
        // If the process restarts, the cache map will be empty;
        // we recommend continuing to use the cache instead of renewing it.
        files_cache_time_map.insert(file_cache_key, last_modified_time, file_size);
    }
}

std::unique_ptr<DB::ReadBuffer> ReadBufferBuilder::wrapWithParallelIfNeeded(
    std::unique_ptr<DB::ReadBuffer> in, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
{
    /// Only use parallel downloading for text and json formats, because data are read serially in those formats.
    if (!file_info.has_text() && !file_info.has_json())
        return std::move(in);

    const auto & settings = context->getSettingsRef();
    auto max_download_threads = settings[DB::Setting::max_download_threads];
    auto max_download_buffer_size = settings[DB::Setting::max_download_buffer_size];

    bool parallel_read = isRemote() && max_download_threads > 1 && isBufferWithFileSize(*in);
    if (!parallel_read)
        return std::move(in);

    size_t file_size = getFileSizeFromReadBuffer(*in);
    if (file_size < 4 * max_download_buffer_size)
        return std::move(in);

    LOG_TRACE(
        getLogger("ReadBufferBuilder"),
        "Using ParallelReadBuffer with {} workers with chunks of {} bytes",
        max_download_threads.value,
        max_download_buffer_size.value);

    return wrapInParallelReadBufferIfSupported(
        {std::move(in)},
        DB::threadPoolCallbackRunnerUnsafe<void>(DB::getIOThreadPool().get(), "ParallelRead"),
        max_download_threads,
        max_download_buffer_size,
        file_size);
}

ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance()
{
    static ReadBufferBuilderFactory instance;
    return instance;
}

void ReadBufferBuilderFactory::registerBuilder(const String & schema, NewBuilder newer)
{
    auto it = builders.find(schema);
    if (it != builders.end())
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "readbuffer builder for {} has been registered", schema);
    builders[schema] = newer;
}

ReadBufferBuilderPtr ReadBufferBuilderFactory::createBuilder(const String & schema, DB::ContextPtr context)
{
    auto it = builders.find(schema);
    if (it == builders.end())
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Not found read buffer builder for {}", schema);
    return it->second(context);
}

void ReadBufferBuilderFactory::registerCleaner(Cleaner cleaner)
{
    cleaners.push_back(cleaner);
}

void ReadBufferBuilderFactory::clean()
{
    for (auto c : cleaners)
    {
        c();
    }
}
}