cpp-ch/local-engine/Common/CHUtil.cpp (883 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "CHUtil.h"
#include <filesystem>
#include <memory>
#include <optional>
#include <unistd.h>
#include <AggregateFunctions/Combinators/AggregateFunctionCombinatorFactory.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnTuple.h>
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Defines.h>
#include <Core/NamesAndTypes.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/NestedUtils.h>
#include <Disks/registerDisks.h>
#include <Disks/registerGlutenDisks.h>
#include <Formats/registerFormats.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/registerFunctions.h>
#include <IO/SharedThreadPools.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Parser/RelParsers/RelParser.h>
#include <Planner/PlannerActionsVisitor.h>
#include <Processors/Chunk.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/printPipeline.h>
#include <Storages/Cache/CacheManager.h>
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
#include <Storages/Output/WriteBufferBuilder.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <arrow/util/compression.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <sys/resource.h>
#include <Poco/Logger.h>
#include <Poco/Util/MapConfiguration.h>
#include <Common/BitHelpers.h>
#include <Common/BlockTypeUtils.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/GlutenSignalHandler.h>
#include <Common/LoggerExtend.h>
#include <Common/QueryContext.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
namespace DB
{
namespace ServerSetting
{
extern const ServerSettingsString primary_index_cache_policy;
extern const ServerSettingsUInt64 primary_index_cache_size;
extern const ServerSettingsDouble primary_index_cache_size_ratio;
extern const ServerSettingsUInt64 max_prefixes_deserialization_thread_pool_size;
extern const ServerSettingsUInt64 max_prefixes_deserialization_thread_pool_free_size;
extern const ServerSettingsUInt64 prefixes_deserialization_thread_pool_thread_pool_queue_size;
extern const ServerSettingsUInt64 max_thread_pool_size;
extern const ServerSettingsUInt64 thread_pool_queue_size;
extern const ServerSettingsUInt64 max_io_thread_pool_size;
extern const ServerSettingsUInt64 io_thread_pool_queue_size;
extern const ServerSettingsString vector_similarity_index_cache_policy;
extern const ServerSettingsUInt64 vector_similarity_index_cache_size;
extern const ServerSettingsUInt64 vector_similarity_index_cache_max_entries;
extern const ServerSettingsDouble vector_similarity_index_cache_size_ratio;
}
namespace Setting
{
extern const SettingsMaxThreads max_threads;
extern const SettingsUInt64 prefer_external_sort_block_bytes;
extern const SettingsUInt64 max_bytes_before_external_sort;
extern const SettingsDouble max_bytes_ratio_before_external_sort;
extern const SettingsBool query_plan_merge_filters;
extern const SettingsBool compile_expressions;
extern const SettingsShortCircuitFunctionEvaluation short_circuit_function_evaluation;
extern const SettingsUInt64 output_format_compression_level;
extern const SettingsBool query_plan_optimize_lazy_materialization;
}
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TYPE;
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
}
extern void registerAggregateFunctionUniqHyperLogLogPlusPlus(AggregateFunctionFactory &);
}
namespace local_engine
{
namespace fs = std::filesystem;
DB::Block BlockUtil::buildRowCountHeader()
{
return DB::Block{createColumn<UInt8>({}, VIRTUAL_ROW_COUNT_COLUMN)};
}
DB::Chunk BlockUtil::buildRowCountChunk(UInt64 rows)
{
return DB::Chunk{{createColumnConst<UInt8>(rows, 0)}, rows};
}
DB::Block BlockUtil::buildRowCountBlock(UInt64 rows)
{
return DB::Block{createColumnConst<UInt8>(rows, 0, VIRTUAL_ROW_COUNT_COLUMN)};
}
/// The column names may be different in two blocks.
/// and the nullability also could be different, with TPCDS-Q1 as an example.
DB::ColumnWithTypeAndName
BlockUtil::convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column)
{
if (sample_column.type->equals(*column.type))
return {column.column, column.type, sample_column.name};
else if (sample_column.type->isNullable() && !column.type->isNullable() && DB::removeNullable(sample_column.type)->equals(*column.type))
{
auto nullable_column = column;
DB::JoinCommon::convertColumnToNullable(nullable_column);
return {nullable_column.column, sample_column.type, sample_column.name};
}
else
throw DB::Exception(
DB::ErrorCodes::LOGICAL_ERROR,
"Columns have different types. original:{} expected:{}",
column.dumpStructure(),
sample_column.dumpStructure());
}
/**
* There is a special case with which we need be careful. In spark, struct/map/list are always
* wrapped in Nullable, but this should not happen in clickhouse.
*/
DB::Block
BlockUtil::flattenBlock(const DB::Block & block, UInt64 flags, bool recursively, const std::unordered_set<size_t> & columns_to_skip_flatten)
{
DB::Block res;
for (size_t col_i = 0; col_i < block.columns(); ++col_i)
{
const auto & elem = block.getByPosition(col_i);
if (columns_to_skip_flatten.contains(col_i))
{
res.insert(elem);
continue;
}
DB::DataTypePtr nested_type = removeNullable(elem.type);
DB::ColumnPtr nested_col = elem.column;
DB::ColumnPtr null_map_col = nullptr;
// A special case, const(Nullable(nothing))
if (elem.type->isNullable() && typeid_cast<const DB::ColumnNullable *>(elem.column->getPtr().get()))
{
const auto * nullable_col = typeid_cast<const DB::ColumnNullable *>(elem.column->getPtr().get());
nested_col = nullable_col->getNestedColumnPtr();
null_map_col = nullable_col->getNullMapColumnPtr();
}
if (const DB::DataTypeArray * type_arr = typeid_cast<const DB::DataTypeArray *>(nested_type.get()))
{
const DB::DataTypeTuple * type_tuple = typeid_cast<const DB::DataTypeTuple *>(type_arr->getNestedType().get());
if (type_tuple && type_tuple->haveExplicitNames() && (flags & FLAT_NESTED_TABLE))
{
const DB::DataTypes & element_types = type_tuple->getElements();
const DB::Strings & names = type_tuple->getElementNames();
size_t tuple_size = element_types.size();
bool is_const = isColumnConst(*nested_col);
const DB::ColumnArray * column_array;
if (is_const)
column_array = typeid_cast<const DB::ColumnArray *>(&assert_cast<const DB::ColumnConst &>(*nested_col).getDataColumn());
else
column_array = typeid_cast<const DB::ColumnArray *>(nested_col.get());
const DB::ColumnPtr & column_offsets = column_array->getOffsetsPtr();
const DB::ColumnTuple & column_tuple = typeid_cast<const DB::ColumnTuple &>(column_array->getData());
const auto & element_columns = column_tuple.getColumns();
for (size_t i = 0; i < tuple_size; ++i)
{
String nested_name = DB::Nested::concatenateName(elem.name, names[i]);
DB::ColumnPtr column_array_of_element = DB::ColumnArray::create(element_columns[i], column_offsets);
auto named_column_array_of_element = DB::ColumnWithTypeAndName(
is_const ? DB::ColumnConst::create(std::move(column_array_of_element), block.rows()) : column_array_of_element,
std::make_shared<DB::DataTypeArray>(element_types[i]),
nested_name);
if (null_map_col)
{
// Should all field columns have the same null map ?
DB::DataTypePtr null_type = std::make_shared<DB::DataTypeNullable>(element_types[i]);
named_column_array_of_element.column
= DB::ColumnNullable::create(named_column_array_of_element.column, null_map_col);
named_column_array_of_element.type = null_type;
}
if (recursively)
{
auto flatten_one_col_block = flattenBlock({named_column_array_of_element}, flags, recursively);
for (const auto & named_col : flatten_one_col_block.getColumnsWithTypeAndName())
res.insert(named_col);
}
else
res.insert(named_column_array_of_element);
}
}
else
res.insert(elem);
}
else if (const DB::DataTypeTuple * type_tuple = typeid_cast<const DB::DataTypeTuple *>(nested_type.get()))
{
if ((flags & FLAT_STRUCT_FORCE) || (type_tuple->haveExplicitNames() && (flags & FLAT_STRUCT)))
{
const DB::DataTypes & element_types = type_tuple->getElements();
DB::Strings element_names = type_tuple->getElementNames();
if (element_names.empty())
{
// This is a struct without named fields, we should flatten it.
// But we can't get the field names, so we use the field index as the field name.
for (size_t i = 0; i < element_types.size(); ++i)
element_names.push_back(elem.name + "_filed_" + std::to_string(i));
}
const DB::ColumnTuple * column_tuple;
if (isColumnConst(*nested_col))
column_tuple = typeid_cast<const DB::ColumnTuple *>(&assert_cast<const DB::ColumnConst &>(*nested_col).getDataColumn());
else
column_tuple = typeid_cast<const DB::ColumnTuple *>(nested_col.get());
size_t tuple_size = column_tuple->tupleSize();
for (size_t i = 0; i < tuple_size; ++i)
{
const auto & element_column = column_tuple->getColumn(i);
String nested_name = DB::Nested::concatenateName(elem.name, element_names[i]);
auto new_element_col = DB::ColumnWithTypeAndName(element_column.getPtr(), element_types[i], nested_name);
if (null_map_col && !element_types[i]->isNullable())
{
// Should all field columns have the same null map ?
new_element_col.column = DB::ColumnNullable::create(new_element_col.column, null_map_col);
new_element_col.type = std::make_shared<DB::DataTypeNullable>(new_element_col.type);
}
if (recursively)
{
DB::Block one_col_block({new_element_col});
auto flatten_one_col_block = flattenBlock(one_col_block, flags, recursively);
for (const auto & named_col : flatten_one_col_block.getColumnsWithTypeAndName())
res.insert(named_col);
}
else
res.insert(std::move(new_element_col));
}
}
else
res.insert(elem);
}
else
res.insert(elem);
}
return res;
}
DB::Block BlockUtil::concatenateBlocksMemoryEfficiently(std::vector<DB::Block> && blocks)
{
if (blocks.empty())
return {};
size_t num_rows = 0;
for (const auto & block : blocks)
num_rows += block.rows();
DB::Block out = blocks[0].cloneEmpty();
DB::MutableColumns columns = out.mutateColumns();
for (size_t i = 0; i < columns.size(); ++i)
{
columns[i]->reserve(num_rows);
for (auto & block : blocks)
{
const auto & tmp_column = *block.getByPosition(0).column;
columns[i]->insertRangeFrom(tmp_column, 0, block.rows());
block.erase(0);
}
}
blocks.clear();
out.setColumns(std::move(columns));
return out;
}
size_t PODArrayUtil::adjustMemoryEfficientSize(size_t n)
{
/// According to definition of DEFUALT_BLOCK_SIZE
size_t padding_n = 2 * DB::PADDING_FOR_SIMD - 1;
size_t rounded_n = roundUpToPowerOfTwoOrZero(n);
size_t padded_n = n;
if (rounded_n > n + n / 2)
{
size_t smaller_rounded_n = rounded_n / 2;
padded_n = smaller_rounded_n < padding_n ? n : smaller_rounded_n - padding_n;
}
else
{
padded_n = rounded_n - padding_n;
}
return padded_n;
}
NestedColumnExtractHelper::NestedColumnExtractHelper(const DB::Block & block_, bool case_insentive_)
: block(block_), case_insentive(case_insentive_)
{
}
std::optional<DB::ColumnWithTypeAndName> NestedColumnExtractHelper::extractColumn(const String & column_name)
{
if (const auto * col = findColumn(block, column_name))
return {*col};
auto nested_names = DB::Nested::splitName(column_name);
if (case_insentive)
{
boost::to_lower(nested_names.first);
boost::to_lower(nested_names.second);
}
if (!findColumn(block, nested_names.first))
return {};
if (!nested_tables.contains(nested_names.first))
{
DB::ColumnsWithTypeAndName columns = {*findColumn(block, nested_names.first)};
nested_tables[nested_names.first] = std::make_shared<DB::Block>(BlockUtil::flattenBlock(columns));
}
return extractColumn(column_name, nested_names.first, nested_names.second);
}
std::optional<DB::ColumnWithTypeAndName> NestedColumnExtractHelper::extractColumn(
const String & original_column_name, const String & column_name_prefix, const String & column_name_suffix)
{
auto table_iter = nested_tables.find(column_name_prefix);
if (table_iter == nested_tables.end())
return {};
auto & nested_table = table_iter->second;
auto nested_names = DB::Nested::splitName(column_name_suffix);
auto new_column_name_prefix = DB::Nested::concatenateName(column_name_prefix, nested_names.first);
if (nested_names.second.empty())
{
if (const auto * column_ref = findColumn(*nested_table, new_column_name_prefix))
{
DB::ColumnWithTypeAndName column = *column_ref;
if (case_insentive)
column.name = original_column_name;
return {std::move(column)};
}
else
{
return {};
}
}
const auto * sub_col = findColumn(*nested_table, new_column_name_prefix);
if (!sub_col)
return {};
DB::ColumnsWithTypeAndName columns = {*sub_col};
DB::Block sub_block(columns);
nested_tables[new_column_name_prefix] = std::make_shared<DB::Block>(BlockUtil::flattenBlock(sub_block));
return extractColumn(original_column_name, new_column_name_prefix, nested_names.second);
}
const DB::ColumnWithTypeAndName * NestedColumnExtractHelper::findColumn(const DB::Block & in_block, const std::string & name) const
{
return in_block.findByName(name, case_insentive);
}
const DB::ActionsDAG::Node * ActionsDAGUtil::convertNodeType(
DB::ActionsDAG & actions_dag,
const DB::ActionsDAG::Node * node,
const DB::DataTypePtr & cast_to_type,
const std::string & result_name,
DB::CastType cast_type)
{
DB::ColumnWithTypeAndName type_name_col;
type_name_col.name = cast_to_type->getName();
type_name_col.column = DB::DataTypeString().createColumnConst(0, type_name_col.name);
type_name_col.type = std::make_shared<DB::DataTypeString>();
const auto * right_arg = &actions_dag.addColumn(std::move(type_name_col));
const auto * left_arg = node;
DB::CastDiagnostic diagnostic = {node->result_name, node->result_name};
DB::ColumnWithTypeAndName left_column{nullptr, node->result_type, {}};
DB::ActionsDAG::NodeRawConstPtrs children = {left_arg, right_arg};
auto func_base_cast = createInternalCast(std::move(left_column), cast_to_type, cast_type, diagnostic);
return &actions_dag.addFunction(func_base_cast, std::move(children), result_name);
}
const DB::ActionsDAG::Node * ActionsDAGUtil::convertNodeTypeIfNeeded(
DB::ActionsDAG & actions_dag,
const DB::ActionsDAG::Node * node,
const DB::DataTypePtr & dst_type,
const std::string & result_name,
DB::CastType cast_type)
{
if (node->result_type->equals(*dst_type))
return node;
return convertNodeType(actions_dag, node, dst_type, result_name, cast_type);
}
String QueryPipelineUtil::explainPipeline(DB::QueryPipeline & pipeline)
{
DB::WriteBufferFromOwnString buf;
const auto & processors = pipeline.getProcessors();
DB::printPipelineCompact(processors, buf, true);
return buf.str();
}
using namespace DB;
std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config)
{
std::vector<String> changed_paths;
if (path_prefix.empty() && path_suffix.empty())
return changed_paths;
auto change_func = [&](String key) -> void
{
if (const String value = config.getString(key, ""); value != "")
{
const String change_value = path_prefix + value + path_suffix;
config.setString(key, change_value);
changed_paths.emplace_back(change_value);
LOG_INFO(getLogger("BackendInitializerUtil"), "Change config `{}` from '{}' to {}.", key, value, change_value);
}
};
Poco::Util::AbstractConfiguration::Keys disks;
std::unordered_set<String> disk_types = {GlutenObjectStorageConfig::S3_DISK_TYPE, GlutenObjectStorageConfig::HDFS_DISK_TYPE, "cache"};
config.keys("storage_configuration.disks", disks);
std::ranges::for_each(
disks,
[&](const auto & disk_name)
{
String disk_prefix = "storage_configuration.disks." + disk_name;
String disk_type = config.getString(disk_prefix + ".type", "");
if (!disk_types.contains(disk_type))
return;
if (disk_type == "cache")
change_func(disk_prefix + ".path");
else if (disk_type == GlutenObjectStorageConfig::S3_DISK_TYPE || disk_type == GlutenObjectStorageConfig::HDFS_DISK_TYPE)
change_func(disk_prefix + ".metadata_path");
});
change_func("path");
change_func(GlutenCacheConfig::PREFIX + ".path");
return changed_paths;
}
DB::Context::ConfigurationPtr BackendInitializerUtil::initConfig(const SparkConfigs::ConfigMap & spark_conf_map)
{
DB::Context::ConfigurationPtr config;
if (const String config_file = tryGetConfigFile(spark_conf_map); !config_file.empty())
{
if (fs::exists(config_file) && fs::is_regular_file(config_file))
{
ConfigProcessor config_processor(config_file, false, true);
DB::ConfigProcessor::setConfigPath(fs::path(config_file).parent_path());
const auto loaded_config = config_processor.loadConfig(false);
config = loaded_config.configuration;
}
else
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "{} is not a valid configure file.", config_file);
}
else
config = Poco::AutoPtr(new Poco::Util::MapConfiguration());
for (const auto & [key, value] : spark_conf_map)
{
if (key.starts_with(CH_RUNTIME_CONFIG_PREFIX) && key != CH_RUNTIME_CONFIG_FILE)
{
// Apply spark.gluten.sql.columnar.backend.ch.runtime_config.* to config
const auto name = key.substr(CH_RUNTIME_CONFIG_PREFIX.size());
if ((name == "storage_configuration.disks.s3.metadata_path" || name == "path") && !value.ends_with("/"))
config->setString(name, value + "/");
else
config->setString(name, value);
}
}
if (spark_conf_map.contains(GLUTEN_TASK_OFFHEAP))
config->setString(MemoryConfig::CH_TASK_MEMORY, spark_conf_map.at(GLUTEN_TASK_OFFHEAP));
const bool use_current_directory_as_tmp = config->getBool(PathConfig::USE_CURRENT_DIRECTORY_AS_TMP, false);
char buffer[PATH_MAX];
if (use_current_directory_as_tmp && getcwd(buffer, sizeof(buffer)) != nullptr)
wrapDiskPathConfig(String(buffer), "", *config);
const bool reuse_disk_cache = config->getBool("reuse_disk_cache", true);
if (!reuse_disk_cache)
{
String pid = std::to_string(static_cast<Int64>(getpid()));
auto path_need_clean = wrapDiskPathConfig("", "/" + pid, *config);
std::lock_guard lock(BackendFinalizerUtil::paths_mutex);
BackendFinalizerUtil::paths_need_to_clean.insert(
BackendFinalizerUtil::paths_need_to_clean.end(), path_need_clean.begin(), path_need_clean.end());
}
// FIXMEX: workaround for https://github.com/ClickHouse/ClickHouse/pull/75452#pullrequestreview-2625467710
// entry in DiskSelector::initialize
// Bug in FileCacheSettings::loadFromConfig
auto updateCacheDiskType = [](Poco::Util::AbstractConfiguration & config)
{
const std::string config_prefix = "storage_configuration.disks";
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
for (const auto & disk_name : keys)
{
const auto disk_config_prefix = config_prefix + "." + disk_name;
const auto disk_type = config.getString(disk_config_prefix + ".type", "local");
if (disk_type == "cache")
config.setString(disk_config_prefix, "workaround");
}
config.setString(GlutenCacheConfig::PREFIX, "workaround");
};
updateCacheDiskType(*config);
return config;
}
String BackendInitializerUtil::tryGetConfigFile(const SparkConfigs::ConfigMap & spark_conf_map)
{
if (spark_conf_map.contains(CH_RUNTIME_CONFIG_FILE))
return spark_conf_map.at(CH_RUNTIME_CONFIG_FILE);
/// Try to get config path from environment variable
if (const char * config_path = std::getenv("CLICKHOUSE_BACKEND_CONFIG"))
return config_path;
return String{};
}
void BackendInitializerUtil::initLoggers(DB::Context::ConfigurationPtr config)
{
auto level = config->getString("logger.level", "warning");
if (config->has("logger.log"))
local_engine::LoggerExtend::initFileLogger(*config, "ClickHouseBackend");
else
local_engine::LoggerExtend::initConsoleLogger(level);
logger = &Poco::Logger::get("ClickHouseBackend");
}
void BackendInitializerUtil::initEnvs(DB::Context::ConfigurationPtr config)
{
/// Set environment variable TZ if possible
if (config->has("timezone"))
{
const std::string config_timezone = config->getString("timezone");
const String mapped_timezone = DateTimeUtil::convertTimeZone(config_timezone);
if (0
!= setenv(
"TZ", mapped_timezone.data(), 1)) // NOLINT(concurrency-mt-unsafe) // ok if not called concurrently with other setenv/getenv
throw Poco::Exception("Cannot setenv TZ variable");
tzset();
DateLUT::setDefaultTimezone(mapped_timezone);
}
/// Set environment variable LIBHDFS3_CONF if possible
if (config->has(LIBHDFS3_CONF_KEY))
{
std::string libhdfs3_conf = config->getString(LIBHDFS3_CONF_KEY, "");
setenv("LIBHDFS3_CONF", libhdfs3_conf.c_str(), true); /// NOLINT
}
/// Enable logging in libhdfs3, logs will be written to stderr
setenv("HDFS_ENABLE_LOGGING", "true", true); /// NOLINT
/// Get environment varaible SPARK_USER if possible
if (const char * spark_user_c_str = std::getenv("SPARK_USER"))
spark_user = spark_user_c_str;
}
DB::Field BackendInitializerUtil::toField(const String & key, const String & value)
{
if (BOOL_VALUE_SETTINGS.contains(key))
return DB::Field(value == "true" || value == "1");
else if (LONG_VALUE_SETTINGS.contains(key))
return DB::Field(std::strtoll(value.c_str(), nullptr, 10));
else
return DB::Field(value);
}
void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_conf_map, DB::Settings & settings)
{
/// Initialize default setting.
settings.set("date_time_input_format", "best_effort");
settings.set(MERGETREE_MERGE_AFTER_INSERT, true);
settings.set(MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE, false);
settings.set(DECIMAL_OPERATIONS_ALLOW_PREC_LOSS, true);
settings.set("remote_filesystem_read_prefetch", false);
settings.set("max_parsing_threads", 1);
settings.set("max_download_threads", 1);
settings.set("input_format_parquet_enable_row_group_prefetch", false);
settings.set("output_format_parquet_use_custom_encoder", false);
//1.
// TODO: we need set Setting::max_threads to 1 by default, but now we can't get correct metrics for the some query if we set it to 1.
// settings[Setting::max_threads] = 1;
/// 2. After https://github.com/ClickHouse/ClickHouse/pull/71539
/// Set false to query_plan_merge_filters.
/// If true, we can't get correct metrics for the query
settings[Setting::query_plan_merge_filters] = false;
/// 3. After https://github.com/ClickHouse/ClickHouse/pull/70598.
/// Set false to compile_expressions to avoid dead loop.
/// TODO: FIXME set true again.
/// We now set BuildQueryPipelineSettings according to config.
settings[Setting::compile_expressions] = false;
settings[Setting::short_circuit_function_evaluation] = ShortCircuitFunctionEvaluation::DISABLE;
/// 4. After https://github.com/ClickHouse/ClickHouse/pull/73422
/// Since we already set max_bytes_before_external_sort, set max_bytes_ratio_before_external_sort to 0
settings[Setting::max_bytes_ratio_before_external_sort] = 0.;
/// 5. After https://github.com/ClickHouse/ClickHouse/pull/73651.
/// See following settings, we always use Snappy compression for Parquet, however after https://github.com/ClickHouse/ClickHouse/pull/73651,
/// output_format_compression_level is set to 3, which is wrong, since snappy does not support it.
settings[Setting::output_format_compression_level] = arrow::util::kUseDefaultCompressionLevel;
/// 6. After https://github.com/ClickHouse/ClickHouse/pull/55518
/// We currently do not support lazy materialization.
/// "test 'order by' two keys" will failed if we enable it.
settings[Setting::query_plan_optimize_lazy_materialization] = false;
for (const auto & [key, value] : spark_conf_map)
{
// Firstly apply spark.gluten.sql.columnar.backend.ch.runtime_config.local_engine.settings.* to settings
if (key.starts_with(CH_RUNTIME_CONFIG_PREFIX + SETTINGS_PATH + "."))
{
settings.set(key.substr((CH_RUNTIME_CONFIG_PREFIX + SETTINGS_PATH + ".").size()), value);
LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings from config key:{} value:{}", key, value);
}
else if (key.starts_with(CH_RUNTIME_SETTINGS_PREFIX))
{
auto k = key.substr(CH_RUNTIME_SETTINGS_PREFIX.size());
settings.set(k, toField(k, value));
LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings key:{} value:{}", key, value);
}
else if (key.starts_with(SPARK_HADOOP_PREFIX + S3A_PREFIX))
{
// Apply general S3 configs, e.g. spark.hadoop.fs.s3a.access.key -> set in fs.s3a.access.key
// deal with per bucket S3 configs, e.g. fs.s3a.bucket.bucket_name.assumed.role.arn
// for gluten, we require first authenticate with AK/SK(or instance profile), then assume other roles with STS
// so only the following per-bucket configs are supported:
// 1. fs.s3a.bucket.bucket_name.assumed.role.arn
// 2. fs.s3a.bucket.bucket_name.assumed.role.session.name
// 3. fs.s3a.bucket.bucket_name.endpoint
// 4. fs.s3a.bucket.bucket_name.assumed.role.externalId (non hadoop official)
settings.set(key.substr(SPARK_HADOOP_PREFIX.length()), value);
}
else if (key.starts_with(SPARK_DELTA_PREFIX))
{
auto k = key.substr(SPARK_DELTA_PREFIX.size());
settings.set(k, toField(k, value));
LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings key:{} value:{}", key, value);
}
else if (key == SPARK_SESSION_TIME_ZONE)
{
String time_zone_val = DateTimeUtil::convertTimeZone(value);
settings.set("session_timezone", time_zone_val);
LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings key:{} value:{}", "session_timezone", time_zone_val);
}
else if (key == DECIMAL_OPERATIONS_ALLOW_PREC_LOSS)
{
settings.set(key, toField(key, value));
LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings key:{} value:{}", key, value);
}
else if (key == TIMER_PARSER_POLICY)
{
settings.set(key, value);
LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings key:{} value:{}", key, value);
}
}
/// Finally apply some fixed kvs to settings.
settings.set("join_use_nulls", true);
settings.set("input_format_orc_allow_missing_columns", true);
settings.set("input_format_orc_case_insensitive_column_matching", true);
settings.set("input_format_orc_import_nested", true);
settings.set("input_format_orc_skip_columns_with_unsupported_types_in_schema_inference", true);
settings.set("input_format_parquet_allow_missing_columns", true);
settings.set("input_format_parquet_case_insensitive_column_matching", true);
settings.set("input_format_parquet_import_nested", true);
settings.set("input_format_json_read_numbers_as_strings", true);
settings.set("input_format_json_read_bools_as_numbers", false);
settings.set("input_format_csv_trim_whitespaces", false);
settings.set("input_format_csv_allow_cr_end_of_line", true);
settings.set("output_format_orc_string_as_string", true);
settings.set("output_format_parquet_version", "1.0");
settings.set("output_format_parquet_compression_method", "snappy");
settings.set("output_format_parquet_string_as_string", true);
settings.set("output_format_parquet_fixed_string_as_fixed_byte_array", false);
settings.set("output_format_json_quote_64bit_integers", false);
settings.set("output_format_json_quote_denormals", true);
settings.set("output_format_json_skip_null_value_in_named_tuples", true);
settings.set("output_format_json_escape_forward_slashes", false);
settings.set("function_json_value_return_type_allow_complex", true);
settings.set("function_json_value_return_type_allow_nullable", true);
settings.set("precise_float_parsing", true);
settings.set("enable_named_columns_in_function_tuple", false);
settings.set("date_time_64_output_format_cut_trailing_zeros_align_to_groups_of_thousands", true);
settings.set("input_format_orc_dictionary_as_low_cardinality", false); //after https://github.com/ClickHouse/ClickHouse/pull/69481
if (spark_conf_map.contains(GLUTEN_TASK_OFFHEAP))
{
auto task_memory = std::stoull(spark_conf_map.at(GLUTEN_TASK_OFFHEAP));
if (!spark_conf_map.contains(CH_RUNTIME_SETTINGS_PREFIX + "max_bytes_before_external_sort"))
{
settings[Setting::max_bytes_before_external_sort] = static_cast<size_t>(0.8 * task_memory);
}
if (!spark_conf_map.contains(CH_RUNTIME_SETTINGS_PREFIX + "prefer_external_sort_block_bytes"))
{
auto mem_gb = task_memory / static_cast<double>(1_GiB);
// 2.8x+5, Heuristics calculate the block size of external sort, [8,16]
settings[Setting::prefer_external_sort_block_bytes]
= std::max(std::min(static_cast<size_t>(2.8 * mem_gb + 5), 16ul), 8ul) * 1024 * 1024;
}
}
}
void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
{
/// Make sure global_context and shared_context are constructed only once.
if (auto global_context = QueryContext::globalMutableContext(); !global_context)
{
ServerSettings server_settings;
server_settings.loadSettingsFromConfig(*config);
auto log = getLogger("CHUtil");
global_context = QueryContext::createGlobal();
global_context->makeGlobalContext();
global_context->setConfig(config);
auto tmp_path = config->getString("tmp_path", PathConfig::DEFAULT_TEMP_FILE_PATH);
if (config->getBool(PathConfig::USE_CURRENT_DIRECTORY_AS_TMP, false))
{
char buffer[PATH_MAX];
if (getcwd(buffer, sizeof(buffer)) != nullptr)
tmp_path = std::string(buffer) + tmp_path;
};
global_context->setTemporaryStoragePath(tmp_path, 0);
global_context->setPath(config->getString("path", "/"));
String uncompressed_cache_policy = config->getString("uncompressed_cache_policy", DEFAULT_UNCOMPRESSED_CACHE_POLICY);
size_t uncompressed_cache_size = config->getUInt64("uncompressed_cache_size", DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE);
double uncompressed_cache_size_ratio = config->getDouble("uncompressed_cache_size_ratio", DEFAULT_UNCOMPRESSED_CACHE_SIZE_RATIO);
global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size, uncompressed_cache_size_ratio);
String mark_cache_policy = config->getString("mark_cache_policy", DEFAULT_MARK_CACHE_POLICY);
size_t mark_cache_size = config->getUInt64("mark_cache_size", DEFAULT_MARK_CACHE_MAX_SIZE);
double mark_cache_size_ratio = config->getDouble("mark_cache_size_ratio", DEFAULT_MARK_CACHE_SIZE_RATIO);
if (!mark_cache_size)
LOG_ERROR(log, "Mark cache is disabled, it will lead to severe performance degradation.");
LOG_INFO(log, "mark cache size to {}.", formatReadableSizeWithBinarySuffix(mark_cache_size));
global_context->setMarkCache(mark_cache_policy, mark_cache_size, mark_cache_size_ratio);
String primary_index_cache_policy = server_settings[ServerSetting::primary_index_cache_policy];
size_t primary_index_cache_size = server_settings[ServerSetting::primary_index_cache_size];
double primary_index_cache_size_ratio = server_settings[ServerSetting::primary_index_cache_size_ratio];
LOG_INFO(log, "Primary index cache size to {}.", formatReadableSizeWithBinarySuffix(primary_index_cache_size));
global_context->setPrimaryIndexCache(primary_index_cache_policy, primary_index_cache_size, primary_index_cache_size_ratio);
String index_uncompressed_cache_policy
= config->getString("index_uncompressed_cache_policy", DEFAULT_INDEX_UNCOMPRESSED_CACHE_POLICY);
size_t index_uncompressed_cache_size
= config->getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
double index_uncompressed_cache_size_ratio
= config->getDouble("index_uncompressed_cache_size_ratio", DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO);
global_context->setIndexUncompressedCache(
index_uncompressed_cache_policy, index_uncompressed_cache_size, index_uncompressed_cache_size_ratio);
String index_mark_cache_policy = config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
size_t index_mark_cache_size = config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
double index_mark_cache_size_ratio = config->getDouble("index_mark_cache_size_ratio", DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio);
String vector_similarity_index_cache_policy = server_settings[ServerSetting::vector_similarity_index_cache_policy];
size_t vector_similarity_index_cache_size = server_settings[ServerSetting::vector_similarity_index_cache_size];
size_t vector_similarity_index_cache_max_count = server_settings[ServerSetting::vector_similarity_index_cache_max_entries];
double vector_similarity_index_cache_size_ratio = server_settings[ServerSetting::vector_similarity_index_cache_size_ratio];
LOG_INFO(log, "Lowered vector similarity index cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(vector_similarity_index_cache_size));
global_context->setVectorSimilarityIndexCache(vector_similarity_index_cache_policy, vector_similarity_index_cache_size, vector_similarity_index_cache_max_count, vector_similarity_index_cache_size_ratio);
getMergeTreePrefixesDeserializationThreadPool().initialize(
server_settings[ServerSetting::max_prefixes_deserialization_thread_pool_size],
server_settings[ServerSetting::max_prefixes_deserialization_thread_pool_free_size],
server_settings[ServerSetting::prefixes_deserialization_thread_pool_thread_pool_queue_size]);
size_t mmap_cache_size = config->getUInt64("mmap_cache_size", DEFAULT_MMAP_CACHE_MAX_SIZE);
global_context->setMMappedFileCache(mmap_cache_size);
/// Initialize a dummy query result cache.
global_context->setQueryResultCache(0, 0, 0, 0);
/// Initialize a dummy query condition cache.
global_context->setQueryConditionCache(DEFAULT_QUERY_CONDITION_CACHE_POLICY, 0, 0);
// We must set the application type to CLIENT to avoid ServerUUID::get() throw exception
global_context->setApplicationType(Context::ApplicationType::CLIENT);
}
else
{
// just for ut
global_context->updateStorageConfiguration(*config);
}
}
void BackendInitializerUtil::applyGlobalConfigAndSettings(const DB::Context::ConfigurationPtr & config, const DB::Settings & settings)
{
const auto global_context = QueryContext::globalMutableContext();
global_context->setConfig(config);
global_context->setSettings(settings);
}
extern void registerAggregateFunctionCombinatorPartialMerge(AggregateFunctionCombinatorFactory &);
extern void registerAggregateFunctionsBloomFilter(AggregateFunctionFactory &);
extern void registerAggregateFunctionSparkAvg(AggregateFunctionFactory &);
extern void registerAggregateFunctionRowNumGroup(AggregateFunctionFactory &);
extern void registerAggregateFunctionDVRoaringBitmap(AggregateFunctionFactory &);
extern void registerFunctions(FunctionFactory &);
void registerAllFunctions()
{
DB::registerFunctions();
DB::registerAggregateFunctions();
auto & agg_factory = AggregateFunctionFactory::instance();
registerAggregateFunctionsBloomFilter(agg_factory);
registerAggregateFunctionSparkAvg(agg_factory);
registerAggregateFunctionRowNumGroup(agg_factory);
DB::registerAggregateFunctionUniqHyperLogLogPlusPlus(agg_factory);
registerAggregateFunctionDVRoaringBitmap(agg_factory);
/// register aggregate function combinators from local_engine
auto & combinator_factory = AggregateFunctionCombinatorFactory::instance();
registerAggregateFunctionCombinatorPartialMerge(combinator_factory);
}
void registerGlutenDisks()
{
registerDisks(true);
registerGlutenDisks(true);
}
void BackendInitializerUtil::registerAllFactories()
{
registerFormats();
registerGlutenDisks();
registerReadBufferBuilders();
registerWriteBufferBuilders();
LOG_INFO(logger, "Register read buffer builders.");
registerRelParsers();
LOG_INFO(logger, "Register relation parsers.");
registerAllFunctions();
LOG_INFO(logger, "Register all functions.");
}
void BackendInitializerUtil::initCompiledExpressionCache(DB::Context::ConfigurationPtr config)
{
#if USE_EMBEDDED_COMPILER
/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;
size_t compiled_expression_cache_size = config->getUInt64("compiled_expression_cache_size", compiled_expression_cache_size_default);
constexpr size_t compiled_expression_cache_elements_size_default = 10000;
size_t compiled_expression_cache_elements_size
= config->getUInt64("compiled_expression_cache_elements_size", compiled_expression_cache_elements_size_default);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size, compiled_expression_cache_elements_size);
#endif
}
void BackendInitializerUtil::initBackend(const SparkConfigs::ConfigMap & spark_conf_map)
{
DB::Context::ConfigurationPtr config = initConfig(spark_conf_map);
initLoggers(config);
initEnvs(config);
LOG_INFO(logger, "Init environment variables.");
DB::Settings settings;
initSettings(spark_conf_map, settings);
LOG_INFO(logger, "Init settings.");
initContexts(config);
LOG_INFO(logger, "Init shared context and global context.");
applyGlobalConfigAndSettings(config, settings);
LOG_INFO(logger, "Apply configuration and setting for global context.");
// clean static per_bucket_clients and shared_client before running local engine,
// in case of running the multiple gluten ut in one process
ReadBufferBuilderFactory::instance().clean();
// Init the table metadata cache map
StorageMergeTreeFactory::init_cache_map();
JobScheduler::initialize(QueryContext::globalContext());
CacheManager::initialize(QueryContext::globalMutableContext());
std::call_once(
init_flag,
[&]
{
SignalHandler::instance().init();
registerAllFactories();
LOG_INFO(logger, "Register all factories.");
initCompiledExpressionCache(config);
LOG_INFO(logger, "Init compiled expressions cache factory.");
ServerSettings server_settings;
server_settings.loadSettingsFromConfig(*config);
GlobalThreadPool::initialize(
server_settings[ServerSetting::max_thread_pool_size], 0, server_settings[ServerSetting::thread_pool_queue_size]);
getIOThreadPool().initialize(
server_settings[ServerSetting::max_io_thread_pool_size], 0, server_settings[ServerSetting::io_thread_pool_queue_size]);
const size_t active_parts_loading_threads = config->getUInt("max_active_parts_loading_thread_pool_size", 64);
DB::getActivePartsLoadingThreadPool().initialize(
active_parts_loading_threads,
0, // We don't need any threads one all the parts will be loaded
active_parts_loading_threads);
const size_t cleanup_threads = config->getUInt("max_parts_cleaning_thread_pool_size", 128);
getPartsCleaningThreadPool().initialize(
cleanup_threads,
0, // We don't need any threads one all the parts will be deleted
cleanup_threads);
// Avoid using LD_PRELOAD in child process
unsetenv("LD_PRELOAD");
});
}
void BackendFinalizerUtil::finalizeGlobally()
{
// Make sure client caches release before ClientCacheRegistry
ReadBufferBuilderFactory::instance().clean();
StorageMergeTreeFactory::clear();
QueryContext::resetGlobal();
std::lock_guard lock(paths_mutex);
std::ranges::for_each(
paths_need_to_clean,
[](const auto & path)
{
if (fs::exists(path))
fs::remove_all(path);
});
paths_need_to_clean.clear();
}
void BackendFinalizerUtil::finalizeSessionally()
{
}
std::vector<String> BackendFinalizerUtil::paths_need_to_clean;
std::mutex BackendFinalizerUtil::paths_mutex;
Int64 DateTimeUtil::currentTimeMillis()
{
return timeInMilliseconds(std::chrono::system_clock::now());
}
String DateTimeUtil::convertTimeZone(const String & time_zone)
{
String res = time_zone;
/// Convert timezone ID like '+08:00' to GMT+8:00
if (time_zone.starts_with("+") || time_zone.starts_with("-"))
res = "GMT" + time_zone;
res = DateLUT::mappingForJavaTimezone(res);
return res;
}
UInt64 MemoryUtil::getMemoryRSS()
{
long rss = 0L;
FILE * fp = NULL;
char buf[4096];
sprintf(buf, "/proc/%d/statm", getpid());
if ((fp = fopen(buf, "r")) == NULL)
return 0;
fscanf(fp, "%*s%ld", &rss);
fclose(fp);
return rss * sysconf(_SC_PAGESIZE);
}
void JoinUtil::reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols)
{
ActionsDAG project{plan.getCurrentHeader().getNamesAndTypesList()};
NamesWithAliases project_cols;
for (const auto & col : cols)
{
project_cols.emplace_back(NameWithAlias(col, col));
}
project.project(project_cols);
QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(plan.getCurrentHeader(), std::move(project));
project_step->setStepDescription("Reorder Join Output");
plan.addStep(std::move(project_step));
}
std::pair<DB::JoinKind, DB::JoinStrictness>
JoinUtil::getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool is_existence_join)
{
switch (join_type)
{
case substrait::JoinRel_JoinType_JOIN_TYPE_INNER:
return {DB::JoinKind::Inner, DB::JoinStrictness::All};
case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT_SEMI: {
if (is_existence_join)
return {DB::JoinKind::Left, DB::JoinStrictness::Any};
return {DB::JoinKind::Left, DB::JoinStrictness::Semi};
}
case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT_SEMI:
return {DB::JoinKind::Right, DB::JoinStrictness::Semi};
case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT_ANTI:
return {DB::JoinKind::Left, DB::JoinStrictness::Anti};
case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT_ANTI:
return {DB::JoinKind::Right, DB::JoinStrictness::Anti};
case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT:
return {DB::JoinKind::Left, DB::JoinStrictness::All};
case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT:
return {DB::JoinKind::Right, DB::JoinStrictness::All};
case substrait::JoinRel_JoinType_JOIN_TYPE_OUTER:
return {DB::JoinKind::Full, DB::JoinStrictness::All};
default:
throw Exception(ErrorCodes::UNKNOWN_TYPE, "unsupported join type {}.", magic_enum::enum_name(join_type));
}
}
std::pair<DB::JoinKind, DB::JoinStrictness> JoinUtil::getCrossJoinKindAndStrictness(substrait::CrossRel_JoinType join_type)
{
switch (join_type)
{
case substrait::CrossRel_JoinType_JOIN_TYPE_INNER:
case substrait::CrossRel_JoinType_JOIN_TYPE_LEFT:
case substrait::CrossRel_JoinType_JOIN_TYPE_OUTER:
return {DB::JoinKind::Cross, DB::JoinStrictness::All};
default:
throw Exception(ErrorCodes::UNKNOWN_TYPE, "unsupported join type {}.", magic_enum::enum_name(join_type));
}
}
}