// cpp-ch/local-engine/Common/GlutenConfig.h
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <Interpreters/Context_fwd.h>
#include <base/unit.h>
#include <google/protobuf/map.h>
namespace Poco::Util
{
class AbstractConfiguration;
}
namespace DB
{
struct ReadSettings;
}
namespace local_engine
{
/// Helpers for extracting Spark configuration carried inside a serialized plan
/// and applying it to the ClickHouse query context.
struct SparkConfigs
{
    /// Key/value configuration map as delivered by protobuf.
    using ConfigMap = google::protobuf::Map<std::string, std::string>;
    /// Parses the configs embedded in the serialized plan and applies them to
    /// the given mutable context (implementation in the .cpp).
    static void updateConfig(const DB::ContextMutablePtr &, std::string_view);
    /// Extracts the ConfigMap from `plan` and hands it to `callback`.
    /// `processStart` presumably distinguishes the first call at process
    /// startup — confirm against the implementation.
    static void update(std::string_view plan, const std::function<void(const ConfigMap &)> & callback, bool processStart = false);
};
/// Memory limits and spill settings. The String constants are the config keys;
/// the fields below hold the loaded values, with their initializers acting as
/// defaults when a key is absent (loading logic lives in the .cpp).
struct MemoryConfig
{
    inline static const String EXTRA_MEMORY_HARD_LIMIT = "extra_memory_hard_limit";
    inline static const String CH_TASK_MEMORY = "off_heap_per_task";
    inline static const String SPILL_MEM_RATIO = "spill_mem_ratio";
    size_t extra_memory_hard_limit = 0; // 0 = no extra hard limit
    size_t off_heap_per_task = 0; // per-task off-heap budget in bytes; 0 = unset
    double spill_mem_ratio = 0.9; // fraction of the memory budget that triggers spilling
    static MemoryConfig loadFromContext(const DB::ContextPtr & context);
};
/// Tuning knobs for the grace merging aggregation operator: bucket counts,
/// extension thresholds, flush limits, and memory-ratio bounds.
struct GraceMergingAggregateConfig
{
    inline static const String MAX_GRACE_AGGREGATE_MERGING_BUCKETS = "max_grace_aggregate_merging_buckets";
    inline static const String THROW_ON_OVERFLOW_GRACE_AGGREGATE_MERGING_BUCKETS = "throw_on_overflow_grace_aggregate_merging_buckets";
    inline static const String AGGREGATED_KEYS_BEFORE_EXTEND_GRACE_AGGREGATE_MERGING_BUCKETS
        = "aggregated_keys_before_extend_grace_aggregate_merging_buckets";
    inline static const String MAX_PENDING_FLUSH_BLOCKS_PER_GRACE_AGGREGATE_MERGING_BUCKET
        = "max_pending_flush_blocks_per_grace_aggregate_merging_bucket";
    inline static const String MAX_ALLOWED_MEMORY_USAGE_RATIO_FOR_AGGREGATE_MERGING
        = "max_allowed_memory_usage_ratio_for_aggregate_merging";
    inline static const String ENABLE_SPILL_TEST = "enable_grace_aggregate_spill_test";
    size_t max_grace_aggregate_merging_buckets = 32; // upper bound on bucket count
    bool throw_on_overflow_grace_aggregate_merging_buckets = false; // false = silently cap instead of throwing
    size_t aggregated_keys_before_extend_grace_aggregate_merging_buckets = 8192; // key count that triggers bucket extension
    size_t max_pending_flush_blocks_per_grace_aggregate_merging_bucket = 1_MiB; // byte size (1_MiB literal from base/unit.h)
    double max_allowed_memory_usage_ratio_for_aggregate_merging = 0.9;
    bool enable_spill_test = false; // test-only: force spilling
    static GraceMergingAggregateConfig loadFromContext(const DB::ContextPtr & context);
};
/// Tuning knobs for the streaming aggregation operator: eviction threshold,
/// memory bound, and the cardinality cutoff for switching strategies.
struct StreamingAggregateConfig
{
    inline static const String AGGREGATED_KEYS_BEFORE_STREAMING_AGGREGATING_EVICT = "aggregated_keys_before_streaming_aggregating_evict";
    inline static const String MAX_MEMORY_USAGE_RATIO_FOR_STREAMING_AGGREGATING = "max_memory_usage_ratio_for_streaming_aggregating";
    inline static const String HIGH_CARDINALITY_THRESHOLD_FOR_STREAMING_AGGREGATING
        = "high_cardinality_threshold_for_streaming_aggregating";
    inline static const String ENABLE_STREAMING_AGGREGATING = "enable_streaming_aggregating";
    size_t aggregated_keys_before_streaming_aggregating_evict = 1024; // key count that triggers eviction
    double max_memory_usage_ratio_for_streaming_aggregating = 0.9;
    double high_cardinality_threshold_for_streaming_aggregating = 0.8; // ratio above which input is treated as high-cardinality
    bool enable_streaming_aggregating = true;
    static StreamingAggregateConfig loadFromContext(const DB::ContextPtr & context);
};
/// Join-related settings, currently around rewriting OR'ed equality
/// conditions into multiple join-on clauses.
struct JoinConfig
{
    /// If the join condition is like `t1.k = t2.k and (t1.id1 = t2.id2 or t1.id2 = t2.id2)`, try to join with multi
    /// join on clauses `(t1.k = t2.k and t1.id1 = t2.id2) or (t1.k = t2.k and t1.id2 = t2.id2)`
    inline static const String PREFER_MULTI_JOIN_ON_CLAUSES = "prefer_multi_join_on_clauses";
    /// Only hash join supports multi join on clauses, the right table cannot be too large. If the row number of right
    /// table is larger than this limit, this transform will not work.
    inline static const String MULTI_JOIN_ON_CLAUSES_BUILD_SIDE_ROWS_LIMIT = "multi_join_on_clauses_build_side_row_limit";
    bool prefer_multi_join_on_clauses = true;
    size_t multi_join_on_clauses_build_side_rows_limit = 10000000; // 10M-row cap on the build side
    static JoinConfig loadFromContext(const DB::ContextPtr & context);
};
/// Executor debugging/behavior flags.
struct ExecutorConfig
{
    inline static const String DUMP_PIPELINE = "dump_pipeline";
    inline static const String USE_LOCAL_FORMAT = "use_local_format";
    bool dump_pipeline = false; // debug: dump the query pipeline when enabled
    bool use_local_format = false;
    static ExecutorConfig loadFromContext(const DB::ContextPtr & context);
};
/// S3 (and GCS-over-S3) settings: local read-cache and GCS compose-request
/// behavior. Note the keys use dotted sub-sections ("s3.local_cache.*").
struct S3Config
{
    inline static const String S3_LOCAL_CACHE_ENABLE = "s3.local_cache.enabled";
    inline static const String S3_LOCAL_CACHE_MAX_SIZE = "s3.local_cache.max_size";
    inline static const String S3_LOCAL_CACHE_CACHE_PATH = "s3.local_cache.cache_path";
    inline static const String S3_GCS_ISSUE_COMPOSE_REQUEST = "s3.gcs_issue_compose_request";
    bool s3_local_cache_enabled = false;
    size_t s3_local_cache_max_size = 100_GiB; // byte size (100_GiB literal from base/unit.h)
    String s3_local_cache_cache_path = ""; // empty = no path configured
    bool s3_gcs_issue_compose_request = false;
    static S3Config loadFromContext(const DB::ContextPtr & context);
};
/// MergeTree metadata cache capacities (entry counts, not bytes).
struct MergeTreeConfig
{
    inline static const String TABLE_PART_METADATA_CACHE_MAX_COUNT = "table_part_metadata_cache_max_count";
    inline static const String TABLE_METADATA_CACHE_MAX_COUNT = "table_metadata_cache_max_count";
    size_t table_part_metadata_cache_max_count = 5000;
    size_t table_metadata_cache_max_count = 500;
    static MergeTreeConfig loadFromContext(const DB::ContextPtr & context);
};
/// Job scheduler sizing.
struct GlutenJobSchedulerConfig
{
    inline static const String JOB_SCHEDULER_MAX_THREADS = "job_scheduler_max_threads";
    size_t job_scheduler_max_threads = 10; // worker thread cap for the scheduler
    static GlutenJobSchedulerConfig loadFromContext(const DB::ContextPtr & context);
};
/// MergeTree data-cache behavior.
struct MergeTreeCacheConfig
{
    inline static const String ENABLE_DATA_PREFETCH = "enable_data_prefetch";
    bool enable_data_prefetch = true; // prefetch data into the cache by default
    static MergeTreeCacheConfig loadFromContext(const DB::ContextPtr & context);
};
/// Window-function settings for the top-k aggregate optimization: how many
/// rows to sample and the cardinality ratio above which the optimization is
/// presumably skipped — confirm against the operator implementation.
struct WindowConfig
{
    // NOTE: the redundant `public:` was removed — struct members are public by default.
    inline static const String WINDOW_AGGREGATE_TOPK_SAMPLE_ROWS = "window.aggregate_topk_sample_rows";
    inline static const String WINDOW_AGGREGATE_TOPK_HIGH_CARDINALITY_THRESHOLD = "window.aggregate_topk_high_cardinality_threshold";
    size_t aggregate_topk_sample_rows = 5000; // rows sampled to estimate cardinality
    double aggregate_topk_high_cardinality_threshold = 0.6; // distinct/sampled ratio cutoff
    static WindowConfig loadFromContext(const DB::ContextPtr & context);
};
/// Temp-file path settings (plain constants; not loaded from context).
namespace PathConfig
{
/// When set, use the process's current directory for temp files instead of DEFAULT_TEMP_FILE_PATH.
inline constexpr auto USE_CURRENT_DIRECTORY_AS_TMP = "use_current_directory_as_tmp";
inline constexpr auto DEFAULT_TEMP_FILE_PATH = "/tmp/libch";
} // no trailing ';' — namespaces are not class definitions
/// Configurations for spark.sql.
/// TODO: spark_version
/// TODO: pass spark configs to clickhouse backend.
struct SparkSQLConfig
{
    bool caseSensitive = false; // spark.sql.caseSensitive
    size_t deltaDataSkippingNumIndexedCols = 32; // spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols — presumably; confirm key in the .cpp
    String deltaDataSkippingStatsColumns; // empty by default = no explicit stats-column list
    static SparkSQLConfig loadFromContext(const DB::ContextPtr & context);
};
/// Key names for the local gluten cache configuration section.
struct GlutenCacheConfig
{
    inline static const String PREFIX = "gluten_cache.local";
    /// We can't use gluten_cache.local.enabled because FileCacheSettings doesn't contain this field.
    inline static const String ENABLED = "enable.gluten_cache.local";
};
/// Disk-type identifiers for gluten-managed object storage backends.
struct GlutenObjectStorageConfig
{
    inline static const String S3_DISK_TYPE = "s3_gluten";
    inline static const String HDFS_DISK_TYPE = "hdfs_gluten";
};
}