cpp-ch/local-engine/Common/CHUtil.h (172 lines of code) (raw):

/* /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <unordered_set> #include <Core/Block.h> #include <Core/Block_fwd.h> #include <Core/Joins.h> #include <Functions/CastOverloadResolver.h> #include <Interpreters/ActionsDAG.h> #include <Interpreters/Context.h> #include <Processors/Chunk.h> #include <Processors/QueryPlan/IQueryPlanStep.h> #include <base/types.h> #include <substrait/algebra.pb.h> #include <Common/CurrentThread.h> #include <Common/GlutenConfig.h> namespace DB { class QueryPipeline; class QueryPlan; } namespace local_engine { static const String MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE = "mergetree.insert_without_local_storage"; static const String MERGETREE_MERGE_AFTER_INSERT = "mergetree.merge_after_insert"; static const std::string DECIMAL_OPERATIONS_ALLOW_PREC_LOSS = "spark.sql.decimalOperations.allowPrecisionLoss"; static const std::string TIMER_PARSER_POLICY = "spark.sql.legacy.timeParserPolicy"; static const std::unordered_set<String> BOOL_VALUE_SETTINGS{ MERGETREE_MERGE_AFTER_INSERT, MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE, DECIMAL_OPERATIONS_ALLOW_PREC_LOSS}; static const std::unordered_set<String> LONG_VALUE_SETTINGS{ "optimize.maxfilesize", "optimize.minFileSize", "mergetree.max_num_part_per_merge_task"}; class BlockUtil { public: static constexpr auto VIRTUAL_ROW_COUNT_COLUMN = "__VIRTUAL_ROW_COUNT_COLUMN__"; static constexpr auto RIHGT_COLUMN_PREFIX = "broadcast_right_"; // Build a header block with a virtual column which will be // use to indicate the number of rows in a block. // Commonly seen in the following quries: // - select count(1) from t // - select 1 from t static DB::Block buildRowCountHeader(); static DB::Chunk buildRowCountChunk(UInt64 rows); static DB::Block buildRowCountBlock(UInt64 rows); static constexpr UInt64 FLAT_STRUCT = 1; static constexpr UInt64 FLAT_NESTED_TABLE = 2; /// If it's a struct without named fields, also force to flatten it. static constexpr UInt64 FLAT_STRUCT_FORCE = 4; // flatten the struct and array(struct) columns. // It's different from Nested::flattend() static DB::Block flattenBlock( const DB::Block & block, UInt64 flags = FLAT_STRUCT | FLAT_NESTED_TABLE, bool recursively = false, const std::unordered_set<size_t> & columns_to_skip_flatten = {}); static DB::Block concatenateBlocksMemoryEfficiently(std::vector<DB::Block> && blocks); /// The column names may be different in two blocks. /// and the nullability also could be different, with TPCDS-Q1 as an example. static DB::ColumnWithTypeAndName convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column); }; class PODArrayUtil { public: /// To allocate n bytes, PODArray will allocate n + pad_left + pad_right bytes in fact. So when /// we want to allocate 2^k bytes, 2^(k+1) bytes are allocated. This makes the memory usage far /// more than we expected, and easy to cause OOM. For example, we want to limit the max block size to be /// 64k rows, CH will make the memory usage equal to 128k rows, and half of the reserved memory is not used. /// So we adjust the size by considering the padding bytes, the return value may be samller then n. static size_t adjustMemoryEfficientSize(size_t n); }; /// Use this class to extract element columns from columns of nested type in a block, e.g. named Tuple. /// It can extract a column from a multiple nested type column, e.g. named Tuple in named Tuple /// Keeps some intermediate data to avoid rebuild them multi-times. class NestedColumnExtractHelper { public: explicit NestedColumnExtractHelper(const DB::Block & block_, bool case_insentive_); std::optional<DB::ColumnWithTypeAndName> extractColumn(const String & column_name); private: std::optional<DB::ColumnWithTypeAndName> extractColumn(const String & original_column_name, const String & column_name_prefix, const String & column_name_suffix); const DB::Block & block; bool case_insentive; std::map<String, DB::BlockPtr> nested_tables; const DB::ColumnWithTypeAndName * findColumn(const DB::Block & block, const std::string & name) const; }; class ActionsDAGUtil { public: static const DB::ActionsDAG::Node * convertNodeType( DB::ActionsDAG & actions_dag, const DB::ActionsDAG::Node * node_to_cast, const DB::DataTypePtr & cast_to_type, const std::string & result_name = "", DB::CastType cast_type = DB::CastType::nonAccurate); static const DB::ActionsDAG::Node * convertNodeTypeIfNeeded( DB::ActionsDAG & actions_dag, const DB::ActionsDAG::Node * node, const DB::DataTypePtr & dst_type, const std::string & result_name = "", DB::CastType cast_type = DB::CastType::nonAccurate); }; class QueryPipelineUtil { public: static String explainPipeline(DB::QueryPipeline & pipeline); }; void registerAllFunctions(); void registerGlutenDisks(); class BackendFinalizerUtil; class JNIUtils; class BackendInitializerUtil { public: static DB::Field toField(const String & key, const String & value); /// Initialize two kinds of resources /// 1. global level resources like global_context/shared_context, notice that they can only be initialized once in process lifetime /// 2. session level resources like settings/configs, they can be initialized multiple times following the lifetime of executor/driver static void initBackend(const SparkConfigs::ConfigMap & spark_conf_map); static void initSettings(const SparkConfigs::ConfigMap & spark_conf_map, DB::Settings & settings); inline static const String CH_BACKEND_PREFIX = "spark.gluten.sql.columnar.backend.ch"; inline static const String CH_RUNTIME_CONFIG = "runtime_config"; inline static const String CH_RUNTIME_CONFIG_PREFIX = CH_BACKEND_PREFIX + "." + CH_RUNTIME_CONFIG + "."; inline static const String CH_RUNTIME_CONFIG_FILE = CH_RUNTIME_CONFIG_PREFIX + "config_file"; inline static const String CH_RUNTIME_SETTINGS = "runtime_settings"; inline static const String CH_RUNTIME_SETTINGS_PREFIX = CH_BACKEND_PREFIX + "." + CH_RUNTIME_SETTINGS + "."; inline static const String SETTINGS_PATH = "local_engine.settings"; inline static const String LIBHDFS3_CONF_KEY = "hdfs.libhdfs3_conf"; inline static const std::string HADOOP_S3_ACCESS_KEY = "fs.s3a.access.key"; inline static const std::string HADOOP_S3_SECRET_KEY = "fs.s3a.secret.key"; inline static const std::string HADOOP_S3_ENDPOINT = "fs.s3a.endpoint"; inline static const std::string HADOOP_S3_ASSUMED_ROLE = "fs.s3a.assumed.role.arn"; inline static const std::string HADOOP_S3_PATH_STYLE_ACCESS = "fs.s3a.path.style.access"; inline static const std::string HADOOP_S3_ASSUMED_SESSION_NAME = "fs.s3a.assumed.role.session.name"; // not hadoop official inline static const std::string HADOOP_S3_ASSUMED_EXTERNAL_ID = "fs.s3a.assumed.role.externalId"; // hadoop official, this is used to ignore the cached client inline static const std::string HADOOP_S3_CLIENT_CACHE_IGNORE = "fs.s3a.client.cached.ignore"; inline static const std::string SPARK_HADOOP_PREFIX = "spark.hadoop."; inline static const std::string S3A_PREFIX = "fs.s3a."; inline static const std::string SPARK_DELTA_PREFIX = "spark.databricks.delta."; inline static const std::string SPARK_SESSION_TIME_ZONE = "spark.sql.session.timeZone"; inline static const String GLUTEN_TASK_OFFHEAP = "spark.gluten.memory.task.offHeap.size.in.bytes"; /// On yarn mode, native writing on hdfs cluster takes yarn container user as the user passed to libhdfs3, which /// will cause permission issue because yarn container user is not the owner of the hdfs dir to be written. /// So we need to get the spark user from env and pass it to libhdfs3. inline static std::optional<String> spark_user; private: friend class BackendFinalizerUtil; friend class JNIUtils; static DB::Context::ConfigurationPtr initConfig(const SparkConfigs::ConfigMap & spark_conf_map); static String tryGetConfigFile(const SparkConfigs::ConfigMap & spark_conf_map); static void initLoggers(DB::Context::ConfigurationPtr config); static void initEnvs(DB::Context::ConfigurationPtr config); static void initContexts(DB::Context::ConfigurationPtr config); static void initCompiledExpressionCache(DB::Context::ConfigurationPtr config); static void registerAllFactories(); static void applyGlobalConfigAndSettings(const DB::Context::ConfigurationPtr & config, const DB::Settings & settings); static std::vector<String> wrapDiskPathConfig(const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config); inline static std::once_flag init_flag; inline static Poco::Logger * logger; }; class BackendFinalizerUtil { public: /// Release global level resources like global_context/shared_context. Invoked only once in the lifetime of process when JVM is shuting down. static void finalizeGlobally(); /// Release session level resources like StorageJoinBuilder. Invoked every time executor/driver shutdown. static void finalizeSessionally(); static std::vector<String> paths_need_to_clean; static std::mutex paths_mutex; }; // Ignore memory track, memory should free before IgnoreMemoryTracker deconstruction class IgnoreMemoryTracker { public: explicit IgnoreMemoryTracker(size_t limit_) : limit(limit_) { DB::CurrentThread::get().untracked_memory_limit += limit; } ~IgnoreMemoryTracker() { DB::CurrentThread::get().untracked_memory_limit -= limit; } private: size_t limit; }; class DateTimeUtil { public: static Int64 currentTimeMillis(); static String convertTimeZone(const String & time_zone); }; class MemoryUtil { public: static UInt64 getMemoryRSS(); }; class JoinUtil { public: static void reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols); static std::pair<DB::JoinKind, DB::JoinStrictness> getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool is_existence_join); static std::pair<DB::JoinKind, DB::JoinStrictness> getCrossJoinKindAndStrictness(substrait::CrossRel_JoinType join_type); }; }