cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp (86 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "config.h"
#include <Core/Settings.h>
#include <Disks/ObjectStorages/ObjectStorageFactory.h>
#include <Interpreters/Context.h>
#include <Common/GlutenConfig.h>
#include <Common/Macros.h>
#if USE_AWS_S3
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#endif
#if USE_HDFS
#include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
#endif
/// Forward declarations of ClickHouse symbols that are defined in other translation units.
namespace DB
{
namespace Setting
{
/// Read from the context settings when constructing HDFSObjectStorageSettings below.
extern const SettingsUInt64 hdfs_replication;
}
namespace ErrorCodes
{
/// Thrown by the HDFS creator when the configured endpoint does not end with '/'.
extern const int BAD_ARGUMENTS;
/// NOTE(review): not referenced in the visible part of this file - confirm whether it is still needed.
extern const int LOGICAL_ERROR;
}
}
namespace local_engine
{
using namespace DB;
#if USE_AWS_S3
/// Builds the S3::URI for a disk from the `<config_prefix>.endpoint` configuration value.
///
/// The configured string is passed through the context's macro expansion before it is
/// parsed. If the resulting key is non-empty and does not already end with '/', a '/'
/// is appended so the key can be used as a prefix.
static S3::URI getS3URI(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const ContextPtr & context)
{
    const auto macros = context->getMacros();
    String expanded_endpoint = macros->expand(config.getString(config_prefix + ".endpoint"));
    S3::URI parsed(expanded_endpoint);

    /// An empty key is deliberately left empty; only a non-empty key gets the trailing slash.
    const bool needs_trailing_slash = !parsed.key.empty() && !parsed.key.ends_with('/');
    if (needs_trailing_slash)
        parsed.key += '/';

    return parsed;
}
/// Returns the `<config_prefix>.endpoint` configuration value with the context's
/// macros expanded. The string is returned as-is otherwise (no URI parsing).
static std::string
getEndpoint(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const ContextPtr & context)
{
    const auto macros = context->getMacros();
    const std::string endpoint_key = config_prefix + ".endpoint";
    return macros->expand(config.getString(endpoint_key));
}
/// Registers the creator for the Gluten S3 object storage with `factory`, under the
/// type name GlutenObjectStorageConfig::S3_DISK_TYPE.
///
/// The creator derives everything from the disk configuration at `config_prefix`:
/// the S3 URI and the raw endpoint (both from `<config_prefix>.endpoint`), the S3
/// capabilities, the settings and the client. The key generator is obtained from
/// createObjectStorageKeysGeneratorAsIsWithPrefix() using the URI's key.
/// The `skip_access_check` argument is accepted but not used.
void registerGlutenS3ObjectStorage(ObjectStorageFactory & factory)
{
    auto create_s3_storage = [](const std::string & name,
                                const Poco::Util::AbstractConfiguration & config,
                                const std::string & config_prefix,
                                const ContextPtr & context,
                                bool /*skip_access_check*/) -> ObjectStoragePtr
    {
        auto s3_uri = getS3URI(config, config_prefix, context);
        auto capabilities = getCapabilitiesFromConfig(config, config_prefix);
        auto endpoint_url = getEndpoint(config, config_prefix, context);
        auto disk_settings = getSettings(config, config_prefix, context, endpoint_url, /* validate_settings */ true);
        /// The client is built while `disk_settings` is still owned here; ownership moves into the storage below.
        auto s3_client = getClient(endpoint_url, *disk_settings, context, /* for_disk_s3 */ true);
        auto keys_generator = createObjectStorageKeysGeneratorAsIsWithPrefix(s3_uri.key);

        return std::make_shared<S3ObjectStorage>(
            std::move(s3_client), std::move(disk_settings), s3_uri, capabilities, keys_generator, name);
    };

    factory.registerObjectStorageType(GlutenObjectStorageConfig::S3_DISK_TYPE, std::move(create_s3_storage));
}
#endif
#if USE_HDFS
/// Registers the creator for the Gluten HDFS object storage with `factory`, under the
/// type name GlutenObjectStorageConfig::HDFS_DISK_TYPE.
///
/// The creator reads `<config_prefix>.endpoint` (with the context's macros expanded),
/// validates it with checkHDFSURL() and requires it to end with '/'; otherwise it throws
/// an Exception with ErrorCodes::BAD_ARGUMENTS. The storage settings are built from
/// `<config_prefix>.min_bytes_for_seek` (default 1024 * 1024) and the `hdfs_replication`
/// setting of the context. The `name` and `skip_access_check` arguments are not used.
void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
{
    factory.registerObjectStorageType(
        GlutenObjectStorageConfig::HDFS_DISK_TYPE,
        [](const std::string & /* name */,
           const Poco::Util::AbstractConfiguration & config,
           const std::string & config_prefix,
           const ContextPtr & context,
           bool /* skip_access_check */) -> ObjectStoragePtr
        {
            auto uri = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
            checkHDFSURL(uri);

            /// ends_with() is well-defined on an empty string, whereas back() is not, so this
            /// check does not depend on checkHDFSURL() having rejected an empty endpoint.
            if (!uri.ends_with('/'))
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri);

            auto settings = std::make_unique<HDFSObjectStorageSettings>(
                config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024), context->getSettingsRef()[Setting::hdfs_replication]);

            return std::make_shared<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
        });
}
#endif
}